]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java
Use listener as listener in TimeoutingRequest
[simantics/platform.git] / bundles / org.simantics.db.common / src / org / simantics / db / common / utils / RequestUtil.java
1 package org.simantics.db.common.utils;
2
3 import java.util.concurrent.Semaphore;
4 import java.util.concurrent.TimeUnit;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import org.simantics.db.ReadGraph;
8 import org.simantics.db.Session;
9
10 import org.simantics.db.exception.DatabaseException;
11 import org.simantics.db.procedure.Listener;
12 import org.simantics.db.procedure.Procedure;
13 import org.simantics.db.request.Read;
14
15 /**
16  * @author Tuukka Lehtonen
17  */
18 public final class RequestUtil {
19
20         private static final boolean TRACE_SCHEDULE = false;
21         private static final boolean TRACE_WAITED_TIME = false;
22         private static final boolean TRACE_TIMEOUT  = false;
23         private static final boolean TRACE_RESULT_TIME = false;
24
25         static class ResultProcedure<R> extends Semaphore implements Procedure<R> {
26
27                 private static final long serialVersionUID = 8645926264625496924L;
28
29                 private static final Object NO_RESULT = new Object();
30                 private Object resultOrException = NO_RESULT;
31
32                 public ResultProcedure() {
33                         super(0);
34                 }
35
36                 @Override
37                 public void execute(R result) {
38                         resultOrException = result;
39                         release();
40                 }
41
42                 @Override
43                 public void exception(Throwable t) {
44                         resultOrException = t;
45                         release();
46                 }
47
48                 @SuppressWarnings("unchecked")
49                 public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException {
50                         long nt0 = System.nanoTime();
51                         if (timeout < 0) {
52                                 acquire();
53                         } else {
54                                 tryAcquire(timeout, TimeUnit.MILLISECONDS);
55                         }
56                         Object r = resultOrException;
57                         if (TRACE_WAITED_TIME)
58                                 System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
59                         if (r == NO_RESULT) {
60                                 if (TRACE_TIMEOUT)
61                                         System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult);
62                                 return timeoutResult;
63                         }
64                         if (r instanceof Throwable) {
65                                 if (r instanceof DatabaseException)
66                                         throw (DatabaseException) r;
67                                 throw new DatabaseException((Throwable) r);
68                         }
69                         if (TRACE_RESULT_TIME)
70                                 System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
71                         return (R) r;
72                 }
73
74         }
75
76         static class TimeoutingRequest<R> extends Semaphore implements Read<R> {
77
78                 private static final long serialVersionUID = -5216095211800988013L;
79
80                 Read<R> wrappedRequest;
81                 Procedure<R> wrappedProcedure;
82                 /**
83                  * 0 = not running, not timed out
84                  * 1 = running
85                  * 2 = timed out
86                  */
87                 AtomicInteger state = new AtomicInteger();
88
89                 public TimeoutingRequest(Read<R> request, Procedure<R> procedure) {
90                         super(0);
91                         this.wrappedRequest = request;
92                         this.wrappedProcedure = procedure;
93                 }
94
95                 @Override
96                 public R perform(ReadGraph graph) throws DatabaseException {
97                         if (state.compareAndSet(0, 1)) {
98                                 release();
99                                 if(wrappedProcedure != null) {
100                                         if(wrappedProcedure instanceof Listener<?>) {
101                                                 return graph.syncRequest(wrappedRequest, (Listener<R>)wrappedProcedure);
102                                         } else {
103                                                 return graph.syncRequest(wrappedRequest, wrappedProcedure);
104                                         }
105                                 } else {
106                                         return graph.syncRequest(wrappedRequest);
107                                 }
108                         } else {
109                                 if (TRACE_TIMEOUT)
110                                         System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out");
111                                 release();
112                                 return null;
113                         }
114                 }
115
116                 public boolean waitForRequestStart(long timeout) throws InterruptedException {
117                         long nt0 = System.nanoTime();
118                         tryAcquire(timeout, TimeUnit.MILLISECONDS);
119                         if (TRACE_WAITED_TIME)
120                                 System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
121                         if (state.compareAndSet(0, 2)) {
122                                 if (TRACE_TIMEOUT)
123                                         System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)");
124                                 return false;
125                         }
126                         return true;
127                 }
128
129         }
130
131         /**
132          * @param processor
133          *            database session handle
134          * @param requestStartTimeout
135          *            timeout in milliseconds for the request execution to start. If
136          *            request execution is not started within this period of time,
137          *            timeoutResult<code>timeoutResult</code>requestExecutionTimeout
138          *            will be returned. Must be &ge; 0.
139          * @param requestExecutionTimeout
140          *            the maximum time to wait for the request execution to complete
141          *            from the time the request execution was started. If negative,
142          *            there is no timeout.
143          * @param timeoutResult
144          *            the result
145          * @param read
146          * @param procedure
147          * @return
148          * @throws DatabaseException
149          * @throws InterruptedException
150          */
151         public static final <R> R trySyncRequest(
152                         Session session,
153                         long requestStartTimeout,
154                         long requestExecutionTimeout,
155                         R timeoutResult,
156                         final Read<R> read,
157                         final Procedure<R> procedure)
158                                         throws DatabaseException, InterruptedException
159         {
160                 if (requestStartTimeout < 0)
161                         throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout);
162                 if (TRACE_SCHEDULE)
163                         System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms");
164                 ResultProcedure<R> proc = new ResultProcedure<>();
165                 TimeoutingRequest<R> req = new TimeoutingRequest<>(read, procedure);
166                 session.asyncRequest(req, proc);
167                 if (!req.waitForRequestStart(requestStartTimeout))
168                         return timeoutResult;
169                 return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult);
170         }
171
172         public static final <R> R trySyncRequest(
173                         Session session,
174                         long requestStartTimeout,
175                         long requestExecutionTimeout,
176                         Read<R> read,
177                         Procedure<R> procedure)
178                                         throws DatabaseException, InterruptedException
179         {
180                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure);
181         }
182
183         public static final <R> R trySyncRequest(
184                         Session session,
185                         long requestStartTimeout,
186                         long requestExecutionTimeout,
187                         R timeoutResult,
188                         final Read<R> read)
189                                         throws DatabaseException, InterruptedException
190         {
191                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null);
192         }
193
194         public static final <R> R trySyncRequest(
195                         Session session,
196                         long requestStartTimeout,
197                         long requestExecutionTimeout,
198                         Read<R> read)
199                                         throws DatabaseException, InterruptedException
200         {
201                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null);
202         }
203
204 }