1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.TimeUnit;
6 import java.util.concurrent.atomic.AtomicInteger;
8 import org.simantics.db.Session;
9 import org.simantics.db.common.SessionThread;
10 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
11 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
12 import org.slf4j.LoggerFactory;
14 class QueryThread extends Thread implements SessionThread {
16 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
18 boolean disposed = false;
19 private Semaphore exited = new Semaphore(0);
23 private Session session;
24 private QuerySupport querySupport;
25 private final QueryProcessor processor;
27 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 // final private ArrayList<SessionTask> own;
29 // final private ArrayList<SessionTask> ownSync;
30 // final private ArrayList<SessionTask> queue;
31 // final private ReentrantLock lock;
32 // final private Condition condition;
33 final private Object querySupportLock;
34 final private int THREADS;
35 final private AtomicInteger sleepers;
36 final private ThreadState[] threadStates;
38 private SessionTask currentTask;
39 // final private ArrayList<SessionTask>[] delayQueues;
40 // final private QueryThread[] executors;
41 // final private ReentrantLock[] threadLocks;
42 // final private ArrayList<SessionTask>[] queues;
43 // final private ArrayList<SessionTask>[] ownSyncTasks;
45 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
46 super(QueryProcessor.QueryThreadGroup, null, name);
47 this.session = session;
48 this.processor = processor;
50 // own = processor.ownTasks[index];
51 // ownSync = processor.ownSyncTasks[index];
52 // queue = processor.queues[index];
53 // lock = processor.threadLocks[index];
54 // condition = processor.threadConditions[index];
55 querySupportLock = processor.querySupportLock;
56 THREADS = processor.THREADS;
57 sleepers = processor.sleepers;
58 querySupport = processor.querySupport;
59 threadStates = processor.threadStates;
60 // delayQueues = processor.delayQueues;
61 // executors = processor.executors;
62 // threadLocks = processor.threadLocks;
63 // queues = processor.queues;
64 // ownSyncTasks = processor.ownSyncTasks;
67 synchronized void dispose() {
69 // System.err.println("qt dispose");
73 // condition.signalAll();
77 // we are not willing to wait forever here..
78 boolean acquired = exited.tryAcquire(60, TimeUnit.SECONDS);
80 LOGGER.error("Could not safely dispose query thread {} - we were processing task {} and still have tasks to do {}", index, currentTask.graph != null ? String.valueOf(currentTask) : "SessionTask with null graph", tasks.size());
83 } catch (InterruptedException e) {
84 LOGGER.error("dispose was interrupted", e);
90 // System.err.println("qt disposed");
94 boolean isDisposed() {
100 public Session getSession() {
106 private boolean pumpTask() {
107 if(!processor.freeScheduling.isEmpty()) {
108 tasks.add(processor.freeScheduling.removeFirst());
114 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
120 // Return tasks if some were found
121 if(!tasks.isEmpty()) return tasks;
123 if(!doWait) return null;
125 synchronized (querySupportLock) {
132 // We are the last one awake
133 if(sleepers.incrementAndGet() == THREADS) {
135 // Do not indicate sleeping yet
136 sleepers.decrementAndGet();
137 // Ceased can create new own tasks
138 if(querySupport == null) System.err.println("null qs");
139 querySupport.ceased(index);
146 // OK, now we are going to sleep
147 sleepers.incrementAndGet();
153 // Nope nothing. Sleep & wait
154 // Whoever releases this calls sleepers.decrementAndGet()
158 threadStates[index] = ThreadState.DISPOSED;
164 threadStates[index] = ThreadState.SLEEP;
166 synchronized (querySupportLock) {
167 querySupportLock.wait(100);
171 // boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
173 // synchronized (querySupportLock) {
174 // if(!processor.freeScheduling.isEmpty())
175 // System.err.println("some tasks are available!");
179 sleepers.decrementAndGet();
183 threadStates[index] = ThreadState.DISPOSED;
188 threadStates[index] = ThreadState.RUN;
194 } catch (InterruptedException e) {
196 LOGGER.error("Query handling (newTasks) was interrupted", e);
197 throw new RuntimeException("Querying was interrupted.", e);
203 public boolean runSynchronized() {
205 boolean didExecute = false;
207 // for(int performer=0;performer<THREADS;performer++) {
208 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
209 // synchronized(executors[performer]) {
210 // threadLocks[performer].lock();
211 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
212 // delayQueues[index * THREADS + performer].clear();
213 // executors[performer].notify();
214 // threadLocks[performer].unlock();
219 if(tasks.isEmpty()) {
220 ArrayList<SessionTask> finished = newTasks(false, tasks);
221 if(finished == null) return didExecute;
224 while(!tasks.isEmpty()) {
226 SessionTask t = currentTask = tasks.remove(tasks.size() - 1);
228 // if(task.syncCaller == index) {
229 // ownSyncTasks[index].add(task);
232 // System.err.println("QT(s) " + index + " runs " + task);
246 processor.thread.set(index);
248 QuerySupport support = this.querySupport;
254 boolean finished = newTasks(true, tasks) == null;
259 while(!tasks.isEmpty()) {
261 SessionTask t = currentTask = tasks.remove(tasks.size()-1);
262 // System.err.println("QT " + index + " runs " + task);
267 // for(int performer=0;performer<THREADS;performer++) {
268 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
269 // synchronized(executors[performer]) {
270 // threadLocks[performer].lock();
271 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
272 // delayQueues[index * THREADS + performer].clear();
273 // executors[performer].notify();
274 // threadLocks[performer].unlock();
281 } catch (Throwable t) {
283 LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);