]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
new file mode 100644 (file)
index 0000000..b8916f3
--- /dev/null
@@ -0,0 +1,282 @@
+package org.simantics.db.impl.query;\r
+\r
+import java.util.ArrayList;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.locks.Condition;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+\r
+import org.simantics.db.Session;\r
+import org.simantics.db.common.SessionThread;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
+import org.simantics.db.impl.query.QueryProcessor.ThreadState;\r
+\r
+class QueryThread extends Thread implements SessionThread {\r
+\r
+       boolean disposed = false;\r
+       private Semaphore exited = new Semaphore(0);\r
+\r
+       final int index;\r
+\r
+       private Session session;\r
+       private QuerySupport querySupport;\r
+\r
+       final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();\r
+       final private ArrayList<SessionTask> own;\r
+       final private ArrayList<SessionTask> ownSync;\r
+       final private ArrayList<SessionTask> queue;\r
+       final private ReentrantLock lock;\r
+       final private Condition condition;\r
+       final private Object querySupportLock;\r
+       final private int THREADS;\r
+       final private AtomicInteger sleepers;\r
+       final private ThreadState[] threadStates;\r
+       final private ArrayList<SessionTask>[] delayQueues;\r
+       final private QueryThread[] executors;\r
+       final private ReentrantLock[] threadLocks;\r
+       final private ArrayList<SessionTask>[] queues;\r
+       final private ArrayList<SessionTask>[] ownSyncTasks;\r
+\r
+       public QueryThread(Session session, QueryProcessor processor, int index, String name) {\r
+               super(QueryProcessor.QueryThreadGroup, null, name);\r
+               this.session = session;\r
+               this.index = index;\r
+               own = processor.ownTasks[index];\r
+               ownSync = processor.ownSyncTasks[index];\r
+               queue = processor.queues[index];\r
+               lock = processor.threadLocks[index];\r
+               condition = processor.threadConditions[index];\r
+               querySupportLock = processor.querySupportLock;\r
+               THREADS = processor.THREADS;\r
+               sleepers = processor.sleepers;\r
+               querySupport = processor.querySupport;\r
+               threadStates = processor.threadStates;\r
+               delayQueues = processor.delayQueues;\r
+               executors = processor.executors;\r
+               threadLocks = processor.threadLocks;\r
+               queues = processor.queues;\r
+               ownSyncTasks = processor.ownSyncTasks;\r
+       }\r
+\r
+       synchronized void dispose() {\r
+\r
+//             System.err.println("qt dispose");\r
+\r
+               disposed = true;\r
+               lock.lock();\r
+               condition.signalAll();\r
+               lock.unlock();\r
+               \r
+               try {\r
+                       exited.acquire();\r
+               } catch (InterruptedException e) {\r
+                       Logger.defaultLogError(e);\r
+               }\r
+\r
+               session = null;\r
+               querySupport = null;\r
+\r
+\r
+//             System.err.println("qt disposed");\r
+\r
+       }\r
+\r
+       boolean isDisposed() { \r
+\r
+               return disposed;\r
+\r
+       }\r
+\r
+       public Session getSession() {\r
+\r
+               return session;\r
+\r
+       }\r
+\r
+       ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {\r
+\r
+               try {\r
+\r
+                       while(true) {\r
+\r
+                               // Perform own tasks first\r
+                               if(tasks.addAll(own)) {\r
+                                       own.clear();\r
+                               } else if (doWait && !ownSync.isEmpty()) {\r
+                                       tasks.add(ownSync.remove(ownSync.size()-1));\r
+                               }\r
+\r
+                               // Try some queued tasks\r
+                               lock.lock();\r
+                               if(tasks.addAll(queue)) {\r
+                                       queue.clear();\r
+                                       lock.unlock();\r
+                                       return tasks;\r
+                               } else {\r
+                                       lock.unlock();\r
+                               }\r
+\r
+                               // Return tasks if some were found\r
+                               if(!tasks.isEmpty()) return tasks;\r
+                               if(!doWait) return null;\r
+\r
+                               synchronized (querySupportLock) {\r
+\r
+                                       lock.lock();\r
+                                       \r
+                                       // Just maybe someone inserted tasks and notified just before synchronized block\r
+                                       if(tasks.addAll(queue)) {\r
+                                               queue.clear();\r
+                                               lock.unlock();\r
+                                               return tasks;\r
+                                       }\r
+\r
+                                       // We are the last one awake\r
+                                       if(sleepers.incrementAndGet() == THREADS) {\r
+\r
+                                               // Do not indicate sleeping yet\r
+                                               sleepers.decrementAndGet();\r
+                                               // Ceased can create new own tasks\r
+                                               if(querySupport == null) System.err.println("null qs");\r
+                                               querySupport.ceased(index);\r
+\r
+                                               if(tasks.addAll(own)) {\r
+                                                       own.clear();\r
+                                               }\r
+                                               //                                      System.err.println("tasks after ceased: " + tasks.size());\r
+                                               if(!tasks.isEmpty()) {\r
+                                                       lock.unlock();\r
+                                                       return tasks;\r
+                                               }\r
+\r
+                                               // OK, now we are going to sleep\r
+                                               sleepers.incrementAndGet();\r
+\r
+                                       }\r
+\r
+                               }\r
+\r
+                               // Nope nothing. Sleep & wait\r
+                               // Whoever releases this calls sleepers.decrementAndGet()\r
+\r
+                               // We are done\r
+                               if(isDisposed()) {\r
+                                       threadStates[index] = ThreadState.DISPOSED;\r
+                                       lock.unlock();\r
+                                       return null;\r
+                               }\r
+\r
+                               threadStates[index] = ThreadState.SLEEP;\r
+                               condition.await();\r
+\r
+                               // We are done\r
+                               if(isDisposed()) {\r
+                                       threadStates[index] = ThreadState.DISPOSED;\r
+                                       lock.unlock();\r
+                                       return null;\r
+                               }\r
+\r
+                               threadStates[index] = ThreadState.RUN;\r
+\r
+                               lock.unlock();\r
+\r
+                       }\r
+\r
+               } catch (InterruptedException e) {\r
+\r
+                       e.printStackTrace();\r
+\r
+                       throw new RuntimeException("Querying was interrupted.", e);\r
+\r
+               }\r
+\r
+       }\r
+\r
+       public boolean runSynchronized() {\r
+\r
+               boolean didExecute = false;\r
+\r
+               for(int performer=0;performer<THREADS;performer++) {\r
+                       if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
+                               synchronized(executors[performer]) {\r
+                                       threadLocks[performer].lock();\r
+                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
+                                       delayQueues[index * THREADS + performer].clear();\r
+                                       executors[performer].notify();\r
+                                       threadLocks[performer].unlock();\r
+                               }\r
+                       }\r
+               }\r
+\r
+               if(tasks.isEmpty()) {\r
+                       ArrayList<SessionTask> finished = newTasks(false, tasks);\r
+                       if(finished == null) return didExecute; \r
+               }\r
+\r
+               while(!tasks.isEmpty()) {\r
+\r
+                       SessionTask task = tasks.remove(tasks.size() - 1);\r
+\r
+                       if(task.syncCaller == index) {\r
+                               ownSyncTasks[index].add(task);\r
+                       } else {\r
+                               task.run(index);\r
+                               didExecute = true;\r
+                       }\r
+\r
+               }\r
+\r
+               return didExecute;\r
+\r
+       }\r
+\r
+       @Override\r
+       public void run() {\r
+\r
+               QuerySupport support = this.querySupport;\r
+\r
+               try {\r
+\r
+                       while (true) {\r
+\r
+                               boolean finished = newTasks(true, tasks) == null;\r
+                               if(finished) {\r
+                                       return;\r
+                               }\r
+                               \r
+                               while(!tasks.isEmpty()) {\r
+\r
+                                       SessionTask task = tasks.remove(tasks.size()-1);\r
+                                       task.run(index);\r
+\r
+                               }\r
+\r
+                               for(int performer=0;performer<THREADS;performer++) {\r
+                                       if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
+                                               synchronized(executors[performer]) {\r
+                                                       threadLocks[performer].lock();\r
+                                                       queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
+                                                       delayQueues[index * THREADS + performer].clear();\r
+                                                       executors[performer].notify();\r
+                                                       threadLocks[performer].unlock();\r
+                                               }\r
+                                       }\r
+                               }\r
+\r
+                       }\r
+\r
+               } catch (Throwable t) {\r
+\r
+                       Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);\r
+                       support.exit(t);\r
+\r
+               } finally {\r
+                       \r
+                       exited.release();\r
+\r
+               }\r
+\r
+       }\r
+\r
+}
\ No newline at end of file