]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Still working for multiple readers
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index f1e9233eb685b7fe87cb47ca4c00704503edec9a..2908bd43a83680fc7f24e3fe2faafb1b25bc1b53 100644 (file)
@@ -151,7 +151,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        QueryThread[]                                   executors;
 
-       public ArrayList<SessionTask>[]                           queues;
+//     public ArrayList<SessionTask>[]                           queues;
+       
+       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -163,11 +165,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public ReentrantLock[]                                                                  threadLocks;
        public Condition[]                                                                          threadConditions;
 
-       public ArrayList<SessionTask>[]                           ownTasks;
+       //public ArrayList<SessionTask>[]                           ownTasks;
 
-       public ArrayList<SessionTask>[]                           ownSyncTasks;
+       //public ArrayList<SessionTask>[]                           ownSyncTasks;
 
-       ArrayList<SessionTask>[]                           delayQueues;
+       //ArrayList<SessionTask>[]                           delayQueues;
        
        final Object querySupportLock;
        
@@ -176,19 +178,21 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       final public void scheduleOwn(int caller, SessionTask request) {
-               ownTasks[caller].add(request);
-       }
+//     final public void scheduleOwn(int caller, SessionTask request) {
+//             ownTasks[caller].add(request);
+//     }
 
        final public void scheduleAlways(int caller, SessionTask request) {
 
-               int performer = request.thread;
-               if(caller == performer) {
-                       ownTasks[caller].add(request);
-               } else {
-                       schedule(caller, request);
-               }
+//             int performer = request.thread;
+//             if(caller == performer) {
+//                     ownTasks[caller].add(request);
+//             } else {
+//                     schedule(caller, request);
+//             }
 
+               schedule(caller, request);
+               
        }
 
        final public void schedule(int caller, SessionTask request) {
@@ -202,19 +206,43 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                assert(request != null);
 
-               if(caller == performer) {
-                       request.run(caller);
-               } else {
-                       ReentrantLock queueLock = threadLocks[performer];
-                       queueLock.lock();
-                       queues[performer].add(request);
-                       // This thread could have been sleeping
-                       if(queues[performer].size() == 1) {
-                               if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-                               threadConditions[performer].signalAll();
-                       }
-                       queueLock.unlock();
-               }
+//             if(caller == performer) {
+//                     request.run(caller);
+//             } else {
+                       
+//                     if(performer == THREADS) {
+                               
+                               synchronized(querySupportLock) {
+                                       
+                                       freeScheduling.add(request);
+
+                                       //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
+
+                                       for(int i=0;i<THREADS;i++) {
+                                               ReentrantLock queueLock = threadLocks[i];
+                                               queueLock.lock();
+                                               //queues[performer].add(request);
+                                               //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+                                               threadConditions[i].signalAll();
+                                               queueLock.unlock();
+                                       }
+
+                               }
+
+                               return;
+                               
+//                     }
+//                     
+//                     ReentrantLock queueLock = threadLocks[performer];
+//                     queueLock.lock();
+//                     queues[performer].add(request);
+//                     // This thread could have been sleeping
+//                     if(queues[performer].size() == 1) {
+//                             //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
+//                             threadConditions[performer].signalAll();
+//                     }
+//                     queueLock.unlock();
+//             }
 
        }
 
@@ -311,26 +339,26 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-               queues = new ArrayList[THREADS];
+//             queues = new ArrayList[THREADS];
                threadLocks = new ReentrantLock[THREADS];
                threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-               ownTasks = new ArrayList[THREADS];
-               ownSyncTasks = new ArrayList[THREADS];
-               delayQueues = new ArrayList[THREADS * THREADS];
+//             ownTasks = new ArrayList[THREADS];
+//             ownSyncTasks = new ArrayList[THREADS];
+//             delayQueues = new ArrayList[THREADS * THREADS];
 
                //        freeSchedule = new AtomicInteger(0);
 
-               for (int i = 0; i < THREADS * THREADS; i++) {
-                       delayQueues[i] = new ArrayList<SessionTask>();
-               }
+//             for (int i = 0; i < THREADS * THREADS; i++) {
+//                     delayQueues[i] = new ArrayList<SessionTask>();
+//             }
 
                for (int i = 0; i < THREADS; i++) {
 
                        //            tasks[i] = new ArrayList<Runnable>();
-                       ownTasks[i] = new ArrayList<SessionTask>();
-                       ownSyncTasks[i] = new ArrayList<SessionTask>();
-                       queues[i] = new ArrayList<SessionTask>();
+//                     ownTasks[i] = new ArrayList<SessionTask>();
+//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
+//                     queues[i] = new ArrayList<SessionTask>();
                        threadLocks[i] = new ReentrantLock();
                        threadConditions[i] = threadLocks[i].newCondition();
                        //            limits[i] = false;