]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
index b8916f3ee6de0aab7ba8076ccecd88ed8d9c44b6..ab10efa0baab252a1ff5ef8b8485c313b2b32db1 100644 (file)
-package org.simantics.db.impl.query;\r
-\r
-import java.util.ArrayList;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-import java.util.concurrent.locks.Condition;\r
-import java.util.concurrent.locks.ReentrantLock;\r
-\r
-import org.simantics.db.Session;\r
-import org.simantics.db.common.SessionThread;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
-import org.simantics.db.impl.query.QueryProcessor.ThreadState;\r
-\r
-class QueryThread extends Thread implements SessionThread {\r
-\r
-       boolean disposed = false;\r
-       private Semaphore exited = new Semaphore(0);\r
-\r
-       final int index;\r
-\r
-       private Session session;\r
-       private QuerySupport querySupport;\r
-\r
-       final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();\r
-       final private ArrayList<SessionTask> own;\r
-       final private ArrayList<SessionTask> ownSync;\r
-       final private ArrayList<SessionTask> queue;\r
-       final private ReentrantLock lock;\r
-       final private Condition condition;\r
-       final private Object querySupportLock;\r
-       final private int THREADS;\r
-       final private AtomicInteger sleepers;\r
-       final private ThreadState[] threadStates;\r
-       final private ArrayList<SessionTask>[] delayQueues;\r
-       final private QueryThread[] executors;\r
-       final private ReentrantLock[] threadLocks;\r
-       final private ArrayList<SessionTask>[] queues;\r
-       final private ArrayList<SessionTask>[] ownSyncTasks;\r
-\r
-       public QueryThread(Session session, QueryProcessor processor, int index, String name) {\r
-               super(QueryProcessor.QueryThreadGroup, null, name);\r
-               this.session = session;\r
-               this.index = index;\r
-               own = processor.ownTasks[index];\r
-               ownSync = processor.ownSyncTasks[index];\r
-               queue = processor.queues[index];\r
-               lock = processor.threadLocks[index];\r
-               condition = processor.threadConditions[index];\r
-               querySupportLock = processor.querySupportLock;\r
-               THREADS = processor.THREADS;\r
-               sleepers = processor.sleepers;\r
-               querySupport = processor.querySupport;\r
-               threadStates = processor.threadStates;\r
-               delayQueues = processor.delayQueues;\r
-               executors = processor.executors;\r
-               threadLocks = processor.threadLocks;\r
-               queues = processor.queues;\r
-               ownSyncTasks = processor.ownSyncTasks;\r
-       }\r
-\r
-       synchronized void dispose() {\r
-\r
-//             System.err.println("qt dispose");\r
-\r
-               disposed = true;\r
-               lock.lock();\r
-               condition.signalAll();\r
-               lock.unlock();\r
-               \r
-               try {\r
-                       exited.acquire();\r
-               } catch (InterruptedException e) {\r
-                       Logger.defaultLogError(e);\r
-               }\r
-\r
-               session = null;\r
-               querySupport = null;\r
-\r
-\r
-//             System.err.println("qt disposed");\r
-\r
-       }\r
-\r
-       boolean isDisposed() { \r
-\r
-               return disposed;\r
-\r
-       }\r
-\r
-       public Session getSession() {\r
-\r
-               return session;\r
-\r
-       }\r
-\r
-       ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {\r
-\r
-               try {\r
-\r
-                       while(true) {\r
-\r
-                               // Perform own tasks first\r
-                               if(tasks.addAll(own)) {\r
-                                       own.clear();\r
-                               } else if (doWait && !ownSync.isEmpty()) {\r
-                                       tasks.add(ownSync.remove(ownSync.size()-1));\r
-                               }\r
-\r
-                               // Try some queued tasks\r
-                               lock.lock();\r
-                               if(tasks.addAll(queue)) {\r
-                                       queue.clear();\r
-                                       lock.unlock();\r
-                                       return tasks;\r
-                               } else {\r
-                                       lock.unlock();\r
-                               }\r
-\r
-                               // Return tasks if some were found\r
-                               if(!tasks.isEmpty()) return tasks;\r
-                               if(!doWait) return null;\r
-\r
-                               synchronized (querySupportLock) {\r
-\r
-                                       lock.lock();\r
-                                       \r
-                                       // Just maybe someone inserted tasks and notified just before synchronized block\r
-                                       if(tasks.addAll(queue)) {\r
-                                               queue.clear();\r
-                                               lock.unlock();\r
-                                               return tasks;\r
-                                       }\r
-\r
-                                       // We are the last one awake\r
-                                       if(sleepers.incrementAndGet() == THREADS) {\r
-\r
-                                               // Do not indicate sleeping yet\r
-                                               sleepers.decrementAndGet();\r
-                                               // Ceased can create new own tasks\r
-                                               if(querySupport == null) System.err.println("null qs");\r
-                                               querySupport.ceased(index);\r
-\r
-                                               if(tasks.addAll(own)) {\r
-                                                       own.clear();\r
-                                               }\r
-                                               //                                      System.err.println("tasks after ceased: " + tasks.size());\r
-                                               if(!tasks.isEmpty()) {\r
-                                                       lock.unlock();\r
-                                                       return tasks;\r
-                                               }\r
-\r
-                                               // OK, now we are going to sleep\r
-                                               sleepers.incrementAndGet();\r
-\r
-                                       }\r
-\r
-                               }\r
-\r
-                               // Nope nothing. Sleep & wait\r
-                               // Whoever releases this calls sleepers.decrementAndGet()\r
-\r
-                               // We are done\r
-                               if(isDisposed()) {\r
-                                       threadStates[index] = ThreadState.DISPOSED;\r
-                                       lock.unlock();\r
-                                       return null;\r
-                               }\r
-\r
-                               threadStates[index] = ThreadState.SLEEP;\r
-                               condition.await();\r
-\r
-                               // We are done\r
-                               if(isDisposed()) {\r
-                                       threadStates[index] = ThreadState.DISPOSED;\r
-                                       lock.unlock();\r
-                                       return null;\r
-                               }\r
-\r
-                               threadStates[index] = ThreadState.RUN;\r
-\r
-                               lock.unlock();\r
-\r
-                       }\r
-\r
-               } catch (InterruptedException e) {\r
-\r
-                       e.printStackTrace();\r
-\r
-                       throw new RuntimeException("Querying was interrupted.", e);\r
-\r
-               }\r
-\r
-       }\r
-\r
-       public boolean runSynchronized() {\r
-\r
-               boolean didExecute = false;\r
-\r
-               for(int performer=0;performer<THREADS;performer++) {\r
-                       if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
-                               synchronized(executors[performer]) {\r
-                                       threadLocks[performer].lock();\r
-                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
-                                       delayQueues[index * THREADS + performer].clear();\r
-                                       executors[performer].notify();\r
-                                       threadLocks[performer].unlock();\r
-                               }\r
-                       }\r
-               }\r
-\r
-               if(tasks.isEmpty()) {\r
-                       ArrayList<SessionTask> finished = newTasks(false, tasks);\r
-                       if(finished == null) return didExecute; \r
-               }\r
-\r
-               while(!tasks.isEmpty()) {\r
-\r
-                       SessionTask task = tasks.remove(tasks.size() - 1);\r
-\r
-                       if(task.syncCaller == index) {\r
-                               ownSyncTasks[index].add(task);\r
-                       } else {\r
-                               task.run(index);\r
-                               didExecute = true;\r
-                       }\r
-\r
-               }\r
-\r
-               return didExecute;\r
-\r
-       }\r
-\r
-       @Override\r
-       public void run() {\r
-\r
-               QuerySupport support = this.querySupport;\r
-\r
-               try {\r
-\r
-                       while (true) {\r
-\r
-                               boolean finished = newTasks(true, tasks) == null;\r
-                               if(finished) {\r
-                                       return;\r
-                               }\r
-                               \r
-                               while(!tasks.isEmpty()) {\r
-\r
-                                       SessionTask task = tasks.remove(tasks.size()-1);\r
-                                       task.run(index);\r
-\r
-                               }\r
-\r
-                               for(int performer=0;performer<THREADS;performer++) {\r
-                                       if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
-                                               synchronized(executors[performer]) {\r
-                                                       threadLocks[performer].lock();\r
-                                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
-                                                       delayQueues[index * THREADS + performer].clear();\r
-                                                       executors[performer].notify();\r
-                                                       threadLocks[performer].unlock();\r
-                                               }\r
-                                       }\r
-                               }\r
-\r
-                       }\r
-\r
-               } catch (Throwable t) {\r
-\r
-                       Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);\r
-                       support.exit(t);\r
-\r
-               } finally {\r
-                       \r
-                       exited.release();\r
-\r
-               }\r
-\r
-       }\r
-\r
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.simantics.db.Session;
+import org.simantics.db.common.SessionThread;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+import org.simantics.db.impl.query.QueryProcessor.ThreadState;
+import org.slf4j.LoggerFactory;
+
+class QueryThread extends Thread implements SessionThread {
+
+       private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
+
+       boolean disposed = false;
+       private Semaphore exited = new Semaphore(0);
+
+       final int index;
+
+       private Session session;
+       private QuerySupport querySupport;
+       private final QueryProcessor processor;
+
+       final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
+//     final private ArrayList<SessionTask> own;
+//     final private ArrayList<SessionTask> ownSync;
+//     final private ArrayList<SessionTask> queue;
+//     final private ReentrantLock lock;
+//     final private Condition condition;
+       final private Object querySupportLock;
+       final private int THREADS;
+       final private AtomicInteger sleepers;
+       final private ThreadState[] threadStates;
+//     final private ArrayList<SessionTask>[] delayQueues;
+//     final private QueryThread[] executors;
+//     final private ReentrantLock[] threadLocks;
+//     final private ArrayList<SessionTask>[] queues;
+//     final private ArrayList<SessionTask>[] ownSyncTasks;
+
+       public QueryThread(Session session, QueryProcessor processor, int index, String name) {
+               super(QueryProcessor.QueryThreadGroup, null, name);
+               this.session = session;
+               this.processor = processor;
+               this.index = index;
+//             own = processor.ownTasks[index];
+//             ownSync = processor.ownSyncTasks[index];
+//             queue = processor.queues[index];
+//             lock = processor.threadLocks[index];
+//             condition = processor.threadConditions[index];
+               querySupportLock = processor.querySupportLock;
+               THREADS = processor.THREADS;
+               sleepers = processor.sleepers;
+               querySupport = processor.querySupport;
+               threadStates = processor.threadStates;
+//             delayQueues = processor.delayQueues;
+//             executors = processor.executors;
+//             threadLocks = processor.threadLocks;
+//             queues = processor.queues;
+//             ownSyncTasks = processor.ownSyncTasks;
+       }
+
+       synchronized void dispose() {
+
+//             System.err.println("qt dispose");
+
+               disposed = true;
+//             lock.lock();
+//             condition.signalAll();
+//             lock.unlock();
+               
+               try {
+                       exited.acquire();
+               } catch (InterruptedException e) {
+                       LOGGER.error("dispose was interrupted", e);
+               }
+
+               session = null;
+               querySupport = null;
+
+//             System.err.println("qt disposed");
+
+       }
+
+       boolean isDisposed() { 
+
+               return disposed;
+
+       }
+
+       public Session getSession() {
+
+               return session;
+
+       }
+
+       private boolean pumpTask() {
+               if(!processor.freeScheduling.isEmpty()) {
+                       tasks.add(processor.freeScheduling.removeFirst());
+                       return true;
+               }
+               return false;
+       }
+       
+       ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
+
+               try {
+
+                       while(true) {
+
+                               // Return tasks if some were found
+                               if(!tasks.isEmpty()) return tasks;
+                               
+                               if(!doWait) return null;
+
+                               synchronized (querySupportLock) {
+
+                                       if(pumpTask())
+                                               return tasks;
+
+//                                     lock.lock();
+
+                                       // We are the last one awake
+                                       if(sleepers.incrementAndGet() == THREADS) {
+
+                                               // Do not indicate sleeping yet
+                                               sleepers.decrementAndGet();
+                                               // Ceased can create new own tasks
+                                               if(querySupport == null) System.err.println("null qs");
+                                               querySupport.ceased(index);
+
+                                               if(pumpTask()) {
+//                                                     lock.unlock();
+                                                       return tasks;
+                                               }
+
+                                               // OK, now we are going to sleep
+                                               sleepers.incrementAndGet();
+
+                                       }
+
+                               }
+
+                               // Nope nothing. Sleep & wait
+                               // Whoever releases this calls sleepers.decrementAndGet()
+
+                               // We are done
+                               if(isDisposed()) {
+                                       threadStates[index] = ThreadState.DISPOSED;
+//                                     lock.unlock();
+                                       return null;
+                               }
+
+                               
+                               threadStates[index] = ThreadState.SLEEP;
+                               
+                               synchronized (querySupportLock) {
+                                       querySupportLock.wait(100);
+                                       
+                               }
+                               
+//                             boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
+//                             if(!woken) {
+//                                     synchronized (querySupportLock) {
+//                                             if(!processor.freeScheduling.isEmpty())
+//                                                     System.err.println("some tasks are available!");
+//                                     }
+//                             }
+
+                               sleepers.decrementAndGet();
+
+                               // We are done
+                               if(isDisposed()) {
+                                       threadStates[index] = ThreadState.DISPOSED;
+                                       //lock.unlock();
+                                       return null;
+                               }
+
+                               threadStates[index] = ThreadState.RUN;
+
+                               //lock.unlock();
+
+                       }
+
+               } catch (InterruptedException e) {
+
+                       LOGGER.error("Query handling (newTasks) was interrupted", e);
+                       throw new RuntimeException("Querying was interrupted.", e);
+
+               }
+
+       }
+
+       public boolean runSynchronized() {
+
+               boolean didExecute = false;
+
+//             for(int performer=0;performer<THREADS;performer++) {
+//                     if(!delayQueues[index * THREADS + performer].isEmpty()) {
+//                             synchronized(executors[performer]) {
+//                                     threadLocks[performer].lock();
+//                                     queues[performer].addAll(delayQueues[index * THREADS + performer]);
+//                                     delayQueues[index * THREADS + performer].clear();
+//                                     executors[performer].notify();
+//                                     threadLocks[performer].unlock();
+//                             }
+//                     }
+//             }
+
+               if(tasks.isEmpty()) {
+                       ArrayList<SessionTask> finished = newTasks(false, tasks);
+                       if(finished == null) return didExecute; 
+               }
+
+               while(!tasks.isEmpty()) {
+
+                       SessionTask task = tasks.remove(tasks.size() - 1);
+
+//                     if(task.syncCaller == index) {
+//                             ownSyncTasks[index].add(task);
+//                     } else {
+                               task.run(index);
+//                             System.err.println("QT(s) " + index + " runs " + task);
+                               didExecute = true;
+//                     }
+
+               }
+
+               return didExecute;
+
+       }
+
+       @Override
+       public void run() {
+
+               processor.thread.set(index);
+               
+               QuerySupport support = this.querySupport;
+
+               try {
+
+                       while (true) {
+
+                               boolean finished = newTasks(true, tasks) == null;
+                               if(finished) {
+                                       return;
+                               }
+                               
+                               while(!tasks.isEmpty()) {
+
+                                       SessionTask task = tasks.remove(tasks.size()-1);
+//                                     System.err.println("QT " + index + " runs " + task);
+                                       task.run(index);
+
+                               }
+
+//                             for(int performer=0;performer<THREADS;performer++) {
+//                                     if(!delayQueues[index * THREADS + performer].isEmpty()) {
+//                                             synchronized(executors[performer]) {
+//                                                     threadLocks[performer].lock();
+//                                                     queues[performer].addAll(delayQueues[index * THREADS + performer]);
+//                                                     delayQueues[index * THREADS + performer].clear();
+//                                                     executors[performer].notify();
+//                                                     threadLocks[performer].unlock();
+//                                             }
+//                                     }
+//                             }
+
+                       }
+
+               } catch (Throwable t) {
+
+                       LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
+                       support.exit(t);
+
+               } finally {
+                       
+                       exited.release();
+
+               }
+
+       }
+
 }
\ No newline at end of file