X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=eaaa9b3730976265c0673dc4547bbe35b3d4904c;hp=ee4b11175e4d2767cee0058d3938d8cd47125997;hb=90a52d58489b6c518b13dde238923125c8f63eec;hpb=bb61be97905f72c01fd99e21c263546c88edc5f7 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index ee4b11175..eaaa9b373 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -143,6 +143,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap QueryThread[] executors; public LinkedList freeScheduling = new LinkedList(); + + public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -159,13 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl impl) { + public SessionTask getSubTask(ReadGraphImpl parent) { synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { - queueLength.decrementAndGet(); + if(task.isSubtask(parent) && task.maybeReady()) { return freeScheduling.remove(index); } index++; @@ -178,82 +179,46 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getSubTask(graph); + public boolean performPending(ReadGraphImpl under) { + SessionTask task = getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; } return false; } - + final public void scheduleNow(SessionTask request) { - schedule(request, false); + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); } - final public void scheduleLater(SessionTask request) { - schedule(request, true); - } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - AtomicInteger queueLength = new AtomicInteger(0); - - final public void schedule(SessionTask request, boolean late) { - - int queueLengthEstimate = queueLength.get(); - if(!late && queueLengthEstimate > 80) { - request.run(thread.get()); - return; - } - - assert(request != null); - - synchronized(querySupportLock) { + assert(request != null); - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } + synchronized(querySupportLock) { - if(late) { - int pos = request.position - 1; - if(pos < freeScheduling.size()) { - freeScheduling.add(pos, request); - queueLength.incrementAndGet(); - requests.release(); - } else { - freeScheduling.addLast(request); - queueLength.incrementAndGet(); - requests.release(); - } - } - else { - if(request.getLevel() < 4) { - if(freeScheduling.size() < 100) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } else { - if(freeScheduling.size() < 20) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } - } + LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + queue.addFirst(request); + requests.release(); - } + } - } + return null; + + } final int THREADS; @@ -263,20 +228,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; + final protected ReadGraphImpl rootGraph; private int counter = 0; protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); + public SessionTask() { + this(null); } - - public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { - if(r1 == null || r2 == null) return false; - return r1.getTopLevelGraph() == r2.getTopLevelGraph(); - } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } public abstract void run0(int thread); @@ -292,7 +259,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); } public boolean maybeReady() { @@ -301,15 +267,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override public String toString() { - if(graph == null) + if(rootGraph == null) return "SessionTask[no graph]"; else - return "SessionTask[" + graph.parent + "]"; - } - - public int getLevel() { - if(graph == null) return 0; - else return graph.getLevel(); + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -327,37 +288,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; @@ -368,31 +305,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) {