]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
index a5e79244e10e16e532ad843e56a75cb64f2b763c..ab10efa0baab252a1ff5ef8b8485c313b2b32db1 100644 (file)
@@ -3,17 +3,17 @@ package org.simantics.db.impl.query;
 import java.util.ArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.simantics.db.Session;
 import org.simantics.db.common.SessionThread;
-import org.simantics.db.common.utils.Logger;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
+import org.slf4j.LoggerFactory;
 
 class QueryThread extends Thread implements SessionThread {
 
+       private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
+
        boolean disposed = false;
        private Semaphore exited = new Semaphore(0);
 
@@ -21,42 +21,44 @@ class QueryThread extends Thread implements SessionThread {
 
        private Session session;
        private QuerySupport querySupport;
+       private final QueryProcessor processor;
 
        final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
-       final private ArrayList<SessionTask> own;
-       final private ArrayList<SessionTask> ownSync;
-       final private ArrayList<SessionTask> queue;
-       final private ReentrantLock lock;
-       final private Condition condition;
+//     final private ArrayList<SessionTask> own;
+//     final private ArrayList<SessionTask> ownSync;
+//     final private ArrayList<SessionTask> queue;
+//     final private ReentrantLock lock;
+//     final private Condition condition;
        final private Object querySupportLock;
        final private int THREADS;
        final private AtomicInteger sleepers;
        final private ThreadState[] threadStates;
-       final private ArrayList<SessionTask>[] delayQueues;
-       final private QueryThread[] executors;
-       final private ReentrantLock[] threadLocks;
-       final private ArrayList<SessionTask>[] queues;
-       final private ArrayList<SessionTask>[] ownSyncTasks;
+//     final private ArrayList<SessionTask>[] delayQueues;
+//     final private QueryThread[] executors;
+//     final private ReentrantLock[] threadLocks;
+//     final private ArrayList<SessionTask>[] queues;
+//     final private ArrayList<SessionTask>[] ownSyncTasks;
 
        public QueryThread(Session session, QueryProcessor processor, int index, String name) {
                super(QueryProcessor.QueryThreadGroup, null, name);
                this.session = session;
+               this.processor = processor;
                this.index = index;
-               own = processor.ownTasks[index];
-               ownSync = processor.ownSyncTasks[index];
-               queue = processor.queues[index];
-               lock = processor.threadLocks[index];
-               condition = processor.threadConditions[index];
+//             own = processor.ownTasks[index];
+//             ownSync = processor.ownSyncTasks[index];
+//             queue = processor.queues[index];
+//             lock = processor.threadLocks[index];
+//             condition = processor.threadConditions[index];
                querySupportLock = processor.querySupportLock;
                THREADS = processor.THREADS;
                sleepers = processor.sleepers;
                querySupport = processor.querySupport;
                threadStates = processor.threadStates;
-               delayQueues = processor.delayQueues;
-               executors = processor.executors;
-               threadLocks = processor.threadLocks;
-               queues = processor.queues;
-               ownSyncTasks = processor.ownSyncTasks;
+//             delayQueues = processor.delayQueues;
+//             executors = processor.executors;
+//             threadLocks = processor.threadLocks;
+//             queues = processor.queues;
+//             ownSyncTasks = processor.ownSyncTasks;
        }
 
        synchronized void dispose() {
@@ -64,20 +66,19 @@ class QueryThread extends Thread implements SessionThread {
 //             System.err.println("qt dispose");
 
                disposed = true;
-               lock.lock();
-               condition.signalAll();
-               lock.unlock();
+//             lock.lock();
+//             condition.signalAll();
+//             lock.unlock();
                
                try {
                        exited.acquire();
                } catch (InterruptedException e) {
-                       Logger.defaultLogError(e);
+                       LOGGER.error("dispose was interrupted", e);
                }
 
                session = null;
                querySupport = null;
 
-
 //             System.err.println("qt disposed");
 
        }
@@ -94,43 +95,31 @@ class QueryThread extends Thread implements SessionThread {
 
        }
 
+       private boolean pumpTask() {
+               if(!processor.freeScheduling.isEmpty()) {
+                       tasks.add(processor.freeScheduling.removeFirst());
+                       return true;
+               }
+               return false;
+       }
+       
        ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
 
                try {
 
                        while(true) {
 
-                               // Perform own tasks first
-                               if(tasks.addAll(own)) {
-                                       own.clear();
-                               } else if (doWait && !ownSync.isEmpty()) {
-                                       tasks.add(ownSync.remove(ownSync.size()-1));
-                               }
-
-                               // Try some queued tasks
-                               lock.lock();
-                               if(tasks.addAll(queue)) {
-                                       queue.clear();
-                                       lock.unlock();
-                                       return tasks;
-                               } else {
-                                       lock.unlock();
-                               }
-
                                // Return tasks if some were found
                                if(!tasks.isEmpty()) return tasks;
+                               
                                if(!doWait) return null;
 
                                synchronized (querySupportLock) {
 
-                                       lock.lock();
-                                       
-                                       // Just maybe someone inserted tasks and notified just before synchronized block
-                                       if(tasks.addAll(queue)) {
-                                               queue.clear();
-                                               lock.unlock();
+                                       if(pumpTask())
                                                return tasks;
-                                       }
+
+//                                     lock.lock();
 
                                        // We are the last one awake
                                        if(sleepers.incrementAndGet() == THREADS) {
@@ -141,12 +130,8 @@ class QueryThread extends Thread implements SessionThread {
                                                if(querySupport == null) System.err.println("null qs");
                                                querySupport.ceased(index);
 
-                                               if(tasks.addAll(own)) {
-                                                       own.clear();
-                                               }
-                                               //                                      System.err.println("tasks after ceased: " + tasks.size());
-                                               if(!tasks.isEmpty()) {
-                                                       lock.unlock();
+                                               if(pumpTask()) {
+//                                                     lock.unlock();
                                                        return tasks;
                                                }
 
@@ -163,30 +148,44 @@ class QueryThread extends Thread implements SessionThread {
                                // We are done
                                if(isDisposed()) {
                                        threadStates[index] = ThreadState.DISPOSED;
-                                       lock.unlock();
+//                                     lock.unlock();
                                        return null;
                                }
 
+                               
                                threadStates[index] = ThreadState.SLEEP;
-                               condition.await();
+                               
+                               synchronized (querySupportLock) {
+                                       querySupportLock.wait(100);
+                                       
+                               }
+                               
+//                             boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
+//                             if(!woken) {
+//                                     synchronized (querySupportLock) {
+//                                             if(!processor.freeScheduling.isEmpty())
+//                                                     System.err.println("some tasks are available!");
+//                                     }
+//                             }
+
+                               sleepers.decrementAndGet();
 
                                // We are done
                                if(isDisposed()) {
                                        threadStates[index] = ThreadState.DISPOSED;
-                                       lock.unlock();
+                                       //lock.unlock();
                                        return null;
                                }
 
                                threadStates[index] = ThreadState.RUN;
 
-                               lock.unlock();
+                               //lock.unlock();
 
                        }
 
                } catch (InterruptedException e) {
 
-                       e.printStackTrace();
-
+                       LOGGER.error("Query handling (newTasks) was interrupted", e);
                        throw new RuntimeException("Querying was interrupted.", e);
 
                }
@@ -197,17 +196,17 @@ class QueryThread extends Thread implements SessionThread {
 
                boolean didExecute = false;
 
-               for(int performer=0;performer<THREADS;performer++) {
-                       if(!delayQueues[index * THREADS + performer].isEmpty()) {
-                               synchronized(executors[performer]) {
-                                       threadLocks[performer].lock();
-                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);
-                                       delayQueues[index * THREADS + performer].clear();
-                                       executors[performer].notify();
-                                       threadLocks[performer].unlock();
-                               }
-                       }
-               }
+//             for(int performer=0;performer<THREADS;performer++) {
+//                     if(!delayQueues[index * THREADS + performer].isEmpty()) {
+//                             synchronized(executors[performer]) {
+//                                     threadLocks[performer].lock();
+//                                     queues[performer].addAll(delayQueues[index * THREADS + performer]);
+//                                     delayQueues[index * THREADS + performer].clear();
+//                                     executors[performer].notify();
+//                                     threadLocks[performer].unlock();
+//                             }
+//                     }
+//             }
 
                if(tasks.isEmpty()) {
                        ArrayList<SessionTask> finished = newTasks(false, tasks);
@@ -218,12 +217,13 @@ class QueryThread extends Thread implements SessionThread {
 
                        SessionTask task = tasks.remove(tasks.size() - 1);
 
-                       if(task.syncCaller == index) {
-                               ownSyncTasks[index].add(task);
-                       } else {
+//                     if(task.syncCaller == index) {
+//                             ownSyncTasks[index].add(task);
+//                     } else {
                                task.run(index);
+//                             System.err.println("QT(s) " + index + " runs " + task);
                                didExecute = true;
-                       }
+//                     }
 
                }
 
@@ -234,6 +234,8 @@ class QueryThread extends Thread implements SessionThread {
        @Override
        public void run() {
 
+               processor.thread.set(index);
+               
                QuerySupport support = this.querySupport;
 
                try {
@@ -248,27 +250,28 @@ class QueryThread extends Thread implements SessionThread {
                                while(!tasks.isEmpty()) {
 
                                        SessionTask task = tasks.remove(tasks.size()-1);
+//                                     System.err.println("QT " + index + " runs " + task);
                                        task.run(index);
 
                                }
 
-                               for(int performer=0;performer<THREADS;performer++) {
-                                       if(!delayQueues[index * THREADS + performer].isEmpty()) {
-                                               synchronized(executors[performer]) {
-                                                       threadLocks[performer].lock();
-                                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);
-                                                       delayQueues[index * THREADS + performer].clear();
-                                                       executors[performer].notify();
-                                                       threadLocks[performer].unlock();
-                                               }
-                                       }
-                               }
+//                             for(int performer=0;performer<THREADS;performer++) {
+//                                     if(!delayQueues[index * THREADS + performer].isEmpty()) {
+//                                             synchronized(executors[performer]) {
+//                                                     threadLocks[performer].lock();
+//                                                     queues[performer].addAll(delayQueues[index * THREADS + performer]);
+//                                                     delayQueues[index * THREADS + performer].clear();
+//                                                     executors[performer].notify();
+//                                                     threadLocks[performer].unlock();
+//                                             }
+//                                     }
+//                             }
 
                        }
 
                } catch (Throwable t) {
 
-                       Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
+                       LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
                        support.exit(t);
 
                } finally {