public final ReadGraphImpl callerGraph;
public final AsyncProcedure<Result> procedure;
public PendingTaskSupport pendingTaskSupport;
+ public final boolean needsToBlock;
public Object result = NO_RESULT;
public Throwable exception = null;
}
public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure, Object key, boolean needsToBlock) {
-
+
// A new graph for evaluating the query with correct parent and asyncBarrier
queryGraph = callerGraph.withParent(entry, () -> {
dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock);
- });
+ }, needsToBlock);
queryGraph.asyncBarrier.inc();
this.key = key;
this.queryGraph.asyncBarrier.inc();
this.callerGraph = callerGraph;
+ this.needsToBlock = needsToBlock;
if (BarrierTracing.BOOKKEEPING) {
BarrierTracing.registerBAP(this);
}
@SuppressWarnings("unchecked")
public Result get() throws DatabaseException {
- queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
+ if(needsToBlock)
+ queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
if(exception != null) {
if(exception instanceof DatabaseException) throw (DatabaseException)exception;
AsyncProcedure<Result> procedure = entry != null ? entry : procedure_;
- ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent);
+ ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent, null, needsToBlock);
executeGraph.asyncBarrier.inc();
try {
if(procedure != null) {
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.query.CacheEntry;
import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;
-final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
+public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
- private static final long serialVersionUID = 4724463372850048672L;
+ private static final long serialVersionUID = 4724463372850048672L;
- static final int WAIT_TIME = 60000;
+ static final int WAIT_TIME = 60000;
- public static final boolean PRINT = false;
+ public static final boolean PRINT = false;
- final public AsyncBarrierImpl caller;
-
- final public Runnable callback;
+ final AsyncBarrierImpl caller;
- public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback) {
- super(0);
- this.caller = caller;
- this.callback = callback;
+ private final Runnable callback;
+
+ private final boolean needsToBlock;
+
+ public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
+ super(0);
+ this.caller = caller;
+ this.callback = callback;
+ this.needsToBlock = needsToBlock;
if (BarrierTracing.BOOKKEEPING) {
BarrierTracing.trace(this, entry);
}
- }
-
-
- public void inc() {
+ }
+
+ public AsyncBarrier getBlockingBarrier() {
+ if(needsToBlock)
+ return this;
+ if(caller == null)
+ return null;
+ else return caller.getBlockingBarrier();
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return needsToBlock;
+ }
+
+ @Override
+ public void inc() {
+
+ if(BarrierTracing.BOOKKEEPING) {
+ BarrierTracing.inc(this);
+ } else {
+ inc(null, null);
+ }
- if(BarrierTracing.BOOKKEEPING) {
- BarrierTracing.inc(this);
- } else {
- inc(null, null);
- }
-
- }
+ }
- void inc(Object id, String info) {
+ void inc(Object id, String info) {
- if(PRINT) {
- System.err.println("inc barrier[" + get() + "] " + this);
- StackTraceElement[] elems = new Exception().getStackTrace();
- for(int i=0;i<4;i++) System.err.println(elems[i]);
- }
+ if(PRINT) {
+ System.err.println("inc barrier[" + get() + "] " + this);
+ StackTraceElement[] elems = new Exception().getStackTrace();
+ for(int i=0;i<4;i++) System.err.println(elems[i]);
+ }
- if (incrementAndGet() == 1) {
- if (caller != null) {
- if(BarrierTracing.BOOKKEEPING) {
+ if (incrementAndGet() == 1) {
+ if (caller != null) {
+ if(BarrierTracing.BOOKKEEPING) {
caller.inc(this, "Child");
- } else {
+ } else {
caller.inc(null, null);
- }
- }
- }
+ }
+ }
+ }
- }
+ }
- public void dec() {
+ @Override
+ public void dec() {
- if(PRINT) {
- System.err.println("dec barrier[" + get() + "] " + this);
- StackTraceElement[] elems = new Exception().getStackTrace();
- for(int i=0;i<3;i++) System.err.println(elems[i]);
- }
+ if(PRINT) {
+ System.err.println("dec barrier[" + get() + "] " + this);
+ StackTraceElement[] elems = new Exception().getStackTrace();
+ for(int i=0;i<3;i++) System.err.println(elems[i]);
+ }
- int count = decrementAndGet();
- if (count < 1) {
+ int count = decrementAndGet();
+ if (count < 1) {
if(BarrierTracing.BOOKKEEPING) {
BarrierTracing.dec(this, count);
}
- if (count == 0) {
- if (caller != null) {
- caller.dec();
- }
- }
- if (count < 0) {
- Logger.defaultLogError(
- "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
- new Exception());
- }
- assert (count >= 0);
-
- if(callback != null)
- callback.run();
-
- }
-
- }
-
- public static String report(AsyncBarrierImpl barrier) {
- CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
- if(e != null) return e.toString();
- else return "Barrier@" + System.identityHashCode(barrier);
- }
-
- public static void printReverse(AsyncBarrierImpl barrier, int indent) {
-
- if (barrier.get() == 0)
- return;
- for (int i = 0; i < indent; i++)
- System.err.print(" ");
- System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
-// if (BOOKKEEPING) {
-// Debugger debugger = debuggerMap.get(barrier);
-// debugger.toErr(indent + 2);
-// }
-
- Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
- if (children != null) {
- for (AsyncBarrierImpl child : children)
- printReverse(child, indent + 2);
- }
-
- }
-
- public void waitBarrier(Object request, ReadGraphImpl impl) {
-
- if (get() > 0) {
-
- long waitCount = 0;
-
- while (get() != 0) {
-
- boolean executed = impl.performPending();
- if(executed) waitCount = 0;
-
- ++waitCount;
- if(waitCount > 100) Thread.yield();
- if(waitCount > 1000) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if(waitCount > WAIT_TIME) {
-
- System.err.println("AsyncBarrierImpl.waitBarrier("
- + request
- + ") is taking long to execute, so far "
- + (waitCount / 1000) + " s.");
-
- if (BarrierTracing.BOOKKEEPING) {
- synchronized (BarrierTracing.reverseLookup) {
- printReverse(this, 0);
- }
- BarrierTracing.printBAPS();
- }
-
- for(SessionTask t : impl.processor.freeScheduling) {
- System.err.println("Pending task:" + t);
- }
-
-// if(Development.DEVELOPMENT) {
-// impl.processor.threadLocks[0].lock();
-// System.err.println("-queues=" + impl.processor.queues[0].size());
-// impl.processor.threadLocks[0].unlock();
-// System.err.println("-own=" + impl.processor.ownTasks[0].size());
-// System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
-// for(SessionTask task : impl.processor.ownSyncTasks[0]) {
-// System.err.println("--" + task);
-// }
-// }
-
- throw new RuntimeDatabaseException("Request timed out.");
- //waitCount = 0;
-
- }
-
- }
-
- }
-
- }
-
- public void restart() {
- assertReady();
- if(BarrierTracing.BOOKKEEPING) {
- BarrierTracing.restart(this);
- }
- }
-
- public void assertReady() {
- int current = get();
- if (current != 0)
- throw new AssertionError("Barrier was not finished (pending="
- + current + ").");
- }
-
- public void report() {
- // System.out.println("Barrier log:");
- // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
- // System.out.println(entry.getKey() + " " + entry.getValue());
- // }
- // System.out.println("SyncIntProcedure log:");
- // for(Map.Entry<String, Integer> entry :
- // SyncIntProcedure.counters.entrySet()) {
- // System.out.println(entry.getKey() + " " + entry.getValue());
- // }
- }
-
- @Override
- public String toString() {
- return report(this);
-// return "AsyncBarrierImpl@" + System.identityHashCode(this)
-// + " - counter = " + get() + " - caller = " + caller;
- }
+ if (count == 0) {
+ if (caller != null) {
+ caller.dec();
+ }
+ }
+ if (count < 0) {
+ Logger.defaultLogError(
+ "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
+ new Exception());
+ }
+ assert (count >= 0);
+
+ if(callback != null)
+ callback.run();
+
+ }
+
+ }
+
+ public static String report(AsyncBarrierImpl barrier) {
+ CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
+ if(e != null) return e.toString();
+ else return "Barrier@" + System.identityHashCode(barrier);
+ }
+
+ public static void printReverse(AsyncBarrierImpl barrier, int indent) {
+
+ if (barrier.get() == 0)
+ return;
+ for (int i = 0; i < indent; i++)
+ System.err.print(" ");
+ System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
+
+ Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
+ if (children != null) {
+ for (AsyncBarrierImpl child : children)
+ printReverse(child, indent + 2);
+ }
+
+ }
+
+ public void waitBarrier(Object request, ReadGraphImpl impl) {
+
+ if (get() > 0) {
+
+ long waitCount = 0;
+
+ while (get() != 0) {
+
+ boolean executed = impl.performPending();
+ if(executed) waitCount = 0;
+
+ ++waitCount;
+ if(waitCount > 100) Thread.yield();
+ if(waitCount > 1000) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if(waitCount > WAIT_TIME) {
+
+ System.err.println("AsyncBarrierImpl.waitBarrier("
+ + request
+ + ") is taking long to execute, so far "
+ + (waitCount / 1000) + " s.");
+
+ if (BarrierTracing.BOOKKEEPING) {
+ synchronized (BarrierTracing.reverseLookup) {
+ printReverse(this, 0);
+ }
+ BarrierTracing.printBAPS();
+ }
+
+ throw new RuntimeDatabaseException("Request timed out.");
+
+ }
+
+ }
+
+ }
+
+ }
+
+ public void restart() {
+ assertReady();
+ if(BarrierTracing.BOOKKEEPING) {
+ BarrierTracing.restart(this);
+ }
+ }
+
+ public void assertReady() {
+ int current = get();
+ if (current != 0)
+ throw new AssertionError("Barrier was not finished (pending="
+ + current + ").");
+ }
+
+ public void report() {
+ // System.out.println("Barrier log:");
+ // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
+ // System.out.println(entry.getKey() + " " + entry.getValue());
+ // }
+ // System.out.println("SyncIntProcedure log:");
+ // for(Map.Entry<String, Integer> entry :
+ // SyncIntProcedure.counters.entrySet()) {
+ // System.out.println(entry.getKey() + " " + entry.getValue());
+ // }
+ }
+
+ @Override
+ public String toString() {
+ return report(this);
+ // return "AsyncBarrierImpl@" + System.identityHashCode(this)
+ // + " - counter = " + get() + " - caller = " + caller;
+ }
+
}
this.parentGraph = parentGraph;
this.parent = parent;
this.processor = support;
- this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null);
+ this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false);
}
ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
this(graph, parent, graph.processor);
}
- ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) {
- this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback));
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock));
}
+ static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock);
+ }
+
ReadGraphImpl(ReadGraphImpl graph) {
this(graph, graph.parent);
}
- public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) {
- return new ReadGraphImpl(this, parent, callback);
+ public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new ReadGraphImpl(this, parent, callback, needsToBlock);
}
- public ReadGraphImpl withParent(CacheEntry parent) {
- return withParent(parent, null);
- }
+// public ReadGraphImpl withParent(CacheEntry parent) {
+// return withParent(parent, null);
+// }
public ReadGraphImpl syncWithParent(CacheEntry parent) {
return new ReadGraphImpl(this, parent, processor, null);
try {
- ReadGraphImpl queryGraph = graph.withParent(this);
+ ReadGraphImpl queryGraph = graph.withParent(this, null, true);
if(!registered) {
id.register(graph, this);
public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
- ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
+ ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, false);
AsyncMultiReadEntry entry = (AsyncMultiReadEntry)entry_;
AsyncMultiProcedure<T> procedure = (AsyncMultiProcedure<T>)procedure_;
public <T> Object performQuery(ReadGraphImpl parentGraph, final MultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
- ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
+ ReadGraphImpl queryGraph = parentGraph.withParent(entry_, null, true);
MultiReadEntry entry = (MultiReadEntry)entry_;
SyncMultiProcedure<T> procedure = (SyncMultiProcedure<T>)procedure_;
final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
- public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
-
- public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
enum ThreadState {
}
+ final Scheduling scheduling;
+
public ThreadState[] threadStates;
final Object querySupportLock;
public void close() {
}
- public SessionTask getSubTask(ReadGraphImpl parent) {
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.isSubtask(parent) && task.maybeReady()) {
- return freeScheduling.remove(index);
- }
- index++;
- }
- }
- return null;
- }
/*
* We are running errands while waiting for requests to complete.
* We can only run work that is part of the current root request to avoid any deadlocks
*/
public boolean performPending(ReadGraphImpl under) {
- SessionTask task = getSubTask(under);
+ SessionTask task = scheduling.getSubTask(under);
if(task != null) {
task.run(thread.get());
return true;
}
final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
-
- assert(request != null);
-
- synchronized(querySupportLock) {
-
- LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
-
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
-
- queue.addFirst(request);
- requests.release();
-
- }
-
- return null;
+
+ return scheduling.scheduleOrReturnForExecution(request);
}
THREADS = threads;
THREAD_MASK = threads - 1;
+ scheduling = new Scheduling(requests);
+
querySupport = core;
cache = new QueryCache(core, threads);
session = querySupport.getSession();
Exception callerException = null;
- public interface AsyncBarrier {
- public void inc();
- public void dec();
- // public void inc(String debug);
- // public void dec(String debug);
- }
+ public interface AsyncBarrier {
+ public void inc();
+ public void dec();
+ public void waitBarrier(Object request, ReadGraphImpl impl);
+ public boolean isBlocking();
+ }
// final public QueryProcessor processor;
// final public QuerySupport support;
}
private boolean pumpTask() {
- // First finish existing executions
- if(!processor.freeScheduling.isEmpty()) {
- tasks.add(processor.freeScheduling.removeFirst());
- return true;
- }
- // Check for new tasks
- if(!processor.topLevelTasks.isEmpty()) {
- tasks.add(processor.topLevelTasks.removeFirst());
- return true;
- }
- return false;
+ return processor.scheduling.pumpTask(tasks);
}
ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
- ReadGraphImpl queryGraph = graph.withParent(entry);
+ ReadGraphImpl queryGraph = graph.withParent(entry, null, needsToBlock);
queryGraph.asyncBarrier.inc();
- ReadGraphImpl executeGraph = graph.withParent(graph.parent);
+ ReadGraphImpl executeGraph = graph.withParent(graph.parent, null, needsToBlock);
executeGraph.asyncBarrier.inc();
try {
--- /dev/null
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.db.impl.graph.BarrierTracing;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+
+public class Scheduling {
+
+ private final Semaphore requests;
+
+ private Map<AsyncBarrier, LinkedList<SessionTask>> freeScheduling = new HashMap<>();
+
+ private LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
+
+ public Scheduling(Semaphore requests) {
+ this.requests = requests;
+ }
+
+ public SessionTask getSubTask(ReadGraphImpl parent) {
+ synchronized(this) {
+ assert(parent.asyncBarrier.isBlocking());
+ LinkedList<SessionTask> tasks = freeScheduling.get(parent.asyncBarrier);
+ if(tasks == null)
+ return null;
+ SessionTask task = tasks.removeLast();
+ if(tasks.isEmpty())
+ freeScheduling.remove(parent.asyncBarrier);
+ return task;
+ }
+ }
+
+ public boolean pumpTask(ArrayList<SessionTask> tasks) {
+
+ synchronized(this) {
+
+ // First finish existing executions
+ if(!freeScheduling.isEmpty()) {
+ Map.Entry<AsyncBarrier, LinkedList<SessionTask>> ls = freeScheduling.entrySet().iterator().next();
+ assert(ls.getKey().isBlocking());
+ tasks.add(ls.getValue().removeLast());
+ if(ls.getValue().isEmpty())
+ freeScheduling.remove(ls.getKey());
+ return true;
+ }
+ // Check for new tasks
+ if(!topLevelTasks.isEmpty()) {
+ tasks.add(topLevelTasks.removeLast());
+ return true;
+ }
+
+ return false;
+
+ }
+
+ }
+
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
+
+ assert(request != null);
+
+ synchronized(this) {
+
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
+
+ requests.release();
+
+ if(request.rootGraph != null) {
+ AsyncBarrier sb = request.rootGraph.asyncBarrier.getBlockingBarrier();
+ if(sb != null) {
+ LinkedList<SessionTask> ls = freeScheduling.get(sb);
+ if(ls == null) {
+ ls = new LinkedList<SessionTask>();
+ freeScheduling.put(sb, ls);
+ }
+ ls.addFirst(request);
+ return null;
+ }
+ }
+
+ topLevelTasks.addFirst(request);
+
+ }
+
+ return null;
+
+ }
+
+}
@Override
public ReadGraph getIndependentGraph(ReadGraph graph) {
ReadGraphImpl impl = (ReadGraphImpl)graph;
- return impl.withParent(null);
+ return impl.withParent(null, null, false);
}
@Override