X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=a46bc0e8dd4e1b024dea3b0dd8d7269b10a19192;hp=ee4b11175e4d2767cee0058d3938d8cd47125997;hb=a88c02c2d0f4250caf887a130b9f4314c6564722;hpb=5f5747460a3330a1317395fedb60db33f6c00ab2 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index ee4b11175..a46bc0e8d 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -141,8 +141,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - - public LinkedList freeScheduling = new LinkedList(); enum ThreadState { @@ -150,6 +148,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } + final Scheduling scheduling; + public ThreadState[] threadStates; final Object querySupportLock; @@ -159,101 +159,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl impl) { - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { - queueLength.decrementAndGet(); - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } /* * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getSubTask(graph); + public boolean performPending(ReadGraphImpl under) { + SessionTask task = scheduling.getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; } return false; } - + final public void scheduleNow(SessionTask request) { - schedule(request, false); + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); } - final public void scheduleLater(SessionTask request) { - schedule(request, true); - } - - AtomicInteger queueLength = new AtomicInteger(0); - - final public void schedule(SessionTask request, boolean late) { - - int queueLengthEstimate = queueLength.get(); - if(!late && queueLengthEstimate > 80) { - request.run(thread.get()); - return; - } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - assert(request != null); - - synchronized(querySupportLock) { + return scheduling.scheduleOrReturnForExecution(request); - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - if(late) { - int pos = request.position - 1; - if(pos < freeScheduling.size()) { - freeScheduling.add(pos, request); - queueLength.incrementAndGet(); - requests.release(); - } else { - freeScheduling.addLast(request); - queueLength.incrementAndGet(); - requests.release(); - } - } - else { - if(request.getLevel() < 4) { - if(freeScheduling.size() < 100) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } else { - if(freeScheduling.size() < 20) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } - } - - - } - - } + } final int THREADS; @@ -263,20 +193,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; + final protected ReadGraphImpl rootGraph; private int counter = 0; protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); + public SessionTask() { + this(null); } - - public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { - if(r1 == null || r2 == null) return false; - return r1.getTopLevelGraph() == r2.getTopLevelGraph(); - } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } public abstract void run0(int thread); @@ -292,7 +224,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); } public boolean maybeReady() { @@ -301,15 +232,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override public String toString() { - if(graph == null) + if(rootGraph == null) return "SessionTask[no graph]"; else - return "SessionTask[" + graph.parent + "]"; - } - - public int getLevel() { - if(graph == null) return 0; - else return graph.getLevel(); + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -327,40 +253,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -368,31 +272,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { @@ -1972,12 +1855,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support;