1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
7 import org.simantics.db.Session;
8 import org.simantics.db.common.SessionThread;
9 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
10 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
11 import org.slf4j.LoggerFactory;
13 class QueryThread extends Thread implements SessionThread {
15 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
17 boolean disposed = false;
18 private Semaphore exited = new Semaphore(0);
22 private Session session;
23 private QuerySupport querySupport;
24 private final QueryProcessor processor;
25 private final Semaphore requests;
27 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 // final private ArrayList<SessionTask> own;
29 // final private ArrayList<SessionTask> ownSync;
30 // final private ArrayList<SessionTask> queue;
31 // final private ReentrantLock lock;
32 // final private Condition condition;
33 final private Object querySupportLock;
34 final private int THREADS;
35 final private AtomicInteger sleepers;
36 final private ThreadState[] threadStates;
37 // final private ArrayList<SessionTask>[] delayQueues;
38 // final private QueryThread[] executors;
39 // final private ReentrantLock[] threadLocks;
40 // final private ArrayList<SessionTask>[] queues;
41 // final private ArrayList<SessionTask>[] ownSyncTasks;
/**
 * Creates a query thread and caches references to state shared with the processor.
 *
 * The thread is registered in {@code QueryProcessor.QueryThreadGroup} with no
 * {@code Runnable} target and the given name.
 *
 * NOTE(review): this listing omits some original lines (the embedded numbering
 * skips 47 and 64-65), so how {@code index} is stored is not visible here.
 *
 * @param session   the session kept in this thread's {@code session} field
 * @param processor the query processor whose shared fields are copied below
 * @param index     not used in the visible lines; presumably this thread's slot in
 *                  {@code threadStates} - TODO confirm against the full file
 * @param name      the thread name handed to the {@code Thread} constructor
 */
43 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
44 super(QueryProcessor.QueryThreadGroup, null, name);
45 this.session = session;
46 this.processor = processor;
// Disabled per-thread queue/lock wiring (commented out in the original).
48 // own = processor.ownTasks[index];
49 // ownSync = processor.ownSyncTasks[index];
50 // queue = processor.queues[index];
51 // lock = processor.threadLocks[index];
52 // condition = processor.threadConditions[index];
// Keep local references to the processor's shared lock, counters and state array.
53 querySupportLock = processor.querySupportLock;
54 THREADS = processor.THREADS;
55 sleepers = processor.sleepers;
56 querySupport = processor.querySupport;
57 threadStates = processor.threadStates;
58 requests = processor.requests;
// More disabled wiring for delay queues and per-thread executors.
59 // delayQueues = processor.delayQueues;
60 // executors = processor.executors;
61 // threadLocks = processor.threadLocks;
62 // queues = processor.queues;
63 // ownSyncTasks = processor.ownSyncTasks;
/**
 * Disposes this query thread; the method is synchronized on this instance.
 *
 * NOTE(review): most of the body is missing from this listing. The visible part
 * shows that an {@code InterruptedException} raised by the omitted blocking step
 * is caught and logged instead of being propagated. The visible lines do not
 * restore the interrupt flag - confirm whether the omitted lines do.
 */
66 synchronized void dispose() {
68 // System.err.println("qt dispose");
72 // condition.signalAll();
77 } catch (InterruptedException e) {
// Interruption is reported through the logger only.
78 LOGGER.error("dispose was interrupted", e);
84 // System.err.println("qt disposed");
// NOTE(review): body not visible in this listing; presumably reports the
// `disposed` field declared on this class - confirm against the full file.
88 boolean isDisposed() {
// NOTE(review): body not visible in this listing; presumably returns the
// `session` field assigned in the constructor - confirm against the full file.
94 public Session getSession() {
/**
 * Pulls one task from the processor's shared {@code freeScheduling} queue into
 * this thread's local {@code tasks} list.
 *
 * When {@code freeScheduling} is non-empty, its first element is removed and
 * appended to {@code tasks}, and {@code processor.queueLength} is decremented.
 *
 * NOTE(review): the return statements are not visible in this listing, so the
 * meaning of the boolean result cannot be stated from here.
 */
100 private boolean pumpTask() {
101 if(!processor.freeScheduling.isEmpty()) {
102 tasks.add(processor.freeScheduling.removeFirst());
// Keep the processor's queue length counter in step with the removal above.
103 processor.queueLength.decrementAndGet();
/**
 * Looks for tasks to run and, when {@code doWait} is set, goes through the
 * sleep protocol guarded by {@code querySupportLock}.
 *
 * Visible behavior:
 * - if the given list already holds tasks, it is returned as is;
 * - if it is empty and {@code doWait} is false, {@code null} is returned;
 * - otherwise the thread enters a block synchronized on {@code querySupportLock},
 *   updates the shared {@code sleepers} counter and publishes its state through
 *   {@code threadStates[index]} (SLEEP, RUN or DISPOSED).
 *
 * NOTE(review): many lines of this method are missing from the listing (the
 * embedded numbering skips repeatedly), including whatever happens between the
 * method header and the first check and the actual wait call. The notes here
 * describe only the visible statements.
 *
 * @param doWait whether the caller is willing to block when no tasks are available
 * @param tasks  the list to fill and return; this parameter shadows the field of the same name
 * @return the task list when it is non-empty, or {@code null} when nothing was
 *         found and {@code doWait} is false; other return paths are not visible here
 * @throws RuntimeException wrapping the {@code InterruptedException} when the wait is interrupted
 */
109 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
115 // Return tasks if some were found
116 if(!tasks.isEmpty()) return tasks;
// Non-blocking callers get null instead of waiting.
118 if(!doWait) return null;
120 synchronized (querySupportLock) {
127 // We are the last one awake
128 if(sleepers.incrementAndGet() == THREADS) {
130 // Do not indicate sleeping yet
131 sleepers.decrementAndGet();
132 // Ceased can create new own tasks
// NOTE(review): the null check below only prints a message; the following
// line dereferences querySupport regardless of the outcome.
133 if(querySupport == null) System.err.println("null qs");
134 querySupport.ceased(index);
141 // OK, now we are going to sleep
142 sleepers.incrementAndGet();
148 // Nope nothing. Sleep & wait
149 // Whoever releases this calls sleepers.decrementAndGet()
// The thread's state is published in the shared threadStates array at its own index.
153 threadStates[index] = ThreadState.DISPOSED;
159 threadStates[index] = ThreadState.SLEEP;
163 sleepers.decrementAndGet();
167 threadStates[index] = ThreadState.DISPOSED;
172 threadStates[index] = ThreadState.RUN;
178 } catch (InterruptedException e) {
// Interruption is logged and then surfaced to the caller as an unchecked exception.
180 LOGGER.error("Query handling (newTasks) was interrupted", e);
181 throw new RuntimeException("Querying was interrupted.", e);
/**
 * Runs pending tasks on the calling thread without waiting for new ones.
 *
 * When the local {@code tasks} list is empty, it is refilled through
 * {@code newTasks(false, tasks)}; if that yields {@code null} the method
 * returns immediately. Tasks are then taken from the end of the list until it
 * is empty.
 *
 * NOTE(review): the lines that execute each task and update {@code didExecute}
 * are missing from this listing.
 *
 * @return {@code didExecute}, which is {@code false} on the visible early-return path
 */
187 public boolean runSynchronized() {
189 boolean didExecute = false;
// Disabled hand-off of delayed tasks to other performer threads (commented out in the original).
191 // for(int performer=0;performer<THREADS;performer++) {
192 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
193 // synchronized(executors[performer]) {
194 // threadLocks[performer].lock();
195 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
196 // delayQueues[index * THREADS + performer].clear();
197 // executors[performer].notify();
198 // threadLocks[performer].unlock();
203 if(tasks.isEmpty()) {
// doWait is false here, so this call does not block.
204 ArrayList<SessionTask> finished = newTasks(false, tasks);
205 if(finished == null) return didExecute;
208 while(!tasks.isEmpty()) {
// Take the most recently added task first.
210 SessionTask task = tasks.remove(tasks.size() - 1);
212 // if(task.syncCaller == index) {
213 // ownSyncTasks[index].add(task);
216 // System.err.println("QT(s) " + index + " runs " + task);
229 processor.thread.set(index);
231 QuerySupport support = this.querySupport;
237 boolean finished = newTasks(true, tasks) == null;
242 while(!tasks.isEmpty()) {
244 SessionTask task = tasks.remove(tasks.size()-1);
245 // System.err.println("QT " + index + " runs " + task);
250 // for(int performer=0;performer<THREADS;performer++) {
251 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
252 // synchronized(executors[performer]) {
253 // threadLocks[performer].lock();
254 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
255 // delayQueues[index * THREADS + performer].clear();
256 // executors[performer].notify();
257 // threadLocks[performer].unlock();
264 } catch (Throwable t) {
266 LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);