]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.common / src / org / simantics / db / common / utils / RequestUtil.java
1 package org.simantics.db.common.utils;\r
2 \r
3 import java.util.concurrent.Semaphore;\r
4 import java.util.concurrent.TimeUnit;\r
5 import java.util.concurrent.atomic.AtomicInteger;\r
6 \r
7 import org.simantics.db.ReadGraph;\r
8 import org.simantics.db.Session;\r
9 \r
10 import org.simantics.db.exception.DatabaseException;\r
11 import org.simantics.db.procedure.Procedure;\r
12 import org.simantics.db.request.Read;\r
13 \r
14 /**\r
15  * @author Tuukka Lehtonen\r
16  */\r
17 public final class RequestUtil {\r
18 \r
19         private static final boolean TRACE_SCHEDULE = false;\r
20         private static final boolean TRACE_WAITED_TIME = false;\r
21         private static final boolean TRACE_TIMEOUT  = false;\r
22         private static final boolean TRACE_RESULT_TIME = false;\r
23 \r
24         static class ResultProcedure<R> extends Semaphore implements Procedure<R> {\r
25 \r
26                 private static final long serialVersionUID = 8645926264625496924L;\r
27 \r
28                 private static final Object NO_RESULT = new Object();\r
29                 private Object resultOrException = NO_RESULT;\r
30 \r
31                 public ResultProcedure() {\r
32                         super(0);\r
33                 }\r
34 \r
35                 @Override\r
36                 public void execute(R result) {\r
37                         resultOrException = result;\r
38                         release();\r
39                 }\r
40 \r
41                 @Override\r
42                 public void exception(Throwable t) {\r
43                         resultOrException = t;\r
44                         release();\r
45                 }\r
46 \r
47                 @SuppressWarnings("unchecked")\r
48                 public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException {\r
49                         long nt0 = System.nanoTime();\r
50                         if (timeout < 0) {\r
51                                 acquire();\r
52                         } else {\r
53                                 tryAcquire(timeout, TimeUnit.MILLISECONDS);\r
54                         }\r
55                         Object r = resultOrException;\r
56                         if (TRACE_WAITED_TIME)\r
57                                 System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
58                         if (r == NO_RESULT) {\r
59                                 if (TRACE_TIMEOUT)\r
60                                         System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult);\r
61                                 return timeoutResult;\r
62                         }\r
63                         if (r instanceof Throwable) {\r
64                                 if (r instanceof DatabaseException)\r
65                                         throw (DatabaseException) r;\r
66                                 throw new DatabaseException((Throwable) r);\r
67                         }\r
68                         if (TRACE_RESULT_TIME)\r
69                                 System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
70                         return (R) r;\r
71                 }\r
72 \r
73         }\r
74 \r
75         static class TimeoutingRequest<R> extends Semaphore implements Read<R> {\r
76 \r
77                 private static final long serialVersionUID = -5216095211800988013L;\r
78 \r
79                 Read<R> wrappedRequest;\r
80                 Procedure<R> wrappedProcedure;\r
81                 /**\r
82                  * 0 = not running, not timed out\r
83                  * 1 = running\r
84                  * 2 = timed out\r
85                  */\r
86                 AtomicInteger state = new AtomicInteger();\r
87 \r
88                 public TimeoutingRequest(Read<R> request, Procedure<R> procedure) {\r
89                         super(0);\r
90                         this.wrappedRequest = request;\r
91                         this.wrappedProcedure = procedure;\r
92                 }\r
93 \r
94                 @Override\r
95                 public R perform(ReadGraph graph) throws DatabaseException {\r
96                         if (state.compareAndSet(0, 1)) {\r
97                                 release();\r
98                                 return wrappedProcedure != null\r
99                                                 ? graph.syncRequest(wrappedRequest, wrappedProcedure)\r
100                                                 : graph.syncRequest(wrappedRequest);\r
101                         } else {\r
102                                 if (TRACE_TIMEOUT)\r
103                                         System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out");\r
104                                 release();\r
105                                 return null;\r
106                         }\r
107                 }\r
108 \r
109                 public boolean waitForRequestStart(long timeout) throws InterruptedException {\r
110                         long nt0 = System.nanoTime();\r
111                         tryAcquire(timeout, TimeUnit.MILLISECONDS);\r
112                         if (TRACE_WAITED_TIME)\r
113                                 System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
114                         if (state.compareAndSet(0, 2)) {\r
115                                 if (TRACE_TIMEOUT)\r
116                                         System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)");\r
117                                 return false;\r
118                         }\r
119                         return true;\r
120                 }\r
121 \r
122         }\r
123 \r
124         /**\r
125          * @param processor\r
126          *            database session handle\r
127          * @param requestStartTimeout\r
128          *            timeout in milliseconds for the request execution to start. If\r
129          *            request execution is not started within this period of time,\r
130          *            timeoutResult<code>timeoutResult</code>requestExecutionTimeout\r
131          *            will be returned. Must be &ge; 0.\r
132          * @param requestExecutionTimeout\r
133          *            the maximum time to wait for the request execution to complete\r
134          *            from the time the request execution was started. If negative,\r
135          *            there is no timeout.\r
136          * @param timeoutResult\r
137          *            the result\r
138          * @param read\r
139          * @param procedure\r
140          * @return\r
141          * @throws DatabaseException\r
142          * @throws InterruptedException\r
143          */\r
144         public static final <R> R trySyncRequest(\r
145                         Session session,\r
146                         long requestStartTimeout,\r
147                         long requestExecutionTimeout,\r
148                         R timeoutResult,\r
149                         final Read<R> read,\r
150                         final Procedure<R> procedure)\r
151                                         throws DatabaseException, InterruptedException\r
152         {\r
153                 if (requestStartTimeout < 0)\r
154                         throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout);\r
155                 if (TRACE_SCHEDULE)\r
156                         System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms");\r
157                 ResultProcedure<R> proc = new ResultProcedure<>();\r
158                 TimeoutingRequest<R> req = new TimeoutingRequest<>(read, procedure);\r
159                 session.asyncRequest(req, proc);\r
160                 if (!req.waitForRequestStart(requestStartTimeout))\r
161                         return timeoutResult;\r
162                 return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult);\r
163         }\r
164 \r
165         public static final <R> R trySyncRequest(\r
166                         Session session,\r
167                         long requestStartTimeout,\r
168                         long requestExecutionTimeout,\r
169                         Read<R> read,\r
170                         Procedure<R> procedure)\r
171                                         throws DatabaseException, InterruptedException\r
172         {\r
173                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure);\r
174         }\r
175 \r
176         public static final <R> R trySyncRequest(\r
177                         Session session,\r
178                         long requestStartTimeout,\r
179                         long requestExecutionTimeout,\r
180                         R timeoutResult,\r
181                         final Read<R> read)\r
182                                         throws DatabaseException, InterruptedException\r
183         {\r
184                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null);\r
185         }\r
186 \r
187         public static final <R> R trySyncRequest(\r
188                         Session session,\r
189                         long requestStartTimeout,\r
190                         long requestExecutionTimeout,\r
191                         Read<R> read)\r
192                                         throws DatabaseException, InterruptedException\r
193         {\r
194                 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null);\r
195         }\r
196 \r
197 }\r