]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Fix livelock situation in QueryProcessor
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index 7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb..ee4b11175e4d2767cee0058d3938d8cd47125997 100644 (file)
@@ -136,11 +136,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public Session                                    session;
        final public ResourceSupport                            resourceSupport;
        
+       final public Semaphore                                  requests = new Semaphore(1);
+       
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
-
-//     public ArrayList<SessionTask>[]                           queues;
        
        public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
 
@@ -151,14 +151,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        public ThreadState[]                                                                    threadStates;
-//     public ReentrantLock[]                                                                  threadLocks;
-//     public Condition[]                                                                          threadConditions;
-
-       //public ArrayList<SessionTask>[]                           ownTasks;
-
-       //public ArrayList<SessionTask>[]                           ownSyncTasks;
-
-       //ArrayList<SessionTask>[]                           delayQueues;
        
        final Object querySupportLock;
        
@@ -167,28 +159,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       public SessionTask getOwnTask(ReadGraphImpl impl) {
-               Set<ReadGraphImpl> ancestors = impl.ancestorSet();
-               synchronized(querySupportLock) {
-                       int index = 0;
-                       while(index < freeScheduling.size()) {
-                               SessionTask task = freeScheduling.get(index);
-                               if(task.hasCommonParent(ancestors)) {
-                                       return freeScheduling.remove(index);
-                               }
-                               index++;
-                       }
-               }
-               return null;
-       }
-
     public SessionTask getSubTask(ReadGraphImpl impl) {
-        Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
         synchronized(querySupportLock) {
             int index = 0;
             while(index < freeScheduling.size()) {
                 SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(onlyThis)) {
+                if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
+                    queueLength.decrementAndGet();
                     return freeScheduling.remove(index);
                 }
                 index++;
@@ -197,68 +174,84 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
         return null;
     }
 
-       public boolean performPending(ReadGraphImpl graph) {
-               SessionTask task = getOwnTask(graph);
+    /*
+     * We are running errands while waiting for requests to complete.
+     * We can only run work that is part of the current root request to avoid any deadlocks
+     */
+    public boolean performPending(ReadGraphImpl graph) {
+        SessionTask task = getSubTask(graph);
                if(task != null) {
                        task.run(QueryProcessor.thread.get());
                        return true;
-               } else {
-                       return false;
                }
+               return false;
        }
 
-//     final public void scheduleOwn(int caller, SessionTask request) {
-//             ownTasks[caller].add(request);
-//     }
-
-       final public void schedule(SessionTask request) {
-           
-               //int performer = request.thread;
-
-//             if(DebugPolicy.SCHEDULE)
-//                     System.out.println("schedule " + request + " " + " -> " + performer);
+    final public void scheduleNow(SessionTask request) {
+        schedule(request, false);
+    }
 
-               //assert(performer >= 0);
+    final public void scheduleLater(SessionTask request) {
+        schedule(request, true);
+    }
 
+    AtomicInteger queueLength = new AtomicInteger(0);
+    
+    final public void schedule(SessionTask request, boolean late) {
+        
+        int queueLengthEstimate = queueLength.get();
+        if(!late && queueLengthEstimate > 80) {
+            request.run(thread.get());
+            return;
+        }
+        
                assert(request != null);
+               
+               synchronized(querySupportLock) {
 
-//             if(caller == performer) {
-//                     request.run(caller);
-//             } else {
-                       
-//                     if(performer == THREADS) {
-                               
-                               synchronized(querySupportLock) {
-
-                                       if(BarrierTracing.BOOKKEEPING) {
-                                               Exception current = new Exception();
-                                               Exception previous = BarrierTracing.tasks.put(request, current);
-                                               if(previous != null) {
-                                                       previous.printStackTrace();
-                                                       current.printStackTrace();
-                                               }
-                                       }
-                                   
-                                       freeScheduling.add(request);
-                                       
-                                       querySupportLock.notifyAll();
+                   if(BarrierTracing.BOOKKEEPING) {
+                       Exception current = new Exception();
+                       Exception previous = BarrierTracing.tasks.put(request, current);
+                       if(previous != null) {
+                           previous.printStackTrace();
+                           current.printStackTrace();
+                       }
+                   }
 
-                               }
+                   if(late) {
+                       int pos = request.position - 1;
+                       if(pos < freeScheduling.size()) { 
+                           freeScheduling.add(pos, request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                       } else {
+                           freeScheduling.addLast(request);
+                    queueLength.incrementAndGet();
+                    requests.release();
+                       }
+                   }
+                   else {
+                       if(request.getLevel() < 4) {
+                           if(freeScheduling.size() < 100) {
+                               freeScheduling.addFirst(request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                           } else {
+                               request.run(thread.get());
+                           }
+                       } else {
+                           if(freeScheduling.size() < 20) {
+                               freeScheduling.addFirst(request);
+                           queueLength.incrementAndGet();
+                           requests.release();
+                           } else {
+                               request.run(thread.get());
+                           }
+                       }
+                   }
 
-                               return;
-                               
-//                     }
-//                     
-//                     ReentrantLock queueLock = threadLocks[performer];
-//                     queueLock.lock();
-//                     queues[performer].add(request);
-//                     // This thread could have been sleeping
-//                     if(queues[performer].size() == 1) {
-//                             //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-//                             threadConditions[performer].signalAll();
-//                     }
-//                     queueLock.unlock();
-//             }
+
+               }
 
        }
 
@@ -271,8 +264,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public static abstract class SessionTask {
 
                public final ReadGraphImpl graph;
-               private Set<ReadGraphImpl> ancestors;
                private int counter = 0;
+               protected int position = 1;
                private Exception trace;
 
                public SessionTask(ReadGraphImpl graph) {
@@ -280,13 +273,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        if(graph != null) graph.asyncBarrier.inc();
                }
 
-               public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
-                       if(graph == null) return false;
-                       if(ancestors == null) ancestors = graph.ancestorSet();
-                       return !Collections.disjoint(ancestors, otherAncestors);
-               }
+        public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
+            if(r1 == null || r2 == null) return false;
+            return r1.getTopLevelGraph() == r2.getTopLevelGraph();
+        } 
 
-               public abstract void run0(int thread);
+        public abstract void run0(int thread);
 
                public final void run(int thread) {
                    if(counter++ > 0) {
@@ -302,10 +294,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                    run0(thread);
                    if(graph != null) graph.asyncBarrier.dec();
                }
+               
+               public boolean maybeReady() {
+                       return true;
+               }
 
                @Override
                public String toString() {
-                       return "SessionTask[" + graph.parent + "]";
+                       if(graph == null)
+                               return "SessionTask[no graph]";
+                       else
+                               return "SessionTask[" + graph.parent + "]";
+               }
+               
+               public int getLevel() {
+                   if(graph == null) return 0;
+                   else return graph.getLevel();
                }
 
        }
@@ -1793,6 +1797,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        @Override
        protected void doDispose() {
 
+               requests.release(Integer.MAX_VALUE / 2);
+               
                for(int index = 0; index < THREADS; index++) { 
                        executors[index].dispose();
                }