X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=46c857794303636849112477f417b46c65d694cd;hb=26b755c7e98b7bb3d9038abba139bef0e71f6607;hp=ee4b11175e4d2767cee0058d3938d8cd47125997;hpb=501ad95ad5ca980ef4c6e65af1451a0d7b63cddc;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index ee4b11175..46c857794 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -143,6 +143,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap QueryThread[] executors; public LinkedList freeScheduling = new LinkedList(); + + public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -159,13 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl impl) { + public SessionTask getSubTask(ReadGraphImpl parent) { synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { - queueLength.decrementAndGet(); + if(task.isSubtask(parent) && task.maybeReady()) { return freeScheduling.remove(index); } index++; @@ -178,82 +179,46 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getSubTask(graph); + public boolean performPending(ReadGraphImpl under) { + SessionTask task = getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; } return false; } - + final public void scheduleNow(SessionTask request) { - schedule(request, false); + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); } - final public void scheduleLater(SessionTask request) { - schedule(request, true); - } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - AtomicInteger queueLength = new AtomicInteger(0); - - final public void schedule(SessionTask request, boolean late) { - - int queueLengthEstimate = queueLength.get(); - if(!late && queueLengthEstimate > 80) { - request.run(thread.get()); - return; - } - - assert(request != null); - - synchronized(querySupportLock) { + assert(request != null); - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } + synchronized(querySupportLock) { - if(late) { - int pos = request.position - 1; - if(pos < freeScheduling.size()) { - freeScheduling.add(pos, request); - queueLength.incrementAndGet(); - requests.release(); - } else { - freeScheduling.addLast(request); - queueLength.incrementAndGet(); - requests.release(); - } - } - else { - if(request.getLevel() < 4) { - if(freeScheduling.size() < 100) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } else { - if(freeScheduling.size() < 20) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } - } + LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + queue.addFirst(request); + requests.release(); - } + } - } + return null; + + } final int THREADS; @@ -263,20 +228,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; + final protected ReadGraphImpl rootGraph; private int counter = 0; protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); + public SessionTask() { + this(null); } - - public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { - if(r1 == null || r2 == null) return false; - return r1.getTopLevelGraph() == r2.getTopLevelGraph(); - } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } public abstract void run0(int thread); @@ -292,7 +259,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); + //if(graph != null && graph.asyncBarrier != null) graph.asyncBarrier.dec(); } public boolean maybeReady() { @@ -301,16 +268,16 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override public String toString() { - if(graph == null) +// if(graph == null) return "SessionTask[no graph]"; - else - return "SessionTask[" + graph.parent + "]"; +// else +// return "SessionTask[" + graph.parent + "]"; } - public int getLevel() { - if(graph == null) return 0; - else return graph.getLevel(); - } +// public int getLevel() { +// if(graph == null) return 0; +// else return graph.getLevel(); +// } }