--- /dev/null
+package org.simantics.db.impl.query;\r
+\r
+import java.util.ArrayList;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.locks.Condition;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+\r
+import org.simantics.db.Session;\r
+import org.simantics.db.common.SessionThread;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
+import org.simantics.db.impl.query.QueryProcessor.ThreadState;\r
+\r
+class QueryThread extends Thread implements SessionThread {\r
+\r
+ boolean disposed = false;\r
+ private Semaphore exited = new Semaphore(0);\r
+\r
+ final int index;\r
+\r
+ private Session session;\r
+ private QuerySupport querySupport;\r
+\r
+ final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();\r
+ final private ArrayList<SessionTask> own;\r
+ final private ArrayList<SessionTask> ownSync;\r
+ final private ArrayList<SessionTask> queue;\r
+ final private ReentrantLock lock;\r
+ final private Condition condition;\r
+ final private Object querySupportLock;\r
+ final private int THREADS;\r
+ final private AtomicInteger sleepers;\r
+ final private ThreadState[] threadStates;\r
+ final private ArrayList<SessionTask>[] delayQueues;\r
+ final private QueryThread[] executors;\r
+ final private ReentrantLock[] threadLocks;\r
+ final private ArrayList<SessionTask>[] queues;\r
+ final private ArrayList<SessionTask>[] ownSyncTasks;\r
+\r
+ public QueryThread(Session session, QueryProcessor processor, int index, String name) {\r
+ super(QueryProcessor.QueryThreadGroup, null, name);\r
+ this.session = session;\r
+ this.index = index;\r
+ own = processor.ownTasks[index];\r
+ ownSync = processor.ownSyncTasks[index];\r
+ queue = processor.queues[index];\r
+ lock = processor.threadLocks[index];\r
+ condition = processor.threadConditions[index];\r
+ querySupportLock = processor.querySupportLock;\r
+ THREADS = processor.THREADS;\r
+ sleepers = processor.sleepers;\r
+ querySupport = processor.querySupport;\r
+ threadStates = processor.threadStates;\r
+ delayQueues = processor.delayQueues;\r
+ executors = processor.executors;\r
+ threadLocks = processor.threadLocks;\r
+ queues = processor.queues;\r
+ ownSyncTasks = processor.ownSyncTasks;\r
+ }\r
+\r
+ synchronized void dispose() {\r
+\r
+// System.err.println("qt dispose");\r
+\r
+ disposed = true;\r
+ lock.lock();\r
+ condition.signalAll();\r
+ lock.unlock();\r
+ \r
+ try {\r
+ exited.acquire();\r
+ } catch (InterruptedException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+\r
+ session = null;\r
+ querySupport = null;\r
+\r
+\r
+// System.err.println("qt disposed");\r
+\r
+ }\r
+\r
+ boolean isDisposed() { \r
+\r
+ return disposed;\r
+\r
+ }\r
+\r
+ public Session getSession() {\r
+\r
+ return session;\r
+\r
+ }\r
+\r
+ ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {\r
+\r
+ try {\r
+\r
+ while(true) {\r
+\r
+ // Perform own tasks first\r
+ if(tasks.addAll(own)) {\r
+ own.clear();\r
+ } else if (doWait && !ownSync.isEmpty()) {\r
+ tasks.add(ownSync.remove(ownSync.size()-1));\r
+ }\r
+\r
+ // Try some queued tasks\r
+ lock.lock();\r
+ if(tasks.addAll(queue)) {\r
+ queue.clear();\r
+ lock.unlock();\r
+ return tasks;\r
+ } else {\r
+ lock.unlock();\r
+ }\r
+\r
+ // Return tasks if some were found\r
+ if(!tasks.isEmpty()) return tasks;\r
+ if(!doWait) return null;\r
+\r
+ synchronized (querySupportLock) {\r
+\r
+ lock.lock();\r
+ \r
+ // Just maybe someone inserted tasks and notified just before synchronized block\r
+ if(tasks.addAll(queue)) {\r
+ queue.clear();\r
+ lock.unlock();\r
+ return tasks;\r
+ }\r
+\r
+ // We are the last one awake\r
+ if(sleepers.incrementAndGet() == THREADS) {\r
+\r
+ // Do not indicate sleeping yet\r
+ sleepers.decrementAndGet();\r
+ // Ceased can create new own tasks\r
+ if(querySupport == null) System.err.println("null qs");\r
+ querySupport.ceased(index);\r
+\r
+ if(tasks.addAll(own)) {\r
+ own.clear();\r
+ }\r
+ // System.err.println("tasks after ceased: " + tasks.size());\r
+ if(!tasks.isEmpty()) {\r
+ lock.unlock();\r
+ return tasks;\r
+ }\r
+\r
+ // OK, now we are going to sleep\r
+ sleepers.incrementAndGet();\r
+\r
+ }\r
+\r
+ }\r
+\r
+ // Nope nothing. Sleep & wait\r
+ // Whoever releases this calls sleepers.decrementAndGet()\r
+\r
+ // We are done\r
+ if(isDisposed()) {\r
+ threadStates[index] = ThreadState.DISPOSED;\r
+ lock.unlock();\r
+ return null;\r
+ }\r
+\r
+ threadStates[index] = ThreadState.SLEEP;\r
+ condition.await();\r
+\r
+ // We are done\r
+ if(isDisposed()) {\r
+ threadStates[index] = ThreadState.DISPOSED;\r
+ lock.unlock();\r
+ return null;\r
+ }\r
+\r
+ threadStates[index] = ThreadState.RUN;\r
+\r
+ lock.unlock();\r
+\r
+ }\r
+\r
+ } catch (InterruptedException e) {\r
+\r
+ e.printStackTrace();\r
+\r
+ throw new RuntimeException("Querying was interrupted.", e);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public boolean runSynchronized() {\r
+\r
+ boolean didExecute = false;\r
+\r
+ for(int performer=0;performer<THREADS;performer++) {\r
+ if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
+ synchronized(executors[performer]) {\r
+ threadLocks[performer].lock();\r
+ queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
+ delayQueues[index * THREADS + performer].clear();\r
+ executors[performer].notify();\r
+ threadLocks[performer].unlock();\r
+ }\r
+ }\r
+ }\r
+\r
+ if(tasks.isEmpty()) {\r
+ ArrayList<SessionTask> finished = newTasks(false, tasks);\r
+ if(finished == null) return didExecute; \r
+ }\r
+\r
+ while(!tasks.isEmpty()) {\r
+\r
+ SessionTask task = tasks.remove(tasks.size() - 1);\r
+\r
+ if(task.syncCaller == index) {\r
+ ownSyncTasks[index].add(task);\r
+ } else {\r
+ task.run(index);\r
+ didExecute = true;\r
+ }\r
+\r
+ }\r
+\r
+ return didExecute;\r
+\r
+ }\r
+\r
+ @Override\r
+ public void run() {\r
+\r
+ QuerySupport support = this.querySupport;\r
+\r
+ try {\r
+\r
+ while (true) {\r
+\r
+ boolean finished = newTasks(true, tasks) == null;\r
+ if(finished) {\r
+ return;\r
+ }\r
+ \r
+ while(!tasks.isEmpty()) {\r
+\r
+ SessionTask task = tasks.remove(tasks.size()-1);\r
+ task.run(index);\r
+\r
+ }\r
+\r
+ for(int performer=0;performer<THREADS;performer++) {\r
+ if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
+ synchronized(executors[performer]) {\r
+ threadLocks[performer].lock();\r
+ queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
+ delayQueues[index * THREADS + performer].clear();\r
+ executors[performer].notify();\r
+ threadLocks[performer].unlock();\r
+ }\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ } catch (Throwable t) {\r
+\r
+ Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);\r
+ support.exit(t);\r
+\r
+ } finally {\r
+ \r
+ exited.release();\r
+\r
+ }\r
+\r
+ }\r
+\r
+}
\ No newline at end of file