1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.TimeUnit;
6 import java.util.concurrent.atomic.AtomicInteger;
7 import java.util.concurrent.locks.Condition;
8 import java.util.concurrent.locks.ReentrantLock;
10 import org.simantics.db.Session;
11 import org.simantics.db.common.SessionThread;
12 import org.simantics.db.common.utils.Logger;
13 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
14 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
/**
 * Thread subclass that also implements {@link SessionThread}. It moves
 * {@link SessionTask} instances out of the owning {@link QueryProcessor}'s
 * freeScheduling queue into its own task list.
 *
 * Several fields (querySupportLock, sleepers, querySupport, threadStates) are
 * taken from the processor in the constructor, so those objects are shared
 * with the processor rather than private to this thread.
 */
16 class QueryThread extends Thread implements SessionThread {
// Disposal flag; starts out false.
18 boolean disposed = false;
// Created with zero permits, so an acquire() on it blocks until a permit is released.
// NOTE(review): presumably released when the thread's run loop ends and awaited by dispose() - confirm.
19 private Semaphore exited = new Semaphore(0);
// Session handed to the constructor.
23 private Session session;
// Copied from processor.querySupport; newTasks() calls ceased(index) on it.
24 private QuerySupport querySupport;
// Processor that supplies the shared scheduling state (freeScheduling, lock, counters).
25 final private QueryProcessor processor;
// Tasks picked up by this thread; filled by pumpTask() and consumed from the tail.
27 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 // final private ArrayList<SessionTask> own;
29 // final private ArrayList<SessionTask> ownSync;
30 // final private ArrayList<SessionTask> queue;
31 // final private ReentrantLock lock;
32 // final private Condition condition;
// Monitor object taken from the processor; newTasks() synchronizes and waits on it.
33 final private Object querySupportLock;
// Copied from processor.THREADS; newTasks() compares the sleeper count against it.
34 final private int THREADS;
// Counter taken from the processor; incremented and decremented around sleeping in newTasks().
35 final private AtomicInteger sleepers;
// State array taken from the processor; this thread writes threadStates[index].
36 final private ThreadState[] threadStates;
37 // final private ArrayList<SessionTask>[] delayQueues;
38 // final private QueryThread[] executors;
39 // final private ReentrantLock[] threadLocks;
40 // final private ArrayList<SessionTask>[] queues;
41 // final private ArrayList<SessionTask>[] ownSyncTasks;
/**
 * Creates a query thread in {@code QueryProcessor.QueryThreadGroup} and caches
 * the processor's shared scheduling state in fields.
 *
 * @param session   session stored in this thread
 * @param processor processor whose querySupportLock, THREADS, sleepers,
 *                  querySupport and threadStates are copied into fields
 * @param index     NOTE(review): presumably this thread's slot in the processor's
 *                  per-thread arrays (e.g. threadStates) - confirm
 * @param name      thread name, passed on to the {@link Thread} constructor
 */
43 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
// No separate Runnable target is supplied (second argument is null).
44 super(QueryProcessor.QueryThreadGroup, null, name);
45 this.session = session;
46 this.processor = processor;
48 // own = processor.ownTasks[index];
49 // ownSync = processor.ownSyncTasks[index];
50 // queue = processor.queues[index];
51 // lock = processor.threadLocks[index];
52 // condition = processor.threadConditions[index];
// The values below come from the processor; the object references are the processor's own instances.
53 querySupportLock = processor.querySupportLock;
54 THREADS = processor.THREADS;
55 sleepers = processor.sleepers;
56 querySupport = processor.querySupport;
57 threadStates = processor.threadStates;
58 // delayQueues = processor.delayQueues;
59 // executors = processor.executors;
60 // threadLocks = processor.threadLocks;
61 // queues = processor.queues;
62 // ownSyncTasks = processor.ownSyncTasks;
/**
 * Disposes this thread; the method is synchronized on this instance.
 * An {@link InterruptedException} caught here is reported through
 * {@code Logger.defaultLogError}.
 */
65 synchronized void dispose() {
67 // System.err.println("qt dispose");
71 // condition.signalAll();
76 } catch (InterruptedException e) {
77 Logger.defaultLogError(e);
84 // System.err.println("qt disposed");
/** Package-private disposal query; presumably returns the {@code disposed} field - TODO confirm. */
88 boolean isDisposed() {
/** Returns a {@link Session}; presumably the {@code session} field set by the constructor - TODO confirm. */
94 public Session getSession() {
/**
 * When the processor's freeScheduling queue is not empty, removes its first
 * task and appends it to this thread's {@code tasks} list.
 *
 * @return NOTE(review): presumably true when a task was moved - confirm
 */
100 private boolean pumpTask() {
101 if(!processor.freeScheduling.isEmpty()) {
102 tasks.add(processor.freeScheduling.removeFirst());
/**
 * Looks for tasks to run and optionally sleeps until some may be available.
 *
 * The {@code tasks} parameter has the same name as the {@code tasks} field and
 * shadows it inside this method.
 *
 * While waiting, the thread counts itself in {@code sleepers}, records its state
 * in {@code threadStates[index]} and waits on {@code querySupportLock} for at
 * most 100 ms per wait call.
 *
 * @param doWait when false, the method returns null instead of sleeping if no
 *               tasks were found
 * @param tasks  list that is checked for tasks and returned when it is not empty
 * @return {@code tasks} when it is not empty; null when nothing was found and
 *         {@code doWait} is false
 * @throws RuntimeException wrapping an {@link InterruptedException}, with the
 *         message "Querying was interrupted."
 */
108 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
114 // Return tasks if some were found
115 if(!tasks.isEmpty()) return tasks;
117 if(!doWait) return null;
119 synchronized (querySupportLock) {
126 // We are the last one awake
// The increment is a tentative registration as a sleeper; reaching THREADS means every thread is counted.
127 if(sleepers.incrementAndGet() == THREADS) {
129 // Do not indicate sleeping yet
130 sleepers.decrementAndGet();
131 // Ceased can create new own tasks
132 if(querySupport == null) System.err.println("null qs");
133 querySupport.ceased(index);
140 // OK, now we are going to sleep
141 sleepers.incrementAndGet();
147 // Nope nothing. Sleep & wait
148 // Whoever releases this calls sleepers.decrementAndGet()
152 threadStates[index] = ThreadState.DISPOSED;
158 threadStates[index] = ThreadState.SLEEP;
160 synchronized (querySupportLock) {
// Timed wait: returns after a notify on the lock or after 100 ms at the latest.
161 querySupportLock.wait(100);
165 // boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
167 // synchronized (querySupportLock) {
168 // if(!processor.freeScheduling.isEmpty())
169 // System.err.println("some tasks are available!");
173 sleepers.decrementAndGet();
177 threadStates[index] = ThreadState.DISPOSED;
182 threadStates[index] = ThreadState.RUN;
188 } catch (InterruptedException e) {
// The interrupt is rethrown as an unchecked exception with the original as its cause.
192 throw new RuntimeException("Querying was interrupted.", e);
/**
 * Works through this thread's task list without sleeping.
 *
 * If the list is empty, {@code newTasks(false, tasks)} is asked for tasks
 * without waiting, and the method returns when that call yields null.
 * Tasks are taken from the end of the list.
 *
 * @return the {@code didExecute} flag, which starts out false;
 *         NOTE(review): presumably set once a task has been run - confirm
 */
198 public boolean runSynchronized() {
200 boolean didExecute = false;
202 // for(int performer=0;performer<THREADS;performer++) {
203 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
204 // synchronized(executors[performer]) {
205 // threadLocks[performer].lock();
206 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
207 // delayQueues[index * THREADS + performer].clear();
208 // executors[performer].notify();
209 // threadLocks[performer].unlock();
214 if(tasks.isEmpty()) {
// doWait is false: never sleep here, just check for available tasks.
215 ArrayList<SessionTask> finished = newTasks(false, tasks);
216 if(finished == null) return didExecute;
219 while(!tasks.isEmpty()) {
// Take the most recently added task (last element of the list).
221 SessionTask task = tasks.remove(tasks.size() - 1);
223 // if(task.syncCaller == index) {
224 // ownSyncTasks[index].add(task);
227 // System.err.println("QT(s) " + index + " runs " + task);
240 processor.thread.set(index);
242 QuerySupport support = this.querySupport;
248 boolean finished = newTasks(true, tasks) == null;
253 while(!tasks.isEmpty()) {
255 SessionTask task = tasks.remove(tasks.size()-1);
256 // System.err.println("QT " + index + " runs " + task);
261 // for(int performer=0;performer<THREADS;performer++) {
262 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
263 // synchronized(executors[performer]) {
264 // threadLocks[performer].lock();
265 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
266 // delayQueues[index * THREADS + performer].clear();
267 // executors[performer].notify();
268 // threadLocks[performer].unlock();
275 } catch (Throwable t) {
277 Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);