]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Multiple readers in db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index ee4b11175e4d2767cee0058d3938d8cd47125997..eaaa9b3730976265c0673dc4547bbe35b3d4904c 100644 (file)
@@ -143,6 +143,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        QueryThread[]                                   executors;
        
        public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
+    
+    public LinkedList<SessionTask>                           topLevelTasks = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -159,13 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-    public SessionTask getSubTask(ReadGraphImpl impl) {
+    public SessionTask getSubTask(ReadGraphImpl parent) {
         synchronized(querySupportLock) {
             int index = 0;
             while(index < freeScheduling.size()) {
                 SessionTask task = freeScheduling.get(index);
-                if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
-                    queueLength.decrementAndGet();
+                if(task.isSubtask(parent) && task.maybeReady()) {
                     return freeScheduling.remove(index);
                 }
                 index++;
@@ -178,82 +179,46 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
      * We are running errands while waiting for requests to complete.
      * We can only run work that is part of the current root request to avoid any deadlocks
      */
-    public boolean performPending(ReadGraphImpl graph) {
-        SessionTask task = getSubTask(graph);
+    public boolean performPending(ReadGraphImpl under) {
+        SessionTask task = getSubTask(under);
                if(task != null) {
-                       task.run(QueryProcessor.thread.get());
+                       task.run(thread.get());
                        return true;
                }
                return false;
        }
-
+    
     final public void scheduleNow(SessionTask request) {
-        schedule(request, false);
+        SessionTask toExecute = scheduleOrReturnForExecution(request);
+        if(toExecute != null)
+            toExecute.run(thread.get());
     }
 
-    final public void scheduleLater(SessionTask request) {
-        schedule(request, true);
-    }
+    final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
 
-    AtomicInteger queueLength = new AtomicInteger(0);
-    
-    final public void schedule(SessionTask request, boolean late) {
-        
-        int queueLengthEstimate = queueLength.get();
-        if(!late && queueLengthEstimate > 80) {
-            request.run(thread.get());
-            return;
-        }
-        
-               assert(request != null);
-               
-               synchronized(querySupportLock) {
+        assert(request != null);
 
-                   if(BarrierTracing.BOOKKEEPING) {
-                       Exception current = new Exception();
-                       Exception previous = BarrierTracing.tasks.put(request, current);
-                       if(previous != null) {
-                           previous.printStackTrace();
-                           current.printStackTrace();
-                       }
-                   }
+        synchronized(querySupportLock) {
 
-                   if(late) {
-                       int pos = request.position - 1;
-                       if(pos < freeScheduling.size()) { 
-                           freeScheduling.add(pos, request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                       } else {
-                           freeScheduling.addLast(request);
-                    queueLength.incrementAndGet();
-                    requests.release();
-                       }
-                   }
-                   else {
-                       if(request.getLevel() < 4) {
-                           if(freeScheduling.size() < 100) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       } else {
-                           if(freeScheduling.size() < 20) {
-                               freeScheduling.addFirst(request);
-                           queueLength.incrementAndGet();
-                           requests.release();
-                           } else {
-                               request.run(thread.get());
-                           }
-                       }
-                   }
+            LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
+            
+            if(BarrierTracing.BOOKKEEPING) {
+                Exception current = new Exception();
+                Exception previous = BarrierTracing.tasks.put(request, current);
+                if(previous != null) {
+                    previous.printStackTrace();
+                    current.printStackTrace();
+                }
+            }
 
+            queue.addFirst(request);
+            requests.release();
 
-               }
+        }
 
-       }
+        return null;
+
+    }
 
 
        final int THREADS;
@@ -263,20 +228,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        public static abstract class SessionTask {
 
-               public final ReadGraphImpl graph;
+           final protected ReadGraphImpl rootGraph;
                private int counter = 0;
                protected int position = 1;
                private Exception trace;
 
-               public SessionTask(ReadGraphImpl graph) {
-                       this.graph = graph;
-                       if(graph != null) graph.asyncBarrier.inc();
+               public SessionTask() {
+                   this(null);
                }
-
-        public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
-            if(r1 == null || r2 == null) return false;
-            return r1.getTopLevelGraph() == r2.getTopLevelGraph();
-        } 
+               
+        public SessionTask(ReadGraphImpl rootGraph) {
+            this.rootGraph = rootGraph;
+        }
+        
+        public boolean isSubtask(ReadGraphImpl graph) {
+            return graph.isParent(rootGraph);
+        }
 
         public abstract void run0(int thread);
 
@@ -292,7 +259,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                        trace = new Exception();
                    }
                    run0(thread);
-                   if(graph != null) graph.asyncBarrier.dec();
                }
                
                public boolean maybeReady() {
@@ -301,15 +267,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                @Override
                public String toString() {
-                       if(graph == null)
+                       if(rootGraph == null)
                                return "SessionTask[no graph]";
                        else
-                               return "SessionTask[" + graph.parent + "]";
-               }
-               
-               public int getLevel() {
-                   if(graph == null) return 0;
-                   else return graph.getLevel();
+                               return "SessionTask[" + rootGraph.parent + "]";
                }
 
        }
@@ -327,37 +288,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       long waitingTime = 0;
-
-       static int koss = 0;
-       static int koss2 = 0;
-
        public boolean resume(ReadGraphImpl graph) {
                return executors[0].runSynchronized();
        }
-       
-       //private WeakReference<GarbageTracker> garbageTracker;
-       
-       private class GarbageTracker    {
-               
-               @Override
-               protected void finalize() throws Throwable {
-                       
-//                     System.err.println("GarbageTracker");
-//                     
-//                     garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-                       
-                       super.finalize();
-                       
-               }
-               
-       }
 
        public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
                        throws DatabaseException {
 
-               //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-               
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
@@ -368,31 +305,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-//             queues = new ArrayList[THREADS];
-//             threadLocks = new ReentrantLock[THREADS];
-//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-//             ownTasks = new ArrayList[THREADS];
-//             ownSyncTasks = new ArrayList[THREADS];
-//             delayQueues = new ArrayList[THREADS * THREADS];
-
-               //        freeSchedule = new AtomicInteger(0);
-
-//             for (int i = 0; i < THREADS * THREADS; i++) {
-//                     delayQueues[i] = new ArrayList<SessionTask>();
-//             }
 
                for (int i = 0; i < THREADS; i++) {
-
-                       //            tasks[i] = new ArrayList<Runnable>();
-//                     ownTasks[i] = new ArrayList<SessionTask>();
-//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
-//                     queues[i] = new ArrayList<SessionTask>();
-//                     threadLocks[i] = new ReentrantLock();
-//                     threadConditions[i] = threadLocks[i].newCondition();
-                       //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
-
                }
 
                for (int i = 0; i < THREADS; i++) {