From: Antti Villberg Date: Mon, 3 Sep 2018 07:03:29 +0000 (+0300) Subject: Trying to wait for procedures X-Git-Tag: v1.43.0~136^2~359^2 X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=commitdiff_plain;h=4c5ff8ec11947aae4d49106365819717cfa43209 Trying to wait for procedures gitlab #94 Change-Id: I42ad471de50553ed5be01dbc7f51452b1f9f6813 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java index 45facb864..5f7571ea8 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java @@ -14,6 +14,7 @@ package org.simantics.db.impl; import org.simantics.db.AsyncReadGraph; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; +import org.simantics.db.impl.graph.AsyncBarrierImpl; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.procedure.AsyncMultiProcedure; @@ -22,24 +23,26 @@ public class BlockingAsyncMultiProcedure implements AsyncMultiProcedure< private static final Object NO_RESULT = new Object(); private final Object key; - private final ReadGraphImpl graph; + private final AsyncBarrierImpl barrier; + private final ReadGraphImpl procedureGraph; private final AsyncMultiProcedure procedure; private Object result = NO_RESULT; private Throwable exception = null; - public BlockingAsyncMultiProcedure(ReadGraphImpl graph, AsyncMultiProcedure procedure, Object key) { + public BlockingAsyncMultiProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncMultiProcedure procedure, Object key) { this.procedure = procedure; this.key = key; - this.graph = graph; - this.graph.asyncBarrier.inc(); + this.barrier = barrier; + this.barrier.inc(); + this.procedureGraph = procedureGraph; } @Override public void execute(AsyncReadGraph graph, Result result) { this.result = result; try { - if(procedure != null) procedure.execute(graph, result); + if(procedure != null) procedure.execute(procedureGraph, result); } catch (Throwable throwable) { Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable); } @@ -47,11 +50,12 @@ public class BlockingAsyncMultiProcedure implements AsyncMultiProcedure< @Override public void finished(AsyncReadGraph graph) { - this.graph.asyncBarrier.dec(); try { - if(procedure != null) procedure.finished(graph); + if(procedure != null) procedure.finished(procedureGraph); } catch (Throwable throwable) { Logger.defaultLogError("AsyncProcedure.finish threw for " + procedure, throwable); + } finally { + barrier.dec(); } } @@ -59,23 +63,26 @@ public class BlockingAsyncMultiProcedure implements AsyncMultiProcedure< public void exception(AsyncReadGraph graph, Throwable t) { this.exception = t; try { - if (procedure != null) procedure.exception(graph, t); + if (procedure != null) procedure.exception(procedureGraph, t); } catch (Throwable throwable) { Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable); } finally { - this.graph.asyncBarrier.dec(); + barrier.dec(); } } @SuppressWarnings("unchecked") public Result get() throws DatabaseException { - graph.asyncBarrier.waitBarrier(key, graph); + + barrier.waitBarrier(key, procedureGraph); + if (exception != null) { if (exception instanceof DatabaseException) throw (DatabaseException) exception; throw new DatabaseException(exception); } else { return (Result) result; } + } @SuppressWarnings("unchecked") diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java index af0604ae6..b94ec6f68 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java @@ -11,10 +11,7 @@ *******************************************************************************/ package org.simantics.db.impl.graph; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.simantics.db.common.utils.Logger; @@ -26,122 +23,46 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie private static final long serialVersionUID = 4724463372850048672L; - static final HashMap> reverseLookup = new HashMap<>(); - public static final HashMap debuggerMap = new HashMap<>(); - static final HashMap> entryMap = new HashMap<>(); - static final HashMap restartMap = new HashMap<>(); - static final int WAIT_TIME = 600; - public static final boolean BOOKKEEPING = false; public static final boolean PRINT = false; - static final boolean RESTART_GUARD = true; final public AsyncBarrierImpl caller; - //private final Semaphore sema = new Semaphore(0); - public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) { super(0); - if (BOOKKEEPING) { - synchronized (entryMap) { - entryMap.put(this, entry); - } - synchronized (debuggerMap) { - debuggerMap.put(this, new Debugger()); - } - synchronized (reverseLookup) { - Collection barriers = reverseLookup - .get(caller); - if (barriers == null) { - barriers = new ArrayList(); - reverseLookup.put(caller, barriers); - } - barriers.add(this); - } - } this.caller = caller; + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.trace(this, entry); + } } - public class Debugger { - - public Map infos = new HashMap<>(); - - public synchronized void inc(AsyncBarrierImpl id, String info) { - if (id == null) - return; - String exist = infos.get(id); - if (exist != null) - throw new IllegalStateException("Already existing info " + id + " " + info); - infos.put(id, exist); - } - - public synchronized void dec(AsyncBarrierImpl id) { - if (id == null) - return; - String exist = infos.get(id); - if (exist == null) { - System.err.println("No data for " + id); - } else { - infos.remove(id); - } - } - - @Override - public synchronized String toString() { - StringBuilder b = new StringBuilder(); - for (String s : infos.values()) { - b.append("info " + s + "\r\n"); - } - return b.toString(); - } - - public boolean isEmpty() { - return infos.isEmpty(); - } - - } public void inc() { - if (BOOKKEEPING) - inc(this, new Exception().getStackTrace()[2].toString()); - else - inc(null, null); - - if (RESTART_GUARD) - if(restartMap.containsKey(this)) - throw new IllegalStateException("Unplanned restart"); - + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.inc(this); + } else { + inc(null, null); + } + } - private void inc(Object id, String info) { - - // if (PRINT) { - // if (get() < 5) - // new Exception("inc " + get() + " " + this).printStackTrace(); - // } - - if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(this); -// if (debugger != null) -// debugger.inc(id, info); - } + void inc(Object id, String info) { if(PRINT) { - System.err.println("inc barrier[" + get() + "] " + this); StackTraceElement[] elems = new Exception().getStackTrace(); for(int i=0;i<4;i++) System.err.println(elems[i]); - } if (incrementAndGet() == 1) { if (caller != null) { - if (BOOKKEEPING) - caller.inc(this, "Child"); - else - caller.inc(null, null); + if(BarrierTracing.BOOKKEEPING) { + caller.inc(this, "Child"); + } else { + caller.inc(null, null); + } } } @@ -157,10 +78,13 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie int count = decrementAndGet(); if (count < 1) { + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.dec(this, count); + } if (count == 0) { - if (caller != null) - caller.dec(this); - // sema.release(); + if (caller != null) { + caller.dec(); + } } if (count < 0) { Logger.defaultLogError( @@ -172,43 +96,8 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie } - public void dec(Object id) { - - if (PRINT) { - if (get() < 5) - new Exception("dec" + get() + " " + this).printStackTrace(); - } - - if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(this); -// if (debugger != null) { -// debugger.dec(id); -// if(debugger.isEmpty()) -// debuggerMap.remove(this); -// } - } - - int count = decrementAndGet(); - if (count < 1) { - if (count == 0) { - debuggerMap.remove(this); - if (caller != null) - caller.dec(this); - if (RESTART_GUARD) - restartMap.put(this, true); - } - if (count < 0) { - Logger.defaultLogError( - "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", - new Exception()); - System.exit(-1); - } - assert (count >= 0); - } - } - public static String report(AsyncBarrierImpl barrier) { - CacheEntry e = entryMap.get(barrier); + CacheEntry e = BarrierTracing.entryMap.get(barrier); if(e != null) return e.toString(); else return "Barrier@" + System.identityHashCode(barrier); } @@ -225,7 +114,7 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie // debugger.toErr(indent + 2); // } - Collection children = reverseLookup.get(barrier); + Collection children = BarrierTracing.reverseLookup.get(barrier); if (children != null) { for (AsyncBarrierImpl child : children) printReverse(child, indent + 2); @@ -260,12 +149,10 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie + ") is taking long to execute, so far " + (waitCount / 1000) + " s."); - if (BOOKKEEPING) { - - synchronized (reverseLookup) { + if (BarrierTracing.BOOKKEEPING) { + synchronized (BarrierTracing.reverseLookup) { printReverse(this, 0); } - } // if(Development.DEVELOPMENT) { @@ -292,12 +179,9 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie public void restart() { assertReady(); - // log.clear(); - // sema.drainPermits(); - if (RESTART_GUARD) - restartMap.remove(this); - if (BOOKKEEPING) - debuggerMap.put(this, new Debugger()); + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.restart(this); + } } public void assertReady() { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java new file mode 100644 index 000000000..3ff77f6bc --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java @@ -0,0 +1,123 @@ +package org.simantics.db.impl.graph; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.simantics.db.impl.query.CacheEntry; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; + +public class BarrierTracing { + + public static final boolean BOOKKEEPING = false; + static final boolean RESTART_GUARD = BOOKKEEPING && false; + + public static Map tasks = new HashMap<>(); + public static final HashMap> reverseLookup = new HashMap<>(); + public static final HashMap debuggerMap = new HashMap<>(); + public static final HashMap> entryMap = new HashMap<>(); + public static final HashMap restartMap = new HashMap<>(); + public static final HashMap startMap = new HashMap<>(); + + public static void trace(AsyncBarrierImpl barrier, CacheEntry entry) { + + if (RESTART_GUARD) { + synchronized (startMap) { + startMap.put(barrier, new Exception()); + } + } + synchronized (entryMap) { + entryMap.put(barrier, entry); + } + synchronized (debuggerMap) { + debuggerMap.put(barrier, new Debugger()); + } + synchronized (reverseLookup) { + Collection barriers = reverseLookup + .get(barrier.caller); + if (barriers == null) { + barriers = new ArrayList(); + reverseLookup.put(barrier.caller, barriers); + } + barriers.add(barrier); + } + + } + + public static void inc(AsyncBarrierImpl barrier) { + + barrier.inc(barrier, new Exception().getStackTrace()[2].toString()); + + if (RESTART_GUARD) + if(restartMap.containsKey(barrier)) { + startMap.get(barrier).printStackTrace(); + restartMap.get(barrier).printStackTrace(); + new Exception().printStackTrace(); + throw new IllegalStateException("Unplanned restart"); + } + + + } + + public static void restart(AsyncBarrierImpl barrier) { + if (RESTART_GUARD) + BarrierTracing.restartMap.remove(barrier); + if (BOOKKEEPING) + BarrierTracing.debuggerMap.put(barrier, new Debugger()); + } + + public static void dec(AsyncBarrierImpl barrier, int count) { + if (count == 0) { + if (RESTART_GUARD) { + restartMap.put(barrier, new Exception()); + } + debuggerMap.remove(barrier); + } + else if (count < 0) { + BarrierTracing.startMap.get(barrier).printStackTrace(); + BarrierTracing.restartMap.get(barrier).printStackTrace(); + new Exception().printStackTrace(); + } + } + + public static class Debugger { + + public Map infos = new HashMap<>(); + + public synchronized void inc(AsyncBarrierImpl id, String info) { + if (id == null) + return; + String exist = infos.get(id); + if (exist != null) + throw new IllegalStateException("Already existing info " + id + " " + info); + infos.put(id, exist); + } + + public synchronized void dec(AsyncBarrierImpl id) { + if (id == null) + return; + String exist = infos.get(id); + if (exist == null) { + System.err.println("No data for " + id); + } else { + infos.remove(id); + } + } + + @Override + public synchronized String toString() { + StringBuilder b = new StringBuilder(); + for (String s : infos.values()) { + b.append("info " + s + "\r\n"); + } + return b.toString(); + } + + public boolean isEmpty() { + return infos.isEmpty(); + } + + } + +} diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index 932e68c8f..77af46d55 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -5158,7 +5158,7 @@ public class ReadGraphImpl implements AsyncReadGraph { processor.schedule(new SessionTask(this) { @Override - public void run(int thread) { + public void run0(int thread) { try { final ListenerBase listener = getListenerBase(procedure); QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false); @@ -5238,7 +5238,7 @@ public class ReadGraphImpl implements AsyncReadGraph { processor.schedule(new SessionTask(this) { @Override - public void run(int thread) { + public void run0(int thread) { try { final ListenerBase listener = getListenerBase(procedure); QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java index 24f8e5375..bc3cf0a4f 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java @@ -95,7 +95,7 @@ final public class AsyncMultiReadEntry extends CacheEntryBase proc = new BlockingAsyncMultiProcedure<>(graph, new AsyncMultiProcedure() { + BlockingAsyncMultiProcedure proc = new BlockingAsyncMultiProcedure<>(graph.asyncBarrier, graph, new AsyncMultiProcedure() { @Override public void execute(AsyncReadGraph graph, T result) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java index f409b40c8..972bd381f 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java @@ -16,6 +16,7 @@ import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.graph.AsyncBarrierImpl; +import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncProcedure; @@ -152,12 +153,13 @@ final public class AsyncReadEntry extends CacheEntryBase> i AsyncProcedure procedure = entry != null ? entry : procedure_; ReadGraphImpl queryGraph = graph.withParent(entry); + queryGraph.asyncBarrier.inc(); BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request); class AsyncTask extends SessionTask { - int counter = 0; + int counter = 0; T result; DatabaseException exception; @@ -166,44 +168,58 @@ final public class AsyncReadEntry extends CacheEntryBase> i } @Override - public void run(int thread) { + public void run0(int thread) { if(needsToBlock) proc.waitBarrier(); if(proc.isDone()) { + ReadGraphImpl executeGraph = graph.withParent(graph.parent); + executeGraph.asyncBarrier.inc(); try { result = (T)proc.get(); - if(procedure != null) procedure.execute(graph, result); + if(procedure != null) { + procedure.execute(executeGraph, result); + } } catch (DatabaseException e) { - if(procedure != null) procedure.exception(graph, e); + if(procedure != null) procedure.exception(executeGraph, e); exception = e; } catch (Throwable t) { DatabaseException dbe = new DatabaseException(t); - if(procedure != null) procedure.exception(graph, dbe); + if(procedure != null) procedure.exception(executeGraph, dbe); exception = dbe; } finally { - if (entry != null) - entry.performFromCache(graph, procedure_); + if (entry != null) { + // This does not throw + entry.performFromCache(executeGraph, procedure_); + } + executeGraph.asyncBarrier.dec(); + executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); } } else { - if(counter++ > 10000) { - AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2); - AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller; - while(caller != null) { - System.err.println("called by " + AsyncBarrierImpl.report(caller)); - caller = caller.caller; - } - for(AsyncBarrierImpl ab : AsyncBarrierImpl.debuggerMap.keySet()) { - AsyncBarrierImpl.printReverse(ab, 2); - } - throw new IllegalStateException("Eternal loop in queries."); - } - graph.processor.schedule(this); + if(counter++ > 10000) { + if(BarrierTracing.BOOKKEEPING) { + AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2); + AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller; + while(caller != null) { + System.err.println("called by " + AsyncBarrierImpl.report(caller)); + caller = caller.caller; + } + for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) { + AsyncBarrierImpl.printReverse(ab, 2); + } + } + throw new IllegalStateException("Eternal loop in queries."); + } + graph.processor.schedule(new AsyncTask(graph)); } } } - - request.perform(queryGraph, proc); - + + try { + request.perform(queryGraph, proc); + } finally { + queryGraph.asyncBarrier.dec(); + } + AsyncTask task = new AsyncTask(graph); if(needsToBlock) task.run(0); @@ -212,7 +228,7 @@ final public class AsyncReadEntry extends CacheEntryBase> i graph.processor.schedule(task); return null; } - + if(task.exception != null) throw task.exception; else return task.result; diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 57c1c68c7..ee65998d3 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -502,7 +502,7 @@ public class QueryCache extends QueryCacheBase { if(entry == null) { graph.processor.schedule(new SessionTask(graph) { @Override - public void run(int thread) { + public void run0(int thread) { try { runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock); } catch (DatabaseException e) { @@ -562,7 +562,7 @@ public class QueryCache extends QueryCacheBase { if(entry == null) { graph.processor.schedule(new SessionTask(graph) { @Override - public void run(int thread) { + public void run0(int thread) { try { runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); } catch (DatabaseException e) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 65b277e6c..b2951cb27 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -51,6 +51,7 @@ import org.simantics.db.exception.NoInverseException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; import org.simantics.db.impl.graph.WriteGraphImpl; @@ -171,7 +172,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - SessionTask getOwnTask(ReadGraphImpl impl) { + public SessionTask getOwnTask(ReadGraphImpl impl) { Set ancestors = impl.ancestorSet(); synchronized(querySupportLock) { int index = 0; @@ -186,6 +187,21 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return null; } + public SessionTask getSubTask(ReadGraphImpl impl) { + Set onlyThis = Collections.singleton(impl); + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.hasCommonParent(onlyThis)) { + return freeScheduling.remove(index); + } + index++; + } + } + return null; + } + public boolean performPending(ReadGraphImpl graph) { SessionTask task = getOwnTask(graph); if(task != null) { @@ -201,7 +217,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // } final public void schedule(SessionTask request) { - + //int performer = request.thread; // if(DebugPolicy.SCHEDULE) @@ -218,24 +234,20 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // if(performer == THREADS) { synchronized(querySupportLock) { - - //new Exception().printStackTrace(); - + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + freeScheduling.add(request); querySupportLock.notifyAll(); - //System.err.println("schedule free task " + request + " => " + freeScheduling.size()); - -// for(int i=0;i ancestors; + private int counter = 0; + private Exception trace; public SessionTask(ReadGraphImpl graph) { this.graph = graph; + if(graph != null) graph.asyncBarrier.inc(); } public boolean hasCommonParent(Set otherAncestors) { @@ -276,7 +291,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return !Collections.disjoint(ancestors, otherAncestors); } - public abstract void run(int thread); + public abstract void run0(int thread); + + public final void run(int thread) { + if(counter++ > 0) { + if(BarrierTracing.BOOKKEEPING) { + trace.printStackTrace(); + new Exception().printStackTrace(); + } + throw new IllegalStateException("Multiple invocations of SessionTask!"); + } + if(BarrierTracing.BOOKKEEPING) { + trace = new Exception(); + } + run0(thread); + if(graph != null) graph.asyncBarrier.dec(); + } @Override public String toString() { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java index 61e934fe9..1ceca45a5 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java @@ -101,29 +101,49 @@ public final class ReadEntry extends CacheEntryBase> implem AsyncProcedure procedure = entry != null ? entry : procedure_; ReadGraphImpl queryGraph = graph.withParent(entry); + queryGraph.asyncBarrier.inc(); + ReadGraphImpl executeGraph = graph.withParent(graph.parent); + executeGraph.asyncBarrier.inc(); + try { + // This throws T result = request.perform(queryGraph); - if(procedure != null) procedure.execute(graph, result); + + if(procedure != null) procedure.execute(executeGraph, result); return (T)result; } catch (DatabaseException e) { - if(procedure != null) procedure.exception(graph, e); + if(procedure != null) procedure.exception(executeGraph, e); throw e; } catch (Throwable t) { DatabaseException dbe = new DatabaseException(t); - if(procedure != null) procedure.exception(graph, dbe); + if(procedure != null) procedure.exception(executeGraph, dbe); throw dbe; } finally { - if (entry != null) - entry.performFromCache(queryGraph, procedure_); + queryGraph.asyncBarrier.dec(); + + try { + + if (entry != null) { + // This also throws so must dec barrier finally + entry.performFromCache(executeGraph, procedure_); + } + + } finally { + + executeGraph.asyncBarrier.dec(); + executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); + } + + } } diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 62b7e3825..9b152f2da 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -417,7 +417,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -553,7 +553,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -637,7 +637,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @@ -1374,7 +1374,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1472,7 +1472,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1496,7 +1496,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1504,6 +1504,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { @@ -1614,7 +1617,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1676,7 +1679,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1732,7 +1735,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1786,7 +1789,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java index c11242163..a93476355 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -121,7 +121,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { try { transactionState.startReadTransaction(thread); task.run(thread); @@ -145,7 +145,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { session.fireFinishReadTransaction(); @@ -166,7 +166,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { try { transactionState.startWriteTransaction(thread); @@ -187,7 +187,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { // Support for DelayedWriteRequest cancels during the // read-only part of the request. @@ -274,7 +274,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { read.run(thread); if(read.notify != null) read.notify.release(); } @@ -315,7 +315,7 @@ public class SessionRequestManager { session.queryProvider2.schedule(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { try { task.run(thread); } finally { diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java index ba0ce846b..f2061cd42 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java @@ -46,7 +46,7 @@ public class UndoRedoSupportImpl implements UndoRedoSupport { final TaskHelper th = new TaskHelper("Undo"); session.requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { session.flushCounter = 0; session.clusterStream.reallyFlush(); ClientChangesImpl cs = new ClientChangesImpl(session); diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java index 2b72b3e0f..e2c1521ee 100644 --- a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java +++ b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java @@ -49,6 +49,7 @@ import org.simantics.db.common.procedure.adapter.AsyncProcedureAdapter; import org.simantics.db.common.procedure.adapter.CacheListener; import org.simantics.db.common.procedure.adapter.ListenerSupport; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; import org.simantics.db.common.request.AsyncReadRequest; import org.simantics.db.common.request.ReadRequest; import org.simantics.db.common.session.SessionEventListenerAdapter; @@ -1301,7 +1302,7 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID task4.finish(); // ITask task5 = ThreadLogger.getInstance().begin("DiagramContentRequest2"); ITask task42 = ThreadLogger.getInstance().begin("DiagramContentRequest2"); - DiagramContents contents = g.syncRequest(query); + DiagramContents contents = g.syncRequest(query, TransientCacheAsyncListener.instance()); //System.err.println("contents: " + contents); task42.finish(); // task5.finish(); @@ -1609,11 +1610,11 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID } public void clear() { - // Prevent DiagramContents leakage through DisposableListeners. - lastContent = null; - content = null; - changes = null; - + // Prevent DiagramContents leakage through DisposableListeners. + lastContent = null; + content = null; + changes = null; + this.addedElements.clear(); this.removedElements.clear(); this.addedConnectionSegments.clear(); @@ -1654,10 +1655,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID // TODO: Connection loading has no listening, changes :Connection will not be noticed by this code! Listener loadListener = new DisposableListener(canvasListenerSupport) { - @Override - public String toString() { - return "Connection load listener for " + element; - } + + boolean firstTime = true; + + @Override + public String toString() { + return "Connection load listener for " + element; + } @Override public void execute(IElement loaded) { // Invoked when the element has been loaded. @@ -1669,6 +1673,19 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return; } + if (firstTime) { + + mapElement(element, loaded); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(loaded); + addedElementMap.put(element, loaded); + addedConnectionMap.put(element, loaded); + } + + firstTime = false; + + } + Object data = loaded.getHint(ElementHints.KEY_OBJECT); // Logic for disposing listener @@ -1708,16 +1725,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID graph.syncRequest(new ConnectionRequest(canvas, diagram, element, errorHandler, loadListener), new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, final IElement e) { - if (e == null) - return; - - //System.out.println("ConnectionRequestProcedure " + e); - mapElement(element, e); - synchronized (GraphToDiagramUpdater.this) { - addedElements.add(e); - addedElementMap.put(element, e); - addedConnectionMap.put(element, e); - } // Read connection type graph.forSingleType(element, br.DIA.Connection, new Procedure() { @@ -1736,6 +1743,7 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID } } }); + } @Override @@ -1746,10 +1754,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID } else if (content.nodeSet.contains(element)) { Listener loadListener = new DisposableListener(canvasListenerSupport) { - @Override - public String toString() { - return "Node load listener for " + element; - } + + boolean firstTime = true; + + @Override + public String toString() { + return "Node load listener for " + element; + } @Override public void execute(IElement loaded) { // Invoked when the element has been loaded. @@ -1761,6 +1772,22 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return; } + if (firstTime) { + + // This is invoked before the element is actually loaded. + //System.out.println("NodeRequestProcedure " + e); + if (DebugPolicy.DEBUG_NODE_LOAD) + System.out.println("MAPPING ADDED NODE: " + element + " -> " + loaded); + mapElement(element, loaded); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(loaded); + addedElementMap.put(element, loaded); + } + + firstTime = false; + + } + Object data = loaded.getHint(ElementHints.KEY_OBJECT); // Logic for disposing listener @@ -1801,18 +1828,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID graph.syncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, IElement e) { - if (e == null) - return; - - // This is invoked before the element is actually loaded. - //System.out.println("NodeRequestProcedure " + e); - if (DebugPolicy.DEBUG_NODE_LOAD) - System.out.println("MAPPING ADDED NODE: " + element + " -> " + e); - mapElement(element, e); - synchronized (GraphToDiagramUpdater.this) { - addedElements.add(e); - addedElementMap.put(element, e); - } } @Override @@ -1953,10 +1968,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID continue; Listener loadListener = new DisposableListener(canvasListenerSupport) { - @Override - public String toString() { - return "processRouteGraphConnections " + connection; - } + + boolean firstTime = true; + + @Override + public String toString() { + return "processRouteGraphConnections " + connection; + } @Override public void execute(IElement loaded) { // Invoked when the element has been loaded. @@ -1968,6 +1986,18 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return; } + if(firstTime) { + if (DebugPolicy.DEBUG_NODE_LOAD) + System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + loaded); + mapElement(connection, loaded); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(loaded); + addedElementMap.put(connection, loaded); + addedRouteGraphConnectionMap.put(connection, loaded); + } + firstTime = false; + } + Object data = loaded.getHint(ElementHints.KEY_OBJECT); // Logic for disposing listener @@ -2022,18 +2052,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID graph.syncRequest(new ConnectionRequest(canvas, diagram, connection, errorHandler, loadListener), new Procedure() { @Override public void execute(final IElement e) { - if (e == null) - return; - - //System.out.println("ConnectionRequestProcedure " + e); - if (DebugPolicy.DEBUG_NODE_LOAD) - System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + e); - mapElement(connection, e); - synchronized (GraphToDiagramUpdater.this) { - addedElements.add(e); - addedElementMap.put(connection, e); - addedRouteGraphConnectionMap.put(connection, e); - } } @Override public void exception(Throwable throwable) { @@ -2083,10 +2101,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID }); Listener loadListener = new DisposableListener(canvasListenerSupport) { - @Override - public String toString() { - return "processBranchPoints for " + element; - } + + boolean firstTime = true; + + @Override + public String toString() { + return "processBranchPoints for " + element; + } @Override public void execute(IElement loaded) { // Invoked when the element has been loaded. @@ -2098,6 +2119,21 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return; } + if (firstTime) { + + mapElement(element, loaded); + synchronized (GraphToDiagramUpdater.this) { + addedBranchPoints.add(loaded); + addedElementMap.put(element, loaded); + ConnectionEntityImpl ce = getConnectionEntity(element); + loaded.setHint(ElementHints.KEY_CONNECTION_ENTITY, ce); + loaded.setHint(ElementHints.KEY_PARENT_ELEMENT, ce.getConnectionElement()); + } + + firstTime = false; + + } + Object data = loaded.getHint(ElementHints.KEY_OBJECT); if (addedElementMap.containsKey(data)) { // This element was just loaded, in @@ -2128,16 +2164,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID graph.syncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, IElement e) { - if (e != null) { - mapElement(element, e); - synchronized (GraphToDiagramUpdater.this) { - addedBranchPoints.add(e); - addedElementMap.put(element, e); - ConnectionEntityImpl ce = getConnectionEntity(element); - e.setHint(ElementHints.KEY_CONNECTION_ENTITY, ce); - e.setHint(ElementHints.KEY_PARENT_ELEMENT, ce.getConnectionElement()); - } - } } @Override @@ -2385,12 +2411,12 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return "defaultConnectionSegmentAdapter"; } }, new DisposableListener(listenerSupport) { - - @Override - public String toString() { - return "DefaultConnectionSegmentAdapter listener for " + edge; - } - + + @Override + public String toString() { + return "DefaultConnectionSegmentAdapter listener for " + edge; + } + @Override public void execute(IElement loaded) { // Invoked when the element has been loaded. @@ -2667,9 +2693,9 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID GraphToDiagramUpdater updater = new GraphToDiagramUpdater(lastContent, content, changes); GraphToDiagramSynchronizer.this.currentUpdater = updater; try { - updater.process(graph); + updater.process(graph); } finally { - GraphToDiagramSynchronizer.this.currentUpdater = null; + GraphToDiagramSynchronizer.this.currentUpdater = null; } Timing.END(task);