-package org.simantics.db.impl.query;\r
-\r
-import java.util.ArrayList;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-import java.util.concurrent.locks.Condition;\r
-import java.util.concurrent.locks.ReentrantLock;\r
-\r
-import org.simantics.db.Session;\r
-import org.simantics.db.common.SessionThread;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
-import org.simantics.db.impl.query.QueryProcessor.ThreadState;\r
-\r
-class QueryThread extends Thread implements SessionThread {\r
-\r
- boolean disposed = false;\r
- private Semaphore exited = new Semaphore(0);\r
-\r
- final int index;\r
-\r
- private Session session;\r
- private QuerySupport querySupport;\r
-\r
- final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();\r
- final private ArrayList<SessionTask> own;\r
- final private ArrayList<SessionTask> ownSync;\r
- final private ArrayList<SessionTask> queue;\r
- final private ReentrantLock lock;\r
- final private Condition condition;\r
- final private Object querySupportLock;\r
- final private int THREADS;\r
- final private AtomicInteger sleepers;\r
- final private ThreadState[] threadStates;\r
- final private ArrayList<SessionTask>[] delayQueues;\r
- final private QueryThread[] executors;\r
- final private ReentrantLock[] threadLocks;\r
- final private ArrayList<SessionTask>[] queues;\r
- final private ArrayList<SessionTask>[] ownSyncTasks;\r
-\r
- public QueryThread(Session session, QueryProcessor processor, int index, String name) {\r
- super(QueryProcessor.QueryThreadGroup, null, name);\r
- this.session = session;\r
- this.index = index;\r
- own = processor.ownTasks[index];\r
- ownSync = processor.ownSyncTasks[index];\r
- queue = processor.queues[index];\r
- lock = processor.threadLocks[index];\r
- condition = processor.threadConditions[index];\r
- querySupportLock = processor.querySupportLock;\r
- THREADS = processor.THREADS;\r
- sleepers = processor.sleepers;\r
- querySupport = processor.querySupport;\r
- threadStates = processor.threadStates;\r
- delayQueues = processor.delayQueues;\r
- executors = processor.executors;\r
- threadLocks = processor.threadLocks;\r
- queues = processor.queues;\r
- ownSyncTasks = processor.ownSyncTasks;\r
- }\r
-\r
- synchronized void dispose() {\r
-\r
-// System.err.println("qt dispose");\r
-\r
- disposed = true;\r
- lock.lock();\r
- condition.signalAll();\r
- lock.unlock();\r
- \r
- try {\r
- exited.acquire();\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- session = null;\r
- querySupport = null;\r
-\r
-\r
-// System.err.println("qt disposed");\r
-\r
- }\r
-\r
- boolean isDisposed() { \r
-\r
- return disposed;\r
-\r
- }\r
-\r
- public Session getSession() {\r
-\r
- return session;\r
-\r
- }\r
-\r
- ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {\r
-\r
- try {\r
-\r
- while(true) {\r
-\r
- // Perform own tasks first\r
- if(tasks.addAll(own)) {\r
- own.clear();\r
- } else if (doWait && !ownSync.isEmpty()) {\r
- tasks.add(ownSync.remove(ownSync.size()-1));\r
- }\r
-\r
- // Try some queued tasks\r
- lock.lock();\r
- if(tasks.addAll(queue)) {\r
- queue.clear();\r
- lock.unlock();\r
- return tasks;\r
- } else {\r
- lock.unlock();\r
- }\r
-\r
- // Return tasks if some were found\r
- if(!tasks.isEmpty()) return tasks;\r
- if(!doWait) return null;\r
-\r
- synchronized (querySupportLock) {\r
-\r
- lock.lock();\r
- \r
- // Just maybe someone inserted tasks and notified just before synchronized block\r
- if(tasks.addAll(queue)) {\r
- queue.clear();\r
- lock.unlock();\r
- return tasks;\r
- }\r
-\r
- // We are the last one awake\r
- if(sleepers.incrementAndGet() == THREADS) {\r
-\r
- // Do not indicate sleeping yet\r
- sleepers.decrementAndGet();\r
- // Ceased can create new own tasks\r
- if(querySupport == null) System.err.println("null qs");\r
- querySupport.ceased(index);\r
-\r
- if(tasks.addAll(own)) {\r
- own.clear();\r
- }\r
- // System.err.println("tasks after ceased: " + tasks.size());\r
- if(!tasks.isEmpty()) {\r
- lock.unlock();\r
- return tasks;\r
- }\r
-\r
- // OK, now we are going to sleep\r
- sleepers.incrementAndGet();\r
-\r
- }\r
-\r
- }\r
-\r
- // Nope nothing. Sleep & wait\r
- // Whoever releases this calls sleepers.decrementAndGet()\r
-\r
- // We are done\r
- if(isDisposed()) {\r
- threadStates[index] = ThreadState.DISPOSED;\r
- lock.unlock();\r
- return null;\r
- }\r
-\r
- threadStates[index] = ThreadState.SLEEP;\r
- condition.await();\r
-\r
- // We are done\r
- if(isDisposed()) {\r
- threadStates[index] = ThreadState.DISPOSED;\r
- lock.unlock();\r
- return null;\r
- }\r
-\r
- threadStates[index] = ThreadState.RUN;\r
-\r
- lock.unlock();\r
-\r
- }\r
-\r
- } catch (InterruptedException e) {\r
-\r
- e.printStackTrace();\r
-\r
- throw new RuntimeException("Querying was interrupted.", e);\r
-\r
- }\r
-\r
- }\r
-\r
- public boolean runSynchronized() {\r
-\r
- boolean didExecute = false;\r
-\r
- for(int performer=0;performer<THREADS;performer++) {\r
- if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
- synchronized(executors[performer]) {\r
- threadLocks[performer].lock();\r
- queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
- delayQueues[index * THREADS + performer].clear();\r
- executors[performer].notify();\r
- threadLocks[performer].unlock();\r
- }\r
- }\r
- }\r
-\r
- if(tasks.isEmpty()) {\r
- ArrayList<SessionTask> finished = newTasks(false, tasks);\r
- if(finished == null) return didExecute; \r
- }\r
-\r
- while(!tasks.isEmpty()) {\r
-\r
- SessionTask task = tasks.remove(tasks.size() - 1);\r
-\r
- if(task.syncCaller == index) {\r
- ownSyncTasks[index].add(task);\r
- } else {\r
- task.run(index);\r
- didExecute = true;\r
- }\r
-\r
- }\r
-\r
- return didExecute;\r
-\r
- }\r
-\r
- @Override\r
- public void run() {\r
-\r
- QuerySupport support = this.querySupport;\r
-\r
- try {\r
-\r
- while (true) {\r
-\r
- boolean finished = newTasks(true, tasks) == null;\r
- if(finished) {\r
- return;\r
- }\r
- \r
- while(!tasks.isEmpty()) {\r
-\r
- SessionTask task = tasks.remove(tasks.size()-1);\r
- task.run(index);\r
-\r
- }\r
-\r
- for(int performer=0;performer<THREADS;performer++) {\r
- if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
- synchronized(executors[performer]) {\r
- threadLocks[performer].lock();\r
- queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
- delayQueues[index * THREADS + performer].clear();\r
- executors[performer].notify();\r
- threadLocks[performer].unlock();\r
- }\r
- }\r
- }\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
-\r
- Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);\r
- support.exit(t);\r
-\r
- } finally {\r
- \r
- exited.release();\r
-\r
- }\r
-\r
- }\r
-\r
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.simantics.db.Session;
+import org.simantics.db.common.SessionThread;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+import org.simantics.db.impl.query.QueryProcessor.ThreadState;
+import org.slf4j.LoggerFactory;
+
+class QueryThread extends Thread implements SessionThread {
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
+
+ boolean disposed = false;
+ private Semaphore exited = new Semaphore(0);
+
+ final int index;
+
+ private Session session;
+ private QuerySupport querySupport;
+ private final QueryProcessor processor;
+ private final Semaphore requests;
+
+ final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
+// final private ArrayList<SessionTask> own;
+// final private ArrayList<SessionTask> ownSync;
+// final private ArrayList<SessionTask> queue;
+// final private ReentrantLock lock;
+// final private Condition condition;
+ final private Object querySupportLock;
+ final private int THREADS;
+ final private AtomicInteger sleepers;
+ final private ThreadState[] threadStates;
+// final private ArrayList<SessionTask>[] delayQueues;
+// final private QueryThread[] executors;
+// final private ReentrantLock[] threadLocks;
+// final private ArrayList<SessionTask>[] queues;
+// final private ArrayList<SessionTask>[] ownSyncTasks;
+
+ public QueryThread(Session session, QueryProcessor processor, int index, String name) {
+ super(QueryProcessor.QueryThreadGroup, null, name);
+ this.session = session;
+ this.processor = processor;
+ this.index = index;
+// own = processor.ownTasks[index];
+// ownSync = processor.ownSyncTasks[index];
+// queue = processor.queues[index];
+// lock = processor.threadLocks[index];
+// condition = processor.threadConditions[index];
+ querySupportLock = processor.querySupportLock;
+ THREADS = processor.THREADS;
+ sleepers = processor.sleepers;
+ querySupport = processor.querySupport;
+ threadStates = processor.threadStates;
+ requests = processor.requests;
+// delayQueues = processor.delayQueues;
+// executors = processor.executors;
+// threadLocks = processor.threadLocks;
+// queues = processor.queues;
+// ownSyncTasks = processor.ownSyncTasks;
+ }
+
+ synchronized void dispose() {
+
+// System.err.println("qt dispose");
+
+ disposed = true;
+// lock.lock();
+// condition.signalAll();
+// lock.unlock();
+
+ try {
+ exited.acquire();
+ } catch (InterruptedException e) {
+ LOGGER.error("dispose was interrupted", e);
+ }
+
+ session = null;
+ querySupport = null;
+
+// System.err.println("qt disposed");
+
+ }
+
+ boolean isDisposed() {
+
+ return disposed;
+
+ }
+
+ public Session getSession() {
+
+ return session;
+
+ }
+
+ private boolean pumpTask() {
+ return processor.scheduling.pumpTask(tasks);
+ }
+
+ ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
+
+ try {
+
+ while(true) {
+
+ // Return tasks if some were found
+ if(!tasks.isEmpty()) return tasks;
+
+ if(!doWait) return null;
+
+ synchronized (querySupportLock) {
+
+ if(pumpTask())
+ return tasks;
+
+// lock.lock();
+
+ // We are the last one awake
+ if(sleepers.incrementAndGet() == THREADS) {
+
+ // Do not indicate sleeping yet
+ sleepers.decrementAndGet();
+ // Ceased can create new own tasks
+ if(querySupport == null) System.err.println("null qs");
+ querySupport.ceased(index);
+
+ if(pumpTask()) {
+// lock.unlock();
+ return tasks;
+ }
+
+ // OK, now we are going to sleep
+ sleepers.incrementAndGet();
+
+ }
+
+ }
+
+ // Nope nothing. Sleep & wait
+ // Whoever releases this calls sleepers.decrementAndGet()
+
+ // We are done
+ if(isDisposed()) {
+ threadStates[index] = ThreadState.DISPOSED;
+// lock.unlock();
+ return null;
+ }
+
+
+ threadStates[index] = ThreadState.SLEEP;
+
+ requests.acquire();
+
+ sleepers.decrementAndGet();
+
+ // We are done
+ if(isDisposed()) {
+ threadStates[index] = ThreadState.DISPOSED;
+ //lock.unlock();
+ return null;
+ }
+
+ threadStates[index] = ThreadState.RUN;
+
+ //lock.unlock();
+
+ }
+
+ } catch (InterruptedException e) {
+
+ LOGGER.error("Query handling (newTasks) was interrupted", e);
+ throw new RuntimeException("Querying was interrupted.", e);
+
+ }
+
+ }
+
+ public boolean runSynchronized() {
+
+ boolean didExecute = false;
+
+// for(int performer=0;performer<THREADS;performer++) {
+// if(!delayQueues[index * THREADS + performer].isEmpty()) {
+// synchronized(executors[performer]) {
+// threadLocks[performer].lock();
+// queues[performer].addAll(delayQueues[index * THREADS + performer]);
+// delayQueues[index * THREADS + performer].clear();
+// executors[performer].notify();
+// threadLocks[performer].unlock();
+// }
+// }
+// }
+
+ if(tasks.isEmpty()) {
+ ArrayList<SessionTask> finished = newTasks(false, tasks);
+ if(finished == null) return didExecute;
+ }
+
+ while(!tasks.isEmpty()) {
+
+ SessionTask task = tasks.remove(tasks.size() - 1);
+
+// if(task.syncCaller == index) {
+// ownSyncTasks[index].add(task);
+// } else {
+ task.run(index);
+// System.err.println("QT(s) " + index + " runs " + task);
+ didExecute = true;
+// }
+
+ }
+
+ return didExecute;
+
+ }
+
+ @Override
+ public void run() {
+
+ QuerySupport support = this.querySupport;
+
+ try {
+
+ while (true) {
+
+ boolean finished = newTasks(true, tasks) == null;
+ if(finished) {
+ return;
+ }
+
+ while(!tasks.isEmpty()) {
+
+ SessionTask task = tasks.remove(tasks.size()-1);
+ task.run(0);
+
+ }
+
+ }
+
+ } catch (Throwable t) {
+
+ LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
+ support.exit(t);
+
+ } finally {
+
+ exited.release();
+
+ }
+
+ }
+