-package org.simantics.db.common.utils;\r
-\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Session;\r
-\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.procedure.Procedure;\r
-import org.simantics.db.request.Read;\r
-\r
-/**\r
- * @author Tuukka Lehtonen\r
- */\r
-public final class RequestUtil {\r
-\r
- private static final boolean TRACE_SCHEDULE = false;\r
- private static final boolean TRACE_WAITED_TIME = false;\r
- private static final boolean TRACE_TIMEOUT = false;\r
- private static final boolean TRACE_RESULT_TIME = false;\r
-\r
- static class ResultProcedure<R> extends Semaphore implements Procedure<R> {\r
-\r
- private static final long serialVersionUID = 8645926264625496924L;\r
-\r
- private static final Object NO_RESULT = new Object();\r
- private Object resultOrException = NO_RESULT;\r
-\r
- public ResultProcedure() {\r
- super(0);\r
- }\r
-\r
- @Override\r
- public void execute(R result) {\r
- resultOrException = result;\r
- release();\r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- resultOrException = t;\r
- release();\r
- }\r
-\r
- @SuppressWarnings("unchecked")\r
- public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException {\r
- long nt0 = System.nanoTime();\r
- if (timeout < 0) {\r
- acquire();\r
- } else {\r
- tryAcquire(timeout, TimeUnit.MILLISECONDS);\r
- }\r
- Object r = resultOrException;\r
- if (TRACE_WAITED_TIME)\r
- System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
- if (r == NO_RESULT) {\r
- if (TRACE_TIMEOUT)\r
- System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult);\r
- return timeoutResult;\r
- }\r
- if (r instanceof Throwable) {\r
- if (r instanceof DatabaseException)\r
- throw (DatabaseException) r;\r
- throw new DatabaseException((Throwable) r);\r
- }\r
- if (TRACE_RESULT_TIME)\r
- System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
- return (R) r;\r
- }\r
-\r
- }\r
-\r
- static class TimeoutingRequest<R> extends Semaphore implements Read<R> {\r
-\r
- private static final long serialVersionUID = -5216095211800988013L;\r
-\r
- Read<R> wrappedRequest;\r
- Procedure<R> wrappedProcedure;\r
- /**\r
- * 0 = not running, not timed out\r
- * 1 = running\r
- * 2 = timed out\r
- */\r
- AtomicInteger state = new AtomicInteger();\r
-\r
- public TimeoutingRequest(Read<R> request, Procedure<R> procedure) {\r
- super(0);\r
- this.wrappedRequest = request;\r
- this.wrappedProcedure = procedure;\r
- }\r
-\r
- @Override\r
- public R perform(ReadGraph graph) throws DatabaseException {\r
- if (state.compareAndSet(0, 1)) {\r
- release();\r
- return wrappedProcedure != null\r
- ? graph.syncRequest(wrappedRequest, wrappedProcedure)\r
- : graph.syncRequest(wrappedRequest);\r
- } else {\r
- if (TRACE_TIMEOUT)\r
- System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out");\r
- release();\r
- return null;\r
- }\r
- }\r
-\r
- public boolean waitForRequestStart(long timeout) throws InterruptedException {\r
- long nt0 = System.nanoTime();\r
- tryAcquire(timeout, TimeUnit.MILLISECONDS);\r
- if (TRACE_WAITED_TIME)\r
- System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");\r
- if (state.compareAndSet(0, 2)) {\r
- if (TRACE_TIMEOUT)\r
- System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)");\r
- return false;\r
- }\r
- return true;\r
- }\r
-\r
- }\r
-\r
- /**\r
- * @param processor\r
- * database session handle\r
- * @param requestStartTimeout\r
- * timeout in milliseconds for the request execution to start. If\r
- * request execution is not started within this period of time,\r
- * timeoutResult<code>timeoutResult</code>requestExecutionTimeout\r
- * will be returned. Must be ≥ 0.\r
- * @param requestExecutionTimeout\r
- * the maximum time to wait for the request execution to complete\r
- * from the time the request execution was started. If negative,\r
- * there is no timeout.\r
- * @param timeoutResult\r
- * the result\r
- * @param read\r
- * @param procedure\r
- * @return\r
- * @throws DatabaseException\r
- * @throws InterruptedException\r
- */\r
- public static final <R> R trySyncRequest(\r
- Session session,\r
- long requestStartTimeout,\r
- long requestExecutionTimeout,\r
- R timeoutResult,\r
- final Read<R> read,\r
- final Procedure<R> procedure)\r
- throws DatabaseException, InterruptedException\r
- {\r
- if (requestStartTimeout < 0)\r
- throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout);\r
- if (TRACE_SCHEDULE)\r
- System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms");\r
- ResultProcedure<R> proc = new ResultProcedure<>();\r
- TimeoutingRequest<R> req = new TimeoutingRequest<>(read, procedure);\r
- session.asyncRequest(req, proc);\r
- if (!req.waitForRequestStart(requestStartTimeout))\r
- return timeoutResult;\r
- return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult);\r
- }\r
-\r
- public static final <R> R trySyncRequest(\r
- Session session,\r
- long requestStartTimeout,\r
- long requestExecutionTimeout,\r
- Read<R> read,\r
- Procedure<R> procedure)\r
- throws DatabaseException, InterruptedException\r
- {\r
- return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure);\r
- }\r
-\r
- public static final <R> R trySyncRequest(\r
- Session session,\r
- long requestStartTimeout,\r
- long requestExecutionTimeout,\r
- R timeoutResult,\r
- final Read<R> read)\r
- throws DatabaseException, InterruptedException\r
- {\r
- return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null);\r
- }\r
-\r
- public static final <R> R trySyncRequest(\r
- Session session,\r
- long requestStartTimeout,\r
- long requestExecutionTimeout,\r
- Read<R> read)\r
- throws DatabaseException, InterruptedException\r
- {\r
- return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null);\r
- }\r
-\r
-}\r
+package org.simantics.db.common.utils;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Session;
+
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.procedure.Listener;
+import org.simantics.db.procedure.Procedure;
+import org.simantics.db.request.Read;
+
+/**
+ * @author Tuukka Lehtonen
+ */
+public final class RequestUtil {
+
+ private static final boolean TRACE_SCHEDULE = false;
+ private static final boolean TRACE_WAITED_TIME = false;
+ private static final boolean TRACE_TIMEOUT = false;
+ private static final boolean TRACE_RESULT_TIME = false;
+
+ static class ResultProcedure<R> extends Semaphore implements Procedure<R> {
+
+ private static final long serialVersionUID = 8645926264625496924L;
+
+ private static final Object NO_RESULT = new Object();
+ private Object resultOrException = NO_RESULT;
+
+ public ResultProcedure() {
+ super(0);
+ }
+
+ @Override
+ public void execute(R result) {
+ resultOrException = result;
+ release();
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ resultOrException = t;
+ release();
+ }
+
+ @SuppressWarnings("unchecked")
+ public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException {
+ long nt0 = System.nanoTime();
+ if (timeout < 0) {
+ acquire();
+ } else {
+ tryAcquire(timeout, TimeUnit.MILLISECONDS);
+ }
+ Object r = resultOrException;
+ if (TRACE_WAITED_TIME)
+ System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
+ if (r == NO_RESULT) {
+ if (TRACE_TIMEOUT)
+ System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult);
+ return timeoutResult;
+ }
+ if (r instanceof Throwable) {
+ if (r instanceof DatabaseException)
+ throw (DatabaseException) r;
+ throw new DatabaseException((Throwable) r);
+ }
+ if (TRACE_RESULT_TIME)
+ System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
+ return (R) r;
+ }
+
+ }
+
+ static class TimeoutingRequest<R> extends Semaphore implements Read<R> {
+
+ private static final long serialVersionUID = -5216095211800988013L;
+
+ Read<R> wrappedRequest;
+ Procedure<R> wrappedProcedure;
+ /**
+ * 0 = not running, not timed out
+ * 1 = running
+ * 2 = timed out
+ */
+ AtomicInteger state = new AtomicInteger();
+
+ public TimeoutingRequest(Read<R> request, Procedure<R> procedure) {
+ super(0);
+ this.wrappedRequest = request;
+ this.wrappedProcedure = procedure;
+ }
+
+ @Override
+ public R perform(ReadGraph graph) throws DatabaseException {
+ if (state.compareAndSet(0, 1)) {
+ release();
+ if(wrappedProcedure != null) {
+ if(wrappedProcedure instanceof Listener<?>) {
+ return graph.syncRequest(wrappedRequest, (Listener<R>)wrappedProcedure);
+ } else {
+ return graph.syncRequest(wrappedRequest, wrappedProcedure);
+ }
+ } else {
+ return graph.syncRequest(wrappedRequest);
+ }
+ } else {
+ if (TRACE_TIMEOUT)
+ System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out");
+ release();
+ return null;
+ }
+ }
+
+ public boolean waitForRequestStart(long timeout) throws InterruptedException {
+ long nt0 = System.nanoTime();
+ tryAcquire(timeout, TimeUnit.MILLISECONDS);
+ if (TRACE_WAITED_TIME)
+ System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms");
+ if (state.compareAndSet(0, 2)) {
+ if (TRACE_TIMEOUT)
+ System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)");
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ /**
+ * @param processor
+ * database session handle
+ * @param requestStartTimeout
+ * timeout in milliseconds for the request execution to start. If
+ * request execution is not started within this period of time,
+ * timeoutResult<code>timeoutResult</code>requestExecutionTimeout
+ * will be returned. Must be ≥ 0.
+ * @param requestExecutionTimeout
+ * the maximum time to wait for the request execution to complete
+ * from the time the request execution was started. If negative,
+ * there is no timeout.
+ * @param timeoutResult
+ * the result
+ * @param read
+ * @param procedure
+ * @return
+ * @throws DatabaseException
+ * @throws InterruptedException
+ */
+ public static final <R> R trySyncRequest(
+ Session session,
+ long requestStartTimeout,
+ long requestExecutionTimeout,
+ R timeoutResult,
+ final Read<R> read,
+ final Procedure<R> procedure)
+ throws DatabaseException, InterruptedException
+ {
+ if (requestStartTimeout < 0)
+ throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout);
+ if (TRACE_SCHEDULE)
+ System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms");
+ ResultProcedure<R> proc = new ResultProcedure<>();
+ TimeoutingRequest<R> req = new TimeoutingRequest<>(read, procedure);
+ session.asyncRequest(req, proc);
+ if (!req.waitForRequestStart(requestStartTimeout))
+ return timeoutResult;
+ return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult);
+ }
+
+ public static final <R> R trySyncRequest(
+ Session session,
+ long requestStartTimeout,
+ long requestExecutionTimeout,
+ Read<R> read,
+ Procedure<R> procedure)
+ throws DatabaseException, InterruptedException
+ {
+ return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure);
+ }
+
+ public static final <R> R trySyncRequest(
+ Session session,
+ long requestStartTimeout,
+ long requestExecutionTimeout,
+ R timeoutResult,
+ final Read<R> read)
+ throws DatabaseException, InterruptedException
+ {
+ return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null);
+ }
+
+ public static final <R> R trySyncRequest(
+ Session session,
+ long requestStartTimeout,
+ long requestExecutionTimeout,
+ Read<R> read)
+ throws DatabaseException, InterruptedException
+ {
+ return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null);
+ }
+
+}