package org.simantics.db.common.utils; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.simantics.db.ReadGraph; import org.simantics.db.Session; import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.Listener; import org.simantics.db.procedure.Procedure; import org.simantics.db.request.Read; /** * @author Tuukka Lehtonen */ public final class RequestUtil { private static final boolean TRACE_SCHEDULE = false; private static final boolean TRACE_WAITED_TIME = false; private static final boolean TRACE_TIMEOUT = false; private static final boolean TRACE_RESULT_TIME = false; static class ResultProcedure extends Semaphore implements Procedure { private static final long serialVersionUID = 8645926264625496924L; private static final Object NO_RESULT = new Object(); private Object resultOrException = NO_RESULT; public ResultProcedure() { super(0); } @Override public void execute(R result) { resultOrException = result; release(); } @Override public void exception(Throwable t) { resultOrException = t; release(); } @SuppressWarnings("unchecked") public R waitForResultOrException(long timeout, R timeoutResult) throws DatabaseException, InterruptedException { long nt0 = System.nanoTime(); if (timeout < 0) { acquire(); } else { tryAcquire(timeout, TimeUnit.MILLISECONDS); } Object r = resultOrException; if (TRACE_WAITED_TIME) System.out.println("slept waiting for result for " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); if (r == NO_RESULT) { if (TRACE_TIMEOUT) System.out.println("timed out (" + timeout + " ms) waiting for request result, returning default " + timeoutResult); return timeoutResult; } if (r instanceof Throwable) { if (r instanceof DatabaseException) throw (DatabaseException) r; throw new DatabaseException((Throwable) r); } if (TRACE_RESULT_TIME) System.out.println("got result in " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); return (R) r; } } static class TimeoutingRequest extends Semaphore implements Read { private static final long serialVersionUID = -5216095211800988013L; Read wrappedRequest; Procedure wrappedProcedure; /** * 0 = not running, not timed out * 1 = running * 2 = timed out */ AtomicInteger state = new AtomicInteger(); public TimeoutingRequest(Read request, Procedure procedure) { super(0); this.wrappedRequest = request; this.wrappedProcedure = procedure; } @Override public R perform(ReadGraph graph) throws DatabaseException { if (state.compareAndSet(0, 1)) { release(); if(wrappedProcedure != null) { if(wrappedProcedure instanceof Listener) { return graph.syncRequest(wrappedRequest, (Listener)wrappedProcedure); } else { return graph.syncRequest(wrappedRequest, wrappedProcedure); } } else { return graph.syncRequest(wrappedRequest); } } else { if (TRACE_TIMEOUT) System.out.println("[" + System.currentTimeMillis() + "] canceling execution of request " + wrappedRequest + ", startup timed out"); release(); return null; } } public boolean waitForRequestStart(long timeout) throws InterruptedException { long nt0 = System.nanoTime(); tryAcquire(timeout, TimeUnit.MILLISECONDS); if (TRACE_WAITED_TIME) System.out.println("[" + wrappedRequest + "] request start wait time " + ((double) (System.nanoTime() - nt0) / 1e6) + " ms"); if (state.compareAndSet(0, 2)) { if (TRACE_TIMEOUT) System.out.println("[" + wrappedRequest + "] starting of request timed out (>= " + timeout + " ms)"); return false; } return true; } } /** * @param processor * database session handle * @param requestStartTimeout * timeout in milliseconds for the request execution to start. If * request execution is not started within this period of time, * timeoutResulttimeoutResultrequestExecutionTimeout * will be returned. Must be ≥ 0. * @param requestExecutionTimeout * the maximum time to wait for the request execution to complete * from the time the request execution was started. If negative, * there is no timeout. * @param timeoutResult * the result * @param read * @param procedure * @return * @throws DatabaseException * @throws InterruptedException */ public static final R trySyncRequest( Session session, long requestStartTimeout, long requestExecutionTimeout, R timeoutResult, final Read read, final Procedure procedure) throws DatabaseException, InterruptedException { if (requestStartTimeout < 0) throw new IllegalArgumentException("request start timeout must be >= 0, got " + requestStartTimeout); if (TRACE_SCHEDULE) System.out.println("[" + read + "] scheduling timed request with timeouts " + requestStartTimeout + "/" + requestExecutionTimeout + " ms"); ResultProcedure proc = new ResultProcedure<>(); TimeoutingRequest req = new TimeoutingRequest<>(read, procedure); session.asyncRequest(req, proc); if (!req.waitForRequestStart(requestStartTimeout)) return timeoutResult; return proc.waitForResultOrException(requestExecutionTimeout, timeoutResult); } public static final R trySyncRequest( Session session, long requestStartTimeout, long requestExecutionTimeout, Read read, Procedure procedure) throws DatabaseException, InterruptedException { return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, procedure); } public static final R trySyncRequest( Session session, long requestStartTimeout, long requestExecutionTimeout, R timeoutResult, final Read read) throws DatabaseException, InterruptedException { return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, timeoutResult, read, null); } public static final R trySyncRequest( Session session, long requestStartTimeout, long requestExecutionTimeout, Read read) throws DatabaseException, InterruptedException { return trySyncRequest(session, requestStartTimeout, requestExecutionTimeout, null, read, null); } }