X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2FBlockingAsyncProcedure.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2FBlockingAsyncProcedure.java;h=c491fb377464e366f5b34e8e8eac2e0e205840c5;hp=2d3e2804c6ebf4d630984892195c6fe2e2e4b0fa;hb=90a52d58489b6c518b13dde238923125c8f63eec;hpb=bb61be97905f72c01fd99e21c263546c88edc5f7 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java index 2d3e2804c..c491fb377 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java @@ -12,64 +12,79 @@ package org.simantics.db.impl; import org.simantics.db.AsyncReadGraph; -import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; -import org.simantics.db.impl.graph.AsyncBarrierImpl; +import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.query.AsyncReadEntry; +import org.simantics.db.impl.query.PendingTaskSupport; import org.simantics.db.procedure.AsyncProcedure; public class BlockingAsyncProcedure implements AsyncProcedure { private static final Object NO_RESULT = new Object(); - private final Object key; - private final AsyncBarrierImpl barrier; - private final ReadGraphImpl procedureGraph; - private final AsyncProcedure procedure; + public final Object key; + public final ReadGraphImpl queryGraph; + public final ReadGraphImpl callerGraph; + public final AsyncProcedure procedure; + public PendingTaskSupport pendingTaskSupport; + public Object result = NO_RESULT; + public Throwable exception = null; - private Object result = NO_RESULT; - private Throwable exception = null; + private ReadGraphImpl queryGraph() { + return queryGraph; + } + + public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry entry, AsyncProcedure procedure, Object key, boolean needsToBlock) { + + // A new graph for evaluating the query with correct parent and asyncBarrier + queryGraph = callerGraph.withParent(entry, () -> { + + dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock); + + }); + + queryGraph.asyncBarrier.inc(); - public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure procedure, Object key) { this.procedure = procedure; this.key = key; - this.barrier = barrier; - this.barrier.inc(); - this.procedureGraph = procedureGraph; + this.queryGraph.asyncBarrier.inc(); + this.callerGraph = callerGraph; + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.registerBAP(this); + } } @Override public void execute(AsyncReadGraph graph_, Result result) { + this.result = result; - try { - if(procedure != null) procedure.execute(procedureGraph, result); - } catch (Throwable throwable) { - Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable); - } finally { - barrier.dec(); - } + queryGraph.asyncBarrier.dec(); + } @Override public void exception(AsyncReadGraph graph_, Throwable t) { + this.exception = t; - try { - if(procedure != null) procedure.exception(procedureGraph, t); - } catch (Throwable throwable) { - Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable); - } finally { - barrier.dec(); - } + queryGraph.asyncBarrier.dec(); + } - + public void waitBarrier() { - barrier.waitBarrier(key, procedureGraph); + queryGraph.asyncBarrier.waitBarrier(key, queryGraph); + } + + public void dec() { + + queryGraph.asyncBarrier.dec(); + } @SuppressWarnings("unchecked") public Result get() throws DatabaseException { - barrier.waitBarrier(key, procedureGraph); + queryGraph.asyncBarrier.waitBarrier(key, queryGraph); if(exception != null) { if(exception instanceof DatabaseException) throw (DatabaseException)exception; @@ -80,10 +95,6 @@ public class BlockingAsyncProcedure implements AsyncProcedure { } - public boolean isDone() { - return barrier.get() == 0; - } - @SuppressWarnings("unchecked") public Result getResult() { return (Result)result; @@ -97,5 +108,41 @@ public class BlockingAsyncProcedure implements AsyncProcedure { public String toString() { return "." + procedure; } + + private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry entry, AsyncProcedure procedure_, boolean needsToBlock) { + + AsyncProcedure procedure = entry != null ? entry : procedure_; + + ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent); + executeGraph.asyncBarrier.inc(); + try { + if(procedure != null) { + procedure.execute(executeGraph, get()); + } + } catch (DatabaseException e) { + if(procedure != null) procedure.exception(executeGraph, e); + exception = e; + } catch (Throwable t) { + DatabaseException dbe = new DatabaseException(t); + if(procedure != null) procedure.exception(executeGraph, dbe); + exception = dbe; + } finally { + + if (entry != null) { + assert(entry.isReady()); + // This does not throw + entry.performFromCache(executeGraph, procedure_); + } + + executeGraph.asyncBarrier.dec(); + if(needsToBlock) + executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); + } + + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.unregisterBAP(this); + } + + } }