1 package org.simantics.db.common.utils;
\r
3 import java.util.concurrent.Semaphore;
\r
4 import java.util.concurrent.TimeUnit;
\r
5 import java.util.concurrent.atomic.AtomicInteger;
\r
7 import org.simantics.db.ReadGraph;
\r
8 import org.simantics.db.Session;
\r
10 import org.simantics.db.exception.DatabaseException;
\r
11 import org.simantics.db.procedure.Procedure;
\r
12 import org.simantics.db.request.Read;
\r
15 * @author Tuukka Lehtonen
\r
17 public final class RequestUtil {
\r
19 private static final boolean TRACE_SCHEDULE = false;
\r
20 private static final boolean TRACE_WAITED_TIME = false;
\r
21 private static final boolean TRACE_TIMEOUT = false;
\r
22 private static final boolean TRACE_RESULT_TIME = false;
\r
24 static class ResultProcedure<R> extends Semaphore implements Procedure<R> {
\r
26 private static final long serialVersionUID = 8645926264625496924L;
\r
28 private static final Object NO_RESULT = new Object();
\r
29 private Object resultOrException = NO_RESULT;
\r
31 public ResultProcedure() {
\r
36 public void execute(R result) {
\r
37 resultOrException = result;
\r
42 public void exception(Throwable t) {
\r
43 resultOrException = t;
\r
47 @SuppressWarnings("unchecked")
\r
48 public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException {
\r
49 long nt0 = System.nanoTime();
\r
53 tryAcquire(timeout, TimeUnit.MILLISECONDS);
\r
55 Object r = resultOrException;
\r
56 if (TRACE_WAITED_TIME)
\r
57 System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
\r
58 if (r == NO_RESULT) {
\r
60 System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult);
\r
61 return timeoutResult;
\r
63 if (r instanceof Throwable) {
\r
64 if (r instanceof DatabaseException)
\r
65 throw (DatabaseException) r;
\r
66 throw new DatabaseException((Throwable) r);
\r
68 if (TRACE_RESULT_TIME)
\r
69 System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
\r
75 static class TimeoutingRequest<R> extends Semaphore implements Read<R> {
\r
77 private static final long serialVersionUID = -5216095211800988013L;
\r
79 Read<R> wrappedRequest;
\r
80 Procedure<R> wrappedProcedure;
\r
82 * 0 = not running, not timed out
\r
86 AtomicInteger state = new AtomicInteger();
\r
88 public TimeoutingRequest(Read<R> request, Procedure<R> procedure) {
\r
90 this.wrappedRequest = request;
\r
91 this.wrappedProcedure = procedure;
\r
95 public R perform(ReadGraph graph) throws DatabaseException {
\r
96 if (state.compareAndSet(0, 1)) {
\r
98 return wrappedProcedure != null
\r
99 ? graph.syncRequest(wrappedRequest, wrappedProcedure)
\r
100 : graph.syncRequest(wrappedRequest);
\r
103 System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out");
\r
109 public boolean waitForRequestStart(long timeout) throws InterruptedException {
\r
110 long nt0 = System.nanoTime();
\r
111 tryAcquire(timeout, TimeUnit.MILLISECONDS);
\r
112 if (TRACE_WAITED_TIME)
\r
113 System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
\r
114 if (state.compareAndSet(0, 2)) {
\r
116 System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)");
\r
126 * database session handle
\r
127 * @param requestStartTimeout
\r
128 * timeout in milliseconds for the request execution to start. If
\r
129 * request execution is not started within this period of time,
\r
130 * timeoutResult<code>timeoutResult</code>requestExecutionTimeout
\r
131 * will be returned. Must be ≥ 0.
\r
132 * @param requestExecutionTimeout
\r
133 * the maximum time to wait for the request execution to complete
\r
134 * from the time the request execution was started. If negative,
\r
135 * there is no timeout.
\r
136 * @param timeoutResult
\r
141 * @throws DatabaseException
\r
142 * @throws InterruptedException
\r
144 public static final <R> R trySyncRequest(
\r
146 long requestStartTimeout,
\r
147 long requestExecutionTimeout,
\r
149 final Read<R> read,
\r
150 final Procedure<R> procedure)
\r
151 throws DatabaseException, InterruptedException
\r
153 if (requestStartTimeout < 0)
\r
154 throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout);
\r
155 if (TRACE_SCHEDULE)
\r
156 System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms");
\r
157 ResultProcedure<R> proc = new ResultProcedure<>();
\r
158 TimeoutingRequest<R> req = new TimeoutingRequest<>(read, procedure);
\r
159 session.asyncRequest(req, proc);
\r
160 if (!req.waitForRequestStart(requestStartTimeout))
\r
161 return timeoutResult;
\r
162 return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult);
\r
165 public static final <R> R trySyncRequest(
\r
167 long requestStartTimeout,
\r
168 long requestExecutionTimeout,
\r
170 Procedure<R> procedure)
\r
171 throws DatabaseException, InterruptedException
\r
173 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure);
\r
176 public static final <R> R trySyncRequest(
\r
178 long requestStartTimeout,
\r
179 long requestExecutionTimeout,
\r
181 final Read<R> read)
\r
182 throws DatabaseException, InterruptedException
\r
184 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null);
\r
187 public static final <R> R trySyncRequest(
\r
189 long requestStartTimeout,
\r
190 long requestExecutionTimeout,
\r
192 throws DatabaseException, InterruptedException
\r
194 return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null);
\r