1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6 import java.util.concurrent.locks.Condition;
7 import java.util.concurrent.locks.ReentrantLock;
9 import org.simantics.db.Session;
10 import org.simantics.db.common.SessionThread;
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
/**
 * Thread subclass that executes {@link SessionTask}s on behalf of a
 * {@link QueryProcessor}. Most of its state is not owned by the thread itself:
 * the constructor copies references to structures held by the processor
 * (locks, conditions, sleeper counter, thread-state array).
 */
15 class QueryThread extends Thread implements SessionThread {
// Disposal flag, initially false.
// NOTE(review): neither volatile nor final - confirm every access is properly synchronized.
17 boolean disposed = false;
// Semaphore created with zero permits.
// NOTE(review): its acquire/release sites are not visible in this excerpt.
18 private Semaphore exited = new Semaphore(0);
// Session handed to the constructor.
22 private Session session;
// Copied from processor.querySupport in the constructor; newTasks() calls ceased(index) on it.
23 private QuerySupport querySupport;
// Owning processor; newTasks() reads processor.freeScheduling.
24 final private QueryProcessor processor;
// Task list of this thread; runSynchronized() removes tasks from the end of it.
26 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
27 // final private ArrayList<SessionTask> own;
28 // final private ArrayList<SessionTask> ownSync;
29 // final private ArrayList<SessionTask> queue;
// processor.threadLocks[index] - this thread's slot in the per-thread lock array.
30 final private ReentrantLock lock;
// processor.threadConditions[index] - signalled in dispose().
31 final private Condition condition;
// Monitor shared through the processor; newTasks() synchronizes on it while reading freeScheduling.
32 final private Object querySupportLock;
// Copied from processor.THREADS; newTasks() compares the sleeper count against it.
33 final private int THREADS;
// Counter shared through the processor; incremented/decremented in newTasks().
34 final private AtomicInteger sleepers;
// State array shared through the processor; newTasks() writes the slot at this thread's index.
35 final private ThreadState[] threadStates;
36 // final private ArrayList<SessionTask>[] delayQueues;
// Copied from processor.executors; in the visible code only commented-out statements refer to it.
37 final private QueryThread[] executors;
// Copied from processor.threadLocks; in the visible code only commented-out statements refer to it.
38 final private ReentrantLock[] threadLocks;
39 // final private ArrayList<SessionTask>[] queues;
40 // final private ArrayList<SessionTask>[] ownSyncTasks;
/**
 * Creates a query thread in {@code QueryProcessor.QueryThreadGroup} and caches
 * references to the processor structures this thread works with.
 *
 * @param session   session stored in the {@code session} field
 * @param processor processor whose locks, conditions, counters and state arrays are referenced
 * @param index     position used to pick this thread's entries from
 *                  {@code processor.threadLocks} and {@code processor.threadConditions}
 * @param name      thread name passed to the {@link Thread} constructor
 */
42 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
// Thread(ThreadGroup, Runnable, String) with a null Runnable target.
43 super(QueryProcessor.QueryThreadGroup, null, name);
44 this.session = session;
45 this.processor = processor;
// NOTE(review): the assignment of the index field is not visible in this excerpt - confirm it is stored.
47 // own = processor.ownTasks[index];
48 // ownSync = processor.ownSyncTasks[index];
49 // queue = processor.queues[index];
// Per-thread lock and condition, selected by index.
50 lock = processor.threadLocks[index];
51 condition = processor.threadConditions[index];
// The remaining fields alias structures held by the processor (no copies are made).
52 querySupportLock = processor.querySupportLock;
53 THREADS = processor.THREADS;
54 sleepers = processor.sleepers;
55 querySupport = processor.querySupport;
56 threadStates = processor.threadStates;
57 // delayQueues = processor.delayQueues;
58 executors = processor.executors;
59 threadLocks = processor.threadLocks;
60 // queues = processor.queues;
61 // ownSyncTasks = processor.ownSyncTasks;
/**
 * Disposes this thread. The method is synchronized on this QueryThread instance.
 * The visible part signals every waiter on this thread's condition and logs an
 * InterruptedException through {@code Logger.defaultLogError}.
 */
64 synchronized void dispose() {
66 // System.err.println("qt dispose");
// Wake everything waiting on this thread's condition.
// NOTE(review): Condition.signalAll() typically requires the associated lock to be held -
// confirm `lock` is acquired in the lines not visible here.
70 condition.signalAll();
75 } catch (InterruptedException e) {
// NOTE(review): the interruption is logged; restoring the interrupt status is not visible here - confirm.
76 Logger.defaultLogError(e);
83 // System.err.println("qt disposed");
// Package-private query of the disposal state.
// NOTE(review): body not visible in this excerpt - presumably returns the `disposed` field; confirm.
87 boolean isDisposed() {
// NOTE(review): body not visible in this excerpt - presumably returns the `session` given to the constructor; confirm.
93 public Session getSession() {
/**
 * Collects tasks for this thread into the supplied list.
 *
 * @param doWait when false, the method returns null instead of continuing to the
 *               free-scheduling / sleep logic if the list is empty
 * @param tasks  list that receives the tasks; note that this parameter shadows the
 *               {@code tasks} field
 * @return the supplied list when it contains tasks; null when {@code doWait} is false
 *         and no tasks were found (other return paths are not visible in this excerpt)
 * @throws RuntimeException wrapping an InterruptedException ("Querying was interrupted.")
 */
99 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
105 // Perform own tasks first
106 // if(tasks.addAll(own)) {
108 // } else if (doWait && !ownSync.isEmpty()) {
109 // tasks.add(ownSync.remove(ownSync.size()-1));
112 // Try some queued tasks
114 // if(tasks.addAll(queue)) {
122 // Return tasks if some were found
123 if(!tasks.isEmpty()) return tasks;
// Non-blocking callers stop here.
125 if(!doWait) return null;
// processor.freeScheduling is only inspected while holding querySupportLock.
127 synchronized (querySupportLock) {
129 // System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")");
// Take the first free-scheduled task, if there is one.
131 if(!processor.freeScheduling.isEmpty()) {
132 tasks.add(processor.freeScheduling.removeFirst());
138 // Just maybe someone inserted tasks and notified just before synchronized block
139 // if(tasks.addAll(queue)) {
145 // System.err.println("QT " + index + ", sleepers = " + sleepers);
147 // We are the last one awake
// The incremented count equals THREADS only when all other threads are already counted as sleeping.
148 if(sleepers.incrementAndGet() == THREADS) {
150 // Do not indicate sleeping yet
151 sleepers.decrementAndGet();
152 // Ceased can create new own tasks
// NOTE(review): this only prints a diagnostic - the next line still dereferences querySupport
// and would throw NullPointerException if it is null.
153 if(querySupport == null) System.err.println("null qs");
154 querySupport.ceased(index);
156 // if(tasks.addAll(own)) {
159 // System.err.println("tasks after ceased: " + tasks.size());
160 if(!tasks.isEmpty()) {
165 // OK, now we are going to sleep
166 sleepers.incrementAndGet();
172 // Nope nothing. Sleep & wait
173 // Whoever releases this calls sleepers.decrementAndGet()
// Publish this thread's state in the shared threadStates array.
// NOTE(review): the conditions guarding the DISPOSED/SLEEP/RUN transitions and the actual
// wait call are not visible in this excerpt.
177 threadStates[index] = ThreadState.DISPOSED;
182 threadStates[index] = ThreadState.SLEEP;
185 sleepers.decrementAndGet();
189 threadStates[index] = ThreadState.DISPOSED;
194 threadStates[index] = ThreadState.RUN;
200 } catch (InterruptedException e) {
// The interruption is rethrown unchecked with the original exception kept as the cause.
204 throw new RuntimeException("Querying was interrupted.", e);
/**
 * Processes pending tasks without waiting for new ones: when the task list is
 * empty it asks {@code newTasks(false, tasks)} for more and returns if none are
 * available; otherwise it removes tasks from the end of the list one at a time.
 *
 * @return the value of {@code didExecute}, which starts out false
 *         (NOTE(review): the place where it becomes true is not visible in this excerpt)
 */
210 public boolean runSynchronized() {
212 boolean didExecute = false;
214 // for(int performer=0;performer<THREADS;performer++) {
215 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
216 // synchronized(executors[performer]) {
217 // threadLocks[performer].lock();
218 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
219 // delayQueues[index * THREADS + performer].clear();
220 // executors[performer].notify();
221 // threadLocks[performer].unlock();
// Refill only when nothing is pending; doWait=false makes newTasks return null rather than block.
226 if(tasks.isEmpty()) {
227 ArrayList<SessionTask> finished = newTasks(false, tasks);
228 if(finished == null) return didExecute;
231 while(!tasks.isEmpty()) {
// Tasks are taken from the end of the list (last in, first out).
233 SessionTask task = tasks.remove(tasks.size() - 1);
235 // if(task.syncCaller == index) {
236 // ownSyncTasks[index].add(task);
239 // System.err.println("QT(s) " + index + " runs " + task);
// Fragment of the thread's main processing loop.
// NOTE(review): the method header is not visible in this excerpt - presumably Thread.run(); confirm.
252 QuerySupport support = this.querySupport;
// Fetch with doWait=true; `finished` is true when newTasks returns null.
// NOTE(review): the condition under which newTasks(true, ...) returns null is not visible here.
258 boolean finished = newTasks(true, tasks) == null;
263 while(!tasks.isEmpty()) {
// Tasks are taken from the end of the list (last in, first out).
265 SessionTask task = tasks.remove(tasks.size()-1);
266 // System.err.println("QT " + index + " runs " + task);
271 // for(int performer=0;performer<THREADS;performer++) {
272 // if(!delayQueues[index * THREADS + performer].isEmpty()) {
273 // synchronized(executors[performer]) {
274 // threadLocks[performer].lock();
275 // queues[performer].addAll(delayQueues[index * THREADS + performer]);
276 // delayQueues[index * THREADS + performer].clear();
277 // executors[performer].notify();
278 // threadLocks[performer].unlock();
// Any Throwable reaching this handler is reported through the logger as a fatal bug.
285 } catch (Throwable t) {
287 Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);