X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=ee4b11175e4d2767cee0058d3938d8cd47125997;hp=7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb;hb=5f5747460a3330a1317395fedb60db33f6c00ab2;hpb=ceff53434e48f94c5d84ce12f3b1e21cabda457b diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 7bfa0f4c6..ee4b11175 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -136,11 +136,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public Session session; final public ResourceSupport resourceSupport; + final public Semaphore requests = new Semaphore(1); + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - -// public ArrayList[] queues; public LinkedList freeScheduling = new LinkedList(); @@ -151,14 +151,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -167,28 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { + if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { + queueLength.decrementAndGet(); return freeScheduling.remove(index); } index++; @@ -197,68 +174,84 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return null; } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl graph) { + SessionTask task = getSubTask(graph); if(task != null) { task.run(QueryProcessor.thread.get()); return true; - } else { - return false; } + return false; } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } - - final public void schedule(SessionTask request) { - - //int performer = request.thread; - -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); + final public void scheduleNow(SessionTask request) { + schedule(request, false); + } - //assert(performer >= 0); + final public void scheduleLater(SessionTask request) { + schedule(request, true); + } + AtomicInteger queueLength = new AtomicInteger(0); + + final public void schedule(SessionTask request, boolean late) { + + int queueLengthEstimate = queueLength.get(); + if(!late && queueLengthEstimate > 80) { + request.run(thread.get()); + return; + } + assert(request != null); + + synchronized(querySupportLock) { -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } - } + if(late) { + int pos = request.position - 1; + if(pos < freeScheduling.size()) { + freeScheduling.add(pos, request); + queueLength.incrementAndGet(); + requests.release(); + } else { + freeScheduling.addLast(request); + queueLength.incrementAndGet(); + requests.release(); + } + } + else { + if(request.getLevel() < 4) { + if(freeScheduling.size() < 100) { + freeScheduling.addFirst(request); + queueLength.incrementAndGet(); + requests.release(); + } else { + request.run(thread.get()); + } + } else { + if(freeScheduling.size() < 20) { + freeScheduling.addFirst(request); + queueLength.incrementAndGet(); + requests.release(); + } else { + request.run(thread.get()); + } + } + } - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + + } } @@ -271,8 +264,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { public final ReadGraphImpl graph; - private Set ancestors; private int counter = 0; + protected int position = 1; private Exception trace; public SessionTask(ReadGraphImpl graph) { @@ -280,13 +273,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap if(graph != null) graph.asyncBarrier.inc(); } - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); - } + public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { + if(r1 == null || r2 == null) return false; + return r1.getTopLevelGraph() == r2.getTopLevelGraph(); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -302,10 +294,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap run0(thread); if(graph != null) graph.asyncBarrier.dec(); } + + public boolean maybeReady() { + return true; + } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(graph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + graph.parent + "]"; + } + + public int getLevel() { + if(graph == null) return 0; + else return graph.getLevel(); } } @@ -1793,6 +1797,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); }