X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=6d9560940be2b1c19b73616c85a2996ee2b65d25;hp=7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb;hb=73884c78cf2695329aa3b6559c4c6a196792ac9b;hpb=0db1af75cd14f99f16c42c920562708abf37be93 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 7bfa0f4c6..6d9560940 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -136,29 +136,21 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public Session session; final public ResourceSupport resourceSupport; + final public Semaphore requests = new Semaphore(1); + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; -// public ArrayList[] queues; - - public LinkedList freeScheduling = new LinkedList(); - enum ThreadState { INIT, RUN, SLEEP, DISPOSED } + final Scheduling scheduling; + public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -167,100 +159,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl under) { + SessionTask task = scheduling.getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; - } else { - return false; } + return false; } + + final public void scheduleNow(SessionTask request) { + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); + } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } - - final public void schedule(SessionTask request) { - - //int performer = request.thread; - -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); - - //assert(performer >= 0); - - assert(request != null); - -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); - - } - - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { + + return scheduling.scheduleOrReturnForExecution(request); - } + } final int THREADS; @@ -270,23 +193,24 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; - private Set ancestors; + final protected ReadGraphImpl rootGraph; private int counter = 0; + protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); - } - - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); + public SessionTask() { + this(null); } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -300,12 +224,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); + } + + public boolean maybeReady() { + return true; } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(rootGraph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -323,40 +253,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -364,31 +272,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { @@ -1392,8 +1279,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); ReadGraphImpl parentGraph = graph.forRecompute(entry); - + parentGraph.asyncBarrier.inc(); query.recompute(parentGraph); + parentGraph.asyncBarrier.dec(); if(entry.isExcepted()) return ListenerEntry.NO_VALUE; @@ -1793,6 +1681,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); } @@ -1966,12 +1856,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support; @@ -4164,6 +4054,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return L0; } + public Layer0 getL0() { + return L0; + } + public static ThreadLocal thread = new ThreadLocal() { protected Integer initialValue() { return -1;