From: Antti Villberg Date: Sun, 2 Feb 2020 17:46:02 +0000 (+0200) Subject: DB request scheduling scheme fails with district diagrams X-Git-Tag: v1.43.0~109^2~1 X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F25%2F3825%2F1;p=simantics%2Fplatform.git DB request scheduling scheme fails with district diagrams gitlab #457 Change-Id: I3f8b3504cd4b26f670fd45c79ab4e192fb0091d8 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java index c491fb377..ecb66ff91 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java @@ -28,6 +28,7 @@ public class BlockingAsyncProcedure implements AsyncProcedure { public final ReadGraphImpl callerGraph; public final AsyncProcedure procedure; public PendingTaskSupport pendingTaskSupport; + public final boolean needsToBlock; public Object result = NO_RESULT; public Throwable exception = null; @@ -36,13 +37,13 @@ public class BlockingAsyncProcedure implements AsyncProcedure { } public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry entry, AsyncProcedure procedure, Object key, boolean needsToBlock) { - + // A new graph for evaluating the query with correct parent and asyncBarrier queryGraph = callerGraph.withParent(entry, () -> { dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock); - }); + }, needsToBlock); queryGraph.asyncBarrier.inc(); @@ -50,6 +51,7 @@ public class BlockingAsyncProcedure implements AsyncProcedure { this.key = key; this.queryGraph.asyncBarrier.inc(); this.callerGraph = callerGraph; + this.needsToBlock = needsToBlock; if (BarrierTracing.BOOKKEEPING) { BarrierTracing.registerBAP(this); } @@ -84,7 +86,8 @@ public class BlockingAsyncProcedure implements AsyncProcedure { @SuppressWarnings("unchecked") public Result get() throws DatabaseException { - queryGraph.asyncBarrier.waitBarrier(key, queryGraph); + if(needsToBlock) + queryGraph.asyncBarrier.waitBarrier(key, queryGraph); if(exception != null) { if(exception instanceof DatabaseException) throw (DatabaseException)exception; @@ -113,7 +116,7 @@ public class BlockingAsyncProcedure implements AsyncProcedure { AsyncProcedure procedure = entry != null ? entry : procedure_; - ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent); + ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent, null, needsToBlock); executeGraph.asyncBarrier.inc(); try { if(procedure != null) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java index 060fc22ae..637db3f26 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java @@ -18,209 +18,206 @@ import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.query.CacheEntry; import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; -import org.simantics.db.impl.query.QueryProcessor.SessionTask; -final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { +public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { - private static final long serialVersionUID = 4724463372850048672L; + private static final long serialVersionUID = 4724463372850048672L; - static final int WAIT_TIME = 60000; + static final int WAIT_TIME = 60000; - public static final boolean PRINT = false; + public static final boolean PRINT = false; - final public AsyncBarrierImpl caller; - - final public Runnable callback; + final AsyncBarrierImpl caller; - public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry, Runnable callback) { - super(0); - this.caller = caller; - this.callback = callback; + private final Runnable callback; + + private final boolean needsToBlock; + + public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry, Runnable callback, boolean needsToBlock) { + super(0); + this.caller = caller; + this.callback = callback; + this.needsToBlock = needsToBlock; if (BarrierTracing.BOOKKEEPING) { BarrierTracing.trace(this, entry); } - } - - - public void inc() { + } + + public AsyncBarrier getBlockingBarrier() { + if(needsToBlock) + return this; + if(caller == null) + return null; + else return caller.getBlockingBarrier(); + } + + @Override + public boolean isBlocking() { + return needsToBlock; + } + + @Override + public void inc() { + + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.inc(this); + } else { + inc(null, null); + } - if(BarrierTracing.BOOKKEEPING) { - BarrierTracing.inc(this); - } else { - inc(null, null); - } - - } + } - void inc(Object id, String info) { + void inc(Object id, String info) { - if(PRINT) { - System.err.println("inc barrier[" + get() + "] " + this); - StackTraceElement[] elems = new Exception().getStackTrace(); - for(int i=0;i<4;i++) System.err.println(elems[i]); - } + if(PRINT) { + System.err.println("inc barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<4;i++) System.err.println(elems[i]); + } - if (incrementAndGet() == 1) { - if (caller != null) { - if(BarrierTracing.BOOKKEEPING) { + if (incrementAndGet() == 1) { + if (caller != null) { + if(BarrierTracing.BOOKKEEPING) { caller.inc(this, "Child"); - } else { + } else { caller.inc(null, null); - } - } - } + } + } + } - } + } - public void dec() { + @Override + public void dec() { - if(PRINT) { - System.err.println("dec barrier[" + get() + "] " + this); - StackTraceElement[] elems = new Exception().getStackTrace(); - for(int i=0;i<3;i++) System.err.println(elems[i]); - } + if(PRINT) { + System.err.println("dec barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<3;i++) System.err.println(elems[i]); + } - int count = decrementAndGet(); - if (count < 1) { + int count = decrementAndGet(); + if (count < 1) { if(BarrierTracing.BOOKKEEPING) { BarrierTracing.dec(this, count); } - if (count == 0) { - if (caller != null) { - caller.dec(); - } - } - if (count < 0) { - Logger.defaultLogError( - "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", - new Exception()); - } - assert (count >= 0); - - if(callback != null) - callback.run(); - - } - - } - - public static String report(AsyncBarrierImpl barrier) { - CacheEntry e = BarrierTracing.entryMap.get(barrier); - if(e != null) return e.toString(); - else return "Barrier@" + System.identityHashCode(barrier); - } - - public static void printReverse(AsyncBarrierImpl barrier, int indent) { - - if (barrier.get() == 0) - return; - for (int i = 0; i < indent; i++) - System.err.print(" "); - System.err.println("[" + barrier.get() + " requests]: " + report(barrier)); -// if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(barrier); -// debugger.toErr(indent + 2); -// } - - Collection children = BarrierTracing.reverseLookup.get(barrier); - if (children != null) { - for (AsyncBarrierImpl child : children) - printReverse(child, indent + 2); - } - - } - - public void waitBarrier(Object request, ReadGraphImpl impl) { - - if (get() > 0) { - - long waitCount = 0; - - while (get() != 0) { - - boolean executed = impl.performPending(); - if(executed) waitCount = 0; - - ++waitCount; - if(waitCount > 100) Thread.yield(); - if(waitCount > 1000) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if(waitCount > WAIT_TIME) { - - System.err.println("AsyncBarrierImpl.waitBarrier(" - + request - + ") is taking long to execute, so far " - + (waitCount / 1000) + " s."); - - if (BarrierTracing.BOOKKEEPING) { - synchronized (BarrierTracing.reverseLookup) { - printReverse(this, 0); - } - BarrierTracing.printBAPS(); - } - - for(SessionTask t : impl.processor.freeScheduling) { - System.err.println("Pending task:" + t); - } - -// if(Development.DEVELOPMENT) { -// impl.processor.threadLocks[0].lock(); -// System.err.println("-queues=" + impl.processor.queues[0].size()); -// impl.processor.threadLocks[0].unlock(); -// System.err.println("-own=" + impl.processor.ownTasks[0].size()); -// System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size()); -// for(SessionTask task : impl.processor.ownSyncTasks[0]) { -// System.err.println("--" + task); -// } -// } - - throw new RuntimeDatabaseException("Request timed out."); - //waitCount = 0; - - } - - } - - } - - } - - public void restart() { - assertReady(); - if(BarrierTracing.BOOKKEEPING) { - BarrierTracing.restart(this); - } - } - - public void assertReady() { - int current = get(); - if (current != 0) - throw new AssertionError("Barrier was not finished (pending=" - + current + ")."); - } - - public void report() { - // System.out.println("Barrier log:"); - // for(Map.Entry entry : sources.entrySet()) { - // System.out.println(entry.getKey() + " " + entry.getValue()); - // } - // System.out.println("SyncIntProcedure log:"); - // for(Map.Entry entry : - // SyncIntProcedure.counters.entrySet()) { - // System.out.println(entry.getKey() + " " + entry.getValue()); - // } - } - - @Override - public String toString() { - return report(this); -// return "AsyncBarrierImpl@" + System.identityHashCode(this) -// + " - counter = " + get() + " - caller = " + caller; - } + if (count == 0) { + if (caller != null) { + caller.dec(); + } + } + if (count < 0) { + Logger.defaultLogError( + "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", + new Exception()); + } + assert (count >= 0); + + if(callback != null) + callback.run(); + + } + + } + + public static String report(AsyncBarrierImpl barrier) { + CacheEntry e = BarrierTracing.entryMap.get(barrier); + if(e != null) return e.toString(); + else return "Barrier@" + System.identityHashCode(barrier); + } + + public static void printReverse(AsyncBarrierImpl barrier, int indent) { + + if (barrier.get() == 0) + return; + for (int i = 0; i < indent; i++) + System.err.print(" "); + System.err.println("[" + barrier.get() + " requests]: " + report(barrier)); + + Collection children = BarrierTracing.reverseLookup.get(barrier); + if (children != null) { + for (AsyncBarrierImpl child : children) + printReverse(child, indent + 2); + } + + } + + public void waitBarrier(Object request, ReadGraphImpl impl) { + + if (get() > 0) { + + long waitCount = 0; + + while (get() != 0) { + + boolean executed = impl.performPending(); + if(executed) waitCount = 0; + + ++waitCount; + if(waitCount > 100) Thread.yield(); + if(waitCount > 1000) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if(waitCount > WAIT_TIME) { + + System.err.println("AsyncBarrierImpl.waitBarrier(" + + request + + ") is taking long to execute, so far " + + (waitCount / 1000) + " s."); + + if (BarrierTracing.BOOKKEEPING) { + synchronized (BarrierTracing.reverseLookup) { + printReverse(this, 0); + } + BarrierTracing.printBAPS(); + } + + throw new RuntimeDatabaseException("Request timed out."); + + } + + } + + } + + } + + public void restart() { + assertReady(); + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.restart(this); + } + } + + public void assertReady() { + int current = get(); + if (current != 0) + throw new AssertionError("Barrier was not finished (pending=" + + current + ")."); + } + + public void report() { + // System.out.println("Barrier log:"); + // for(Map.Entry entry : sources.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + // System.out.println("SyncIntProcedure log:"); + // for(Map.Entry entry : + // SyncIntProcedure.counters.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + } + + @Override + public String toString() { + return report(this); + // return "AsyncBarrierImpl@" + System.identityHashCode(this) + // + " - counter = " + get() + " - caller = " + caller; + } + } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index 2eced2b7c..dfde4ec12 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -5662,7 +5662,7 @@ public class ReadGraphImpl implements AsyncReadGraph { this.parentGraph = parentGraph; this.parent = parent; this.processor = support; - this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null); + this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false); } ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) { @@ -5676,21 +5676,25 @@ public class ReadGraphImpl implements AsyncReadGraph { this(graph, parent, graph.processor); } - ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) { - this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback)); + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) { + this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock)); } + static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) { + return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock); + } + ReadGraphImpl(ReadGraphImpl graph) { this(graph, graph.parent); } - public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) { - return new ReadGraphImpl(this, parent, callback); + public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) { + return new ReadGraphImpl(this, parent, callback, needsToBlock); } - public ReadGraphImpl withParent(CacheEntry parent) { - return withParent(parent, null); - } +// public ReadGraphImpl withParent(CacheEntry parent) { +// return withParent(parent, null); +// } public ReadGraphImpl syncWithParent(CacheEntry parent) { return new ReadGraphImpl(this, parent, processor, null); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java index 556d5fc3e..d04404339 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java @@ -180,7 +180,7 @@ final public class ExternalReadEntry extends CacheEntryBase try { - ReadGraphImpl queryGraph = graph.withParent(this); + ReadGraphImpl queryGraph = graph.withParent(this, null, true); if(!registered) { id.register(graph, this); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index deeb65fdd..f80fb5098 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -106,7 +106,7 @@ public class QueryCacheBase { public Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException { - ReadGraphImpl queryGraph = parentGraph.withParent(entry_); + ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, false); AsyncMultiReadEntry entry = (AsyncMultiReadEntry)entry_; AsyncMultiProcedure procedure = (AsyncMultiProcedure)procedure_; @@ -169,7 +169,7 @@ public class QueryCacheBase { public Object performQuery(ReadGraphImpl parentGraph, final MultiRead query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException { - ReadGraphImpl queryGraph = parentGraph.withParent(entry_); + ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, true); MultiReadEntry entry = (MultiReadEntry)entry_; SyncMultiProcedure procedure = (SyncMultiProcedure)procedure_; diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index eaaa9b373..a46bc0e8d 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -141,10 +141,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - - public LinkedList freeScheduling = new LinkedList(); - - public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -152,6 +148,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } + final Scheduling scheduling; + public ThreadState[] threadStates; final Object querySupportLock; @@ -161,26 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl parent) { - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.isSubtask(parent) && task.maybeReady()) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } /* * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ public boolean performPending(ReadGraphImpl under) { - SessionTask task = getSubTask(under); + SessionTask task = scheduling.getSubTask(under); if(task != null) { task.run(thread.get()); return true; @@ -195,28 +180,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - - assert(request != null); - - synchronized(querySupportLock) { - - LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - queue.addFirst(request); - requests.release(); - - } - - return null; + + return scheduling.scheduleOrReturnForExecution(request); } @@ -298,6 +263,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -1888,12 +1855,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support; diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index cfa088a2d..7aa89b3f0 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -98,17 +98,7 @@ class QueryThread extends Thread implements SessionThread { } private boolean pumpTask() { - // First finish existing executions - if(!processor.freeScheduling.isEmpty()) { - tasks.add(processor.freeScheduling.removeFirst()); - return true; - } - // Check for new tasks - if(!processor.topLevelTasks.isEmpty()) { - tasks.add(processor.topLevelTasks.removeFirst()); - return true; - } - return false; + return processor.scheduling.pumpTask(tasks); } ArrayList newTasks(boolean doWait, ArrayList tasks) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java index 9edcbb997..8f547d22b 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java @@ -100,10 +100,10 @@ public final class ReadEntry extends CacheEntryBase> implem AsyncProcedure procedure = entry != null ? entry : procedure_; - ReadGraphImpl queryGraph = graph.withParent(entry); + ReadGraphImpl queryGraph = graph.withParent(entry, null, needsToBlock); queryGraph.asyncBarrier.inc(); - ReadGraphImpl executeGraph = graph.withParent(graph.parent); + ReadGraphImpl executeGraph = graph.withParent(graph.parent, null, needsToBlock); executeGraph.asyncBarrier.inc(); try { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java new file mode 100644 index 000000000..061435f25 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduling.java @@ -0,0 +1,102 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.Semaphore; + +import org.simantics.db.impl.graph.BarrierTracing; +import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; + +public class Scheduling { + + private final Semaphore requests; + + private Map> freeScheduling = new HashMap<>(); + + private LinkedList topLevelTasks = new LinkedList(); + + public Scheduling(Semaphore requests) { + this.requests = requests; + } + + public SessionTask getSubTask(ReadGraphImpl parent) { + synchronized(this) { + assert(parent.asyncBarrier.isBlocking()); + LinkedList tasks = freeScheduling.get(parent.asyncBarrier); + if(tasks == null) + return null; + SessionTask task = tasks.removeLast(); + if(tasks.isEmpty()) + freeScheduling.remove(parent.asyncBarrier); + return task; + } + } + + public boolean pumpTask(ArrayList tasks) { + + synchronized(this) { + + // First finish existing executions + if(!freeScheduling.isEmpty()) { + Map.Entry> ls = freeScheduling.entrySet().iterator().next(); + assert(ls.getKey().isBlocking()); + tasks.add(ls.getValue().removeLast()); + if(ls.getValue().isEmpty()) + freeScheduling.remove(ls.getKey()); + return true; + } + // Check for new tasks + if(!topLevelTasks.isEmpty()) { + tasks.add(topLevelTasks.removeLast()); + return true; + } + + return false; + + } + + } + + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { + + assert(request != null); + + synchronized(this) { + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + + requests.release(); + + if(request.rootGraph != null) { + AsyncBarrier sb = request.rootGraph.asyncBarrier.getBlockingBarrier(); + if(sb != null) { + LinkedList ls = freeScheduling.get(sb); + if(ls == null) { + ls = new LinkedList(); + freeScheduling.put(sb, ls); + } + ls.addFirst(request); + return null; + } + } + + topLevelTasks.addFirst(request); + + } + + return null; + + } + +} diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/QueryControlImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/QueryControlImpl.java index 7165abef3..237e8b50b 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/QueryControlImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/QueryControlImpl.java @@ -139,7 +139,7 @@ public class QueryControlImpl implements QueryControl { @Override public ReadGraph getIndependentGraph(ReadGraph graph) { ReadGraphImpl impl = (ReadGraphImpl)graph; - return impl.withParent(null); + return impl.withParent(null, null, false); } @Override