X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.common%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fcommon%2Futils%2FRequestUtil.java;h=d1256444ba7f2ce52c51680320b556bb27bb94b3;hb=refs%2Fchanges%2F33%2F1333%2F1;hp=8babb76f44ef6c8906392436f5cc074f4ec4c8d5;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java b/bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java index 8babb76f4..d1256444b 100644 --- a/bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java +++ b/bundles/org.simantics.db.common/src/org/simantics/db/common/utils/RequestUtil.java @@ -1,197 +1,204 @@ -package org.simantics.db.common.utils; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.simantics.db.ReadGraph; -import org.simantics.db.Session; - -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.procedure.Procedure; -import org.simantics.db.request.Read; - -/** - * @author Tuukka Lehtonen - */ -public final class RequestUtil { - - private static final boolean TRACE_SCHEDULE = false; - private static final boolean TRACE_WAITED_TIME = false; - private static final boolean TRACE_TIMEOUT = false; - private static final boolean TRACE_RESULT_TIME = false; - - static class ResultProcedure extends Semaphore implements Procedure { - - private static final long serialVersionUID = 8645926264625496924L; - - private static final Object NO_RESULT = new Object(); - private Object resultOrException = NO_RESULT; - - public ResultProcedure() { - super(0); - } - - @Override - public void execute(R result) { - resultOrException = result; - release(); - } - - @Override - public void exception(Throwable t) { - resultOrException = t; - release(); - } - - @SuppressWarnings("unchecked") - public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException { - long nt0 = System.nanoTime(); - if (timeout < 0) { - acquire(); - } else { - tryAcquire(timeout, TimeUnit.MILLISECONDS); - } - Object r = resultOrException; - if (TRACE_WAITED_TIME) - System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); - if (r == NO_RESULT) { - if (TRACE_TIMEOUT) - System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult); - return timeoutResult; - } - if (r instanceof Throwable) { - if (r instanceof DatabaseException) - throw (DatabaseException) r; - throw new DatabaseException((Throwable) r); - } - if (TRACE_RESULT_TIME) - System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); - return (R) r; - } - - } - - static class TimeoutingRequest extends Semaphore implements Read { - - private static final long serialVersionUID = -5216095211800988013L; - - Read wrappedRequest; - Procedure wrappedProcedure; - /** - * 0 = not running, not timed out - * 1 = running - * 2 = timed out - */ - AtomicInteger state = new AtomicInteger(); - - public TimeoutingRequest(Read request, Procedure procedure) { - super(0); - this.wrappedRequest = request; - this.wrappedProcedure = procedure; - } - - @Override - public R perform(ReadGraph graph) throws DatabaseException { - if (state.compareAndSet(0, 1)) { - release(); - return wrappedProcedure != null - ? graph.syncRequest(wrappedRequest, wrappedProcedure) - : graph.syncRequest(wrappedRequest); - } else { - if (TRACE_TIMEOUT) - System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out"); - release(); - return null; - } - } - - public boolean waitForRequestStart(long timeout) throws InterruptedException { - long nt0 = System.nanoTime(); - tryAcquire(timeout, TimeUnit.MILLISECONDS); - if (TRACE_WAITED_TIME) - System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); - if (state.compareAndSet(0, 2)) { - if (TRACE_TIMEOUT) - System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)"); - return false; - } - return true; - } - - } - - /** - * @param processor - * database session handle - * @param requestStartTimeout - * timeout in milliseconds for the request execution to start. If - * request execution is not started within this period of time, - * timeoutResulttimeoutResultrequestExecutionTimeout - * will be returned. Must be ≥ 0. - * @param requestExecutionTimeout - * the maximum time to wait for the request execution to complete - * from the time the request execution was started. If negative, - * there is no timeout. - * @param timeoutResult - * the result - * @param read - * @param procedure - * @return - * @throws DatabaseException - * @throws InterruptedException - */ - public static final R trySyncRequest( - Session session, - long requestStartTimeout, - long requestExecutionTimeout, - R timeoutResult, - final Read read, - final Procedure procedure) - throws DatabaseException, InterruptedException - { - if (requestStartTimeout < 0) - throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout); - if (TRACE_SCHEDULE) - System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms"); - ResultProcedure proc = new ResultProcedure<>(); - TimeoutingRequest req = new TimeoutingRequest<>(read, procedure); - session.asyncRequest(req, proc); - if (!req.waitForRequestStart(requestStartTimeout)) - return timeoutResult; - return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult); - } - - public static final R trySyncRequest( - Session session, - long requestStartTimeout, - long requestExecutionTimeout, - Read read, - Procedure procedure) - throws DatabaseException, InterruptedException - { - return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure); - } - - public static final R trySyncRequest( - Session session, - long requestStartTimeout, - long requestExecutionTimeout, - R timeoutResult, - final Read read) - throws DatabaseException, InterruptedException - { - return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null); - } - - public static final R trySyncRequest( - Session session, - long requestStartTimeout, - long requestExecutionTimeout, - Read read) - throws DatabaseException, InterruptedException - { - return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null); - } - -} +package org.simantics.db.common.utils; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.simantics.db.ReadGraph; +import org.simantics.db.Session; + +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.procedure.Listener; +import org.simantics.db.procedure.Procedure; +import org.simantics.db.request.Read; + +/** + * @author Tuukka Lehtonen + */ +public final class RequestUtil { + + private static final boolean TRACE_SCHEDULE = false; + private static final boolean TRACE_WAITED_TIME = false; + private static final boolean TRACE_TIMEOUT = false; + private static final boolean TRACE_RESULT_TIME = false; + + static class ResultProcedure extends Semaphore implements Procedure { + + private static final long serialVersionUID = 8645926264625496924L; + + private static final Object NO_RESULT = new Object(); + private Object resultOrException = NO_RESULT; + + public ResultProcedure() { + super(0); + } + + @Override + public void execute(R result) { + resultOrException = result; + release(); + } + + @Override + public void exception(Throwable t) { + resultOrException = t; + release(); + } + + @SuppressWarnings("unchecked") + public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException { + long nt0 = System.nanoTime(); + if (timeout < 0) { + acquire(); + } else { + tryAcquire(timeout, TimeUnit.MILLISECONDS); + } + Object r = resultOrException; + if (TRACE_WAITED_TIME) + System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); + if (r == NO_RESULT) { + if (TRACE_TIMEOUT) + System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult); + return timeoutResult; + } + if (r instanceof Throwable) { + if (r instanceof DatabaseException) + throw (DatabaseException) r; + throw new DatabaseException((Throwable) r); + } + if (TRACE_RESULT_TIME) + System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); + return (R) r; + } + + } + + static class TimeoutingRequest extends Semaphore implements Read { + + private static final long serialVersionUID = -5216095211800988013L; + + Read wrappedRequest; + Procedure wrappedProcedure; + /** + * 0 = not running, not timed out + * 1 = running + * 2 = timed out + */ + AtomicInteger state = new AtomicInteger(); + + public TimeoutingRequest(Read request, Procedure procedure) { + super(0); + this.wrappedRequest = request; + this.wrappedProcedure = procedure; + } + + @Override + public R perform(ReadGraph graph) throws DatabaseException { + if (state.compareAndSet(0, 1)) { + release(); + if(wrappedProcedure != null) { + if(wrappedProcedure instanceof Listener) { + return graph.syncRequest(wrappedRequest, (Listener)wrappedProcedure); + } else { + return graph.syncRequest(wrappedRequest, wrappedProcedure); + } + } else { + return graph.syncRequest(wrappedRequest); + } + } else { + if (TRACE_TIMEOUT) + System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out"); + release(); + return null; + } + } + + public boolean waitForRequestStart(long timeout) throws InterruptedException { + long nt0 = System.nanoTime(); + tryAcquire(timeout, TimeUnit.MILLISECONDS); + if (TRACE_WAITED_TIME) + System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); + if (state.compareAndSet(0, 2)) { + if (TRACE_TIMEOUT) + System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)"); + return false; + } + return true; + } + + } + + /** + * @param processor + * database session handle + * @param requestStartTimeout + * timeout in milliseconds for the request execution to start. If + * request execution is not started within this period of time, + * timeoutResulttimeoutResultrequestExecutionTimeout + * will be returned. Must be ≥ 0. + * @param requestExecutionTimeout + * the maximum time to wait for the request execution to complete + * from the time the request execution was started. If negative, + * there is no timeout. + * @param timeoutResult + * the result + * @param read + * @param procedure + * @return + * @throws DatabaseException + * @throws InterruptedException + */ + public static final R trySyncRequest( + Session session, + long requestStartTimeout, + long requestExecutionTimeout, + R timeoutResult, + final Read read, + final Procedure procedure) + throws DatabaseException, InterruptedException + { + if (requestStartTimeout < 0) + throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout); + if (TRACE_SCHEDULE) + System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms"); + ResultProcedure proc = new ResultProcedure<>(); + TimeoutingRequest req = new TimeoutingRequest<>(read, procedure); + session.asyncRequest(req, proc); + if (!req.waitForRequestStart(requestStartTimeout)) + return timeoutResult; + return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult); + } + + public static final R trySyncRequest( + Session session, + long requestStartTimeout, + long requestExecutionTimeout, + Read read, + Procedure procedure) + throws DatabaseException, InterruptedException + { + return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure); + } + + public static final R trySyncRequest( + Session session, + long requestStartTimeout, + long requestExecutionTimeout, + R timeoutResult, + final Read read) + throws DatabaseException, InterruptedException + { + return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null); + } + + public static final R trySyncRequest( + Session session, + long requestStartTimeout, + long requestExecutionTimeout, + Read read) + throws DatabaseException, InterruptedException + { + return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null); + } + +}