]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
DB request scheduling scheme fails with district diagrams
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index ee4b11175e4d2767cee0058d3938d8cd47125997..a46bc0e8dd4e1b024dea3b0dd8d7269b10a19192 100644 (file)
@@ -141,8 +141,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
-       
-       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -150,6 +148,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
+       final Scheduling scheduling;
+       
        public ThreadState[]                                                                    threadStates;
        
        final Object querySupportLock;
@@ -159,101 +159,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-    public SessionTask getSubTask(ReadGraphImpl impl) {
-        synchronized(querySupportLock) {
-            int index = 0;
-            while(index < freeScheduling.size()) {
-                SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
-                    queueLength.decrementAndGet();
-                    return freeScheduling.remove(index);
-                }
-                index++;
-            }
-        }
-        return null;
-    }
 
     /*
      * We are running errands while waiting for requests to complete.
      * We can only run work that is part of the current root request to avoid any deadlocks
      */
-    public boolean performPending(ReadGraphImpl graph) {
-        SessionTask task = getSubTask(graph);
+    public boolean performPending(ReadGraphImpl under) {
+        SessionTask task = scheduling.getSubTask(under);
                if(task != null) {
-                       task.run(QueryProcessor.thread.get());
+                       task.run(thread.get());
                        return true;
                }
                return false;
        }
-
+    
     final public void scheduleNow(SessionTask request) {
-        schedule(request, false);
+        SessionTask toExecute = scheduleOrReturnForExecution(request);
+        if(toExecute != null)
+            toExecute.run(thread.get());
     }
 
-    final public void scheduleLater(SessionTask request) {
-        schedule(request, true);
-    }
-
-    AtomicInteger queueLength = new AtomicInteger(0);
-    
-    final public void schedule(SessionTask request, boolean late) {
-        
-        int queueLengthEstimate = queueLength.get();
-        if(!late && queueLengthEstimate > 80) {
-            request.run(thread.get());
-            return;
-        }
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
         
-               assert(request != null);
-               
-               synchronized(querySupportLock) {
+        return scheduling.scheduleOrReturnForExecution(request);
 
-                   if(BarrierTracing.BOOKKEEPING) {
-                       Exception current = new Exception();
-                       Exception previous = BarrierTracing.tasks.put(request, current);
-                       if(previous != null) {
-                           previous.printStackTrace();
-                           current.printStackTrace();
-                       }
-                   }
-
-                   if(late) {
-                       int pos = request.position - 1;
-                       if(pos < freeScheduling.size()) { 
-                           freeScheduling.add(pos, request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                       } else {
-                           freeScheduling.addLast(request);
-                    queueLength.incrementAndGet();
-                    requests.release();
-                       }
-                   }
-                   else {
-                       if(request.getLevel() < 4) {
-                           if(freeScheduling.size() < 100) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       } else {
-                           if(freeScheduling.size() < 20) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       }
-                   }
-
-
-               }
-
-       }
+    }
 
 
        final int THREADS;
@@ -263,20 +193,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        public static abstract class SessionTask {
 
-               public final ReadGraphImpl graph;
+           final protected ReadGraphImpl rootGraph;
                private int counter = 0;
                protected int position = 1;
                private Exception trace;
 
-               public SessionTask(ReadGraphImpl graph) {
-                       this.graph = graph;
-                       if(graph != null) graph.asyncBarrier.inc();
+               public SessionTask() {
+                   this(null);
                }
-
-        public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
-            if(r1 == null || r2 == null) return false;
-            return r1.getTopLevelGraph() == r2.getTopLevelGraph();
-        } 
+               
+        public SessionTask(ReadGraphImpl rootGraph) {
+            this.rootGraph = rootGraph;
+        }
+        
+        public boolean isSubtask(ReadGraphImpl graph) {
+            return graph.isParent(rootGraph);
+        }
 
         public abstract void run0(int thread);
 
@@ -292,7 +224,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        trace = new Exception();
                    }
                    run0(thread);
-                   if(graph != null) graph.asyncBarrier.dec();
                }
                
                public boolean maybeReady() {
@@ -301,15 +232,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                @Override
                public String toString() {
-                       if(graph == null)
+                       if(rootGraph == null)
                                return "SessionTask[no graph]";
                        else
-                               return "SessionTask[" + graph.parent + "]";
-               }
-               
-               public int getLevel() {
-                   if(graph == null) return 0;
-                   else return graph.getLevel();
+                               return "SessionTask[" + rootGraph.parent + "]";
                }
 
        }
@@ -327,40 +253,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       long waitingTime = 0;
-
-       static int koss = 0;
-       static int koss2 = 0;
-
        public boolean resume(ReadGraphImpl graph) {
                return executors[0].runSynchronized();
        }
-       
-       //private WeakReference<GarbageTracker> garbageTracker;
-       
-       private class GarbageTracker    {
-               
-               @Override
-               protected void finalize() throws Throwable {
-                       
-//                     System.err.println("GarbageTracker");
-//                     
-//                     garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-                       
-                       super.finalize();
-                       
-               }
-               
-       }
 
        public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
                        throws DatabaseException {
 
-               //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-               
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
+               scheduling = new Scheduling(requests);
+               
                querySupport = core;
                cache = new QueryCache(core, threads);
                session = querySupport.getSession();
@@ -368,31 +272,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-//             queues = new ArrayList[THREADS];
-//             threadLocks = new ReentrantLock[THREADS];
-//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-//             ownTasks = new ArrayList[THREADS];
-//             ownSyncTasks = new ArrayList[THREADS];
-//             delayQueues = new ArrayList[THREADS * THREADS];
-
-               //        freeSchedule = new AtomicInteger(0);
-
-//             for (int i = 0; i < THREADS * THREADS; i++) {
-//                     delayQueues[i] = new ArrayList<SessionTask>();
-//             }
 
                for (int i = 0; i < THREADS; i++) {
-
-                       //            tasks[i] = new ArrayList<Runnable>();
-//                     ownTasks[i] = new ArrayList<SessionTask>();
-//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
-//                     queues[i] = new ArrayList<SessionTask>();
-//                     threadLocks[i] = new ReentrantLock();
-//                     threadConditions[i] = threadLocks[i].newCondition();
-                       //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
-
                }
 
                for (int i = 0; i < THREADS; i++) {
@@ -1972,12 +1855,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        Exception callerException = null;
 
-       public interface AsyncBarrier {
-               public void inc(); 
-               public void dec();
-               //        public void inc(String debug); 
-               //        public void dec(String debug);
-       }
+    public interface AsyncBarrier {
+        public void inc(); 
+        public void dec();
+        public void waitBarrier(Object request, ReadGraphImpl impl);
+        public boolean isBlocking();
+    }
 
 //     final public QueryProcessor processor;
 //     final public QuerySupport support;