]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
DB request scheduling scheme fails with district diagrams
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index 7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb..a46bc0e8dd4e1b024dea3b0dd8d7269b10a19192 100644 (file)
@@ -136,29 +136,21 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public Session                                    session;
        final public ResourceSupport                            resourceSupport;
        
+       final public Semaphore                                  requests = new Semaphore(1);
+       
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
 
-//     public ArrayList<SessionTask>[]                           queues;
-       
-       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
-
        enum ThreadState {
 
                INIT, RUN, SLEEP, DISPOSED
 
        }
 
+       final Scheduling scheduling;
+       
        public ThreadState[]                                                                    threadStates;
-//     public ReentrantLock[]                                                                  threadLocks;
-//     public Condition[]                                                                          threadConditions;
-
-       //public ArrayList<SessionTask>[]                           ownTasks;
-
-       //public ArrayList<SessionTask>[]                           ownSyncTasks;
-
-       //ArrayList<SessionTask>[]                           delayQueues;
        
        final Object querySupportLock;
        
@@ -167,100 +159,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       public SessionTask getOwnTask(ReadGraphImpl impl) {
-               Set<ReadGraphImpl> ancestors = impl.ancestorSet();
-               synchronized(querySupportLock) {
-                       int index = 0;
-                       while(index < freeScheduling.size()) {
-                               SessionTask task = freeScheduling.get(index);
-                               if(task.hasCommonParent(ancestors)) {
-                                       return freeScheduling.remove(index);
-                               }
-                               index++;
-                       }
-               }
-               return null;
-       }
-
-    public SessionTask getSubTask(ReadGraphImpl impl) {
-        Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
-        synchronized(querySupportLock) {
-            int index = 0;
-            while(index < freeScheduling.size()) {
-                SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(onlyThis)) {
-                    return freeScheduling.remove(index);
-                }
-                index++;
-            }
-        }
-        return null;
-    }
 
-       public boolean performPending(ReadGraphImpl graph) {
-               SessionTask task = getOwnTask(graph);
+    /*
+     * We are running errands while waiting for requests to complete.
+     * We can only run work that is part of the current root request to avoid any deadlocks
+     */
+    public boolean performPending(ReadGraphImpl under) {
+        SessionTask task = scheduling.getSubTask(under);
                if(task != null) {
-                       task.run(QueryProcessor.thread.get());
+                       task.run(thread.get());
                        return true;
-               } else {
-                       return false;
                }
+               return false;
        }
+    
+    final public void scheduleNow(SessionTask request) {
+        SessionTask toExecute = scheduleOrReturnForExecution(request);
+        if(toExecute != null)
+            toExecute.run(thread.get());
+    }
 
-//     final public void scheduleOwn(int caller, SessionTask request) {
-//             ownTasks[caller].add(request);
-//     }
-
-       final public void schedule(SessionTask request) {
-           
-               //int performer = request.thread;
-
-//             if(DebugPolicy.SCHEDULE)
-//                     System.out.println("schedule " + request + " " + " -> " + performer);
-
-               //assert(performer >= 0);
-
-               assert(request != null);
-
-//             if(caller == performer) {
-//                     request.run(caller);
-//             } else {
-                       
-//                     if(performer == THREADS) {
-                               
-                               synchronized(querySupportLock) {
-
-                                       if(BarrierTracing.BOOKKEEPING) {
-                                               Exception current = new Exception();
-                                               Exception previous = BarrierTracing.tasks.put(request, current);
-                                               if(previous != null) {
-                                                       previous.printStackTrace();
-                                                       current.printStackTrace();
-                                               }
-                                       }
-                                   
-                                       freeScheduling.add(request);
-                                       
-                                       querySupportLock.notifyAll();
-
-                               }
-
-                               return;
-                               
-//                     }
-//                     
-//                     ReentrantLock queueLock = threadLocks[performer];
-//                     queueLock.lock();
-//                     queues[performer].add(request);
-//                     // This thread could have been sleeping
-//                     if(queues[performer].size() == 1) {
-//                             //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-//                             threadConditions[performer].signalAll();
-//                     }
-//                     queueLock.unlock();
-//             }
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
+        
+        return scheduling.scheduleOrReturnForExecution(request);
 
-       }
+    }
 
 
        final int THREADS;
@@ -270,23 +193,24 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        public static abstract class SessionTask {
 
-               public final ReadGraphImpl graph;
-               private Set<ReadGraphImpl> ancestors;
+           final protected ReadGraphImpl rootGraph;
                private int counter = 0;
+               protected int position = 1;
                private Exception trace;
 
-               public SessionTask(ReadGraphImpl graph) {
-                       this.graph = graph;
-                       if(graph != null) graph.asyncBarrier.inc();
-               }
-
-               public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
-                       if(graph == null) return false;
-                       if(ancestors == null) ancestors = graph.ancestorSet();
-                       return !Collections.disjoint(ancestors, otherAncestors);
+               public SessionTask() {
+                   this(null);
                }
+               
+        public SessionTask(ReadGraphImpl rootGraph) {
+            this.rootGraph = rootGraph;
+        }
+        
+        public boolean isSubtask(ReadGraphImpl graph) {
+            return graph.isParent(rootGraph);
+        }
 
-               public abstract void run0(int thread);
+        public abstract void run0(int thread);
 
                public final void run(int thread) {
                    if(counter++ > 0) {
@@ -300,12 +224,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        trace = new Exception();
                    }
                    run0(thread);
-                   if(graph != null) graph.asyncBarrier.dec();
+               }
+               
+               public boolean maybeReady() {
+                       return true;
                }
 
                @Override
                public String toString() {
-                       return "SessionTask[" + graph.parent + "]";
+                       if(rootGraph == null)
+                               return "SessionTask[no graph]";
+                       else
+                               return "SessionTask[" + rootGraph.parent + "]";
                }
 
        }
@@ -323,40 +253,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       long waitingTime = 0;
-
-       static int koss = 0;
-       static int koss2 = 0;
-
        public boolean resume(ReadGraphImpl graph) {
                return executors[0].runSynchronized();
        }
-       
-       //private WeakReference<GarbageTracker> garbageTracker;
-       
-       private class GarbageTracker    {
-               
-               @Override
-               protected void finalize() throws Throwable {
-                       
-//                     System.err.println("GarbageTracker");
-//                     
-//                     garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-                       
-                       super.finalize();
-                       
-               }
-               
-       }
 
        public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
                        throws DatabaseException {
 
-               //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-               
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
+               scheduling = new Scheduling(requests);
+               
                querySupport = core;
                cache = new QueryCache(core, threads);
                session = querySupport.getSession();
@@ -364,31 +272,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-//             queues = new ArrayList[THREADS];
-//             threadLocks = new ReentrantLock[THREADS];
-//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-//             ownTasks = new ArrayList[THREADS];
-//             ownSyncTasks = new ArrayList[THREADS];
-//             delayQueues = new ArrayList[THREADS * THREADS];
-
-               //        freeSchedule = new AtomicInteger(0);
-
-//             for (int i = 0; i < THREADS * THREADS; i++) {
-//                     delayQueues[i] = new ArrayList<SessionTask>();
-//             }
 
                for (int i = 0; i < THREADS; i++) {
-
-                       //            tasks[i] = new ArrayList<Runnable>();
-//                     ownTasks[i] = new ArrayList<SessionTask>();
-//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
-//                     queues[i] = new ArrayList<SessionTask>();
-//                     threadLocks[i] = new ReentrantLock();
-//                     threadConditions[i] = threadLocks[i].newCondition();
-                       //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
-
                }
 
                for (int i = 0; i < THREADS; i++) {
@@ -1793,6 +1680,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        @Override
        protected void doDispose() {
 
+               requests.release(Integer.MAX_VALUE / 2);
+               
                for(int index = 0; index < THREADS; index++) { 
                        executors[index].dispose();
                }
@@ -1966,12 +1855,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        Exception callerException = null;
 
-       public interface AsyncBarrier {
-               public void inc(); 
-               public void dec();
-               //        public void inc(String debug); 
-               //        public void dec(String debug);
-       }
+    public interface AsyncBarrier {
+        public void inc(); 
+        public void dec();
+        public void waitBarrier(Object request, ReadGraphImpl impl);
+        public boolean isBlocking();
+    }
 
 //     final public QueryProcessor processor;
 //     final public QuerySupport support;