1 package org.simantics.db.impl.query;
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6 import java.util.concurrent.locks.Condition;
7 import java.util.concurrent.locks.ReentrantLock;
9 import org.simantics.db.Session;
10 import org.simantics.db.common.SessionThread;
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
15 class QueryThread extends Thread implements SessionThread {
17 boolean disposed = false;
18 private Semaphore exited = new Semaphore(0);
22 private Session session;
23 private QuerySupport querySupport;
25 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
26 final private ArrayList<SessionTask> own;
27 final private ArrayList<SessionTask> ownSync;
28 final private ArrayList<SessionTask> queue;
29 final private ReentrantLock lock;
30 final private Condition condition;
31 final private Object querySupportLock;
32 final private int THREADS;
33 final private AtomicInteger sleepers;
34 final private ThreadState[] threadStates;
35 final private ArrayList<SessionTask>[] delayQueues;
36 final private QueryThread[] executors;
37 final private ReentrantLock[] threadLocks;
38 final private ArrayList<SessionTask>[] queues;
39 final private ArrayList<SessionTask>[] ownSyncTasks;
41 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
42 super(QueryProcessor.QueryThreadGroup, null, name);
43 this.session = session;
45 own = processor.ownTasks[index];
46 ownSync = processor.ownSyncTasks[index];
47 queue = processor.queues[index];
48 lock = processor.threadLocks[index];
49 condition = processor.threadConditions[index];
50 querySupportLock = processor.querySupportLock;
51 THREADS = processor.THREADS;
52 sleepers = processor.sleepers;
53 querySupport = processor.querySupport;
54 threadStates = processor.threadStates;
55 delayQueues = processor.delayQueues;
56 executors = processor.executors;
57 threadLocks = processor.threadLocks;
58 queues = processor.queues;
59 ownSyncTasks = processor.ownSyncTasks;
62 synchronized void dispose() {
64 // System.err.println("qt dispose");
68 condition.signalAll();
73 } catch (InterruptedException e) {
74 Logger.defaultLogError(e);
81 // System.err.println("qt disposed");
85 boolean isDisposed() {
91 public Session getSession() {
97 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
103 // Perform own tasks first
104 if(tasks.addAll(own)) {
106 } else if (doWait && !ownSync.isEmpty()) {
107 tasks.add(ownSync.remove(ownSync.size()-1));
110 // Try some queued tasks
112 if(tasks.addAll(queue)) {
120 // Return tasks if some were found
121 if(!tasks.isEmpty()) return tasks;
122 if(!doWait) return null;
124 synchronized (querySupportLock) {
128 // Just maybe someone inserted tasks and notified just before synchronized block
129 if(tasks.addAll(queue)) {
135 // We are the last one awake
136 if(sleepers.incrementAndGet() == THREADS) {
138 // Do not indicate sleeping yet
139 sleepers.decrementAndGet();
140 // Ceased can create new own tasks
141 if(querySupport == null) System.err.println("null qs");
142 querySupport.ceased(index);
144 if(tasks.addAll(own)) {
147 // System.err.println("tasks after ceased: " + tasks.size());
148 if(!tasks.isEmpty()) {
153 // OK, now we are going to sleep
154 sleepers.incrementAndGet();
160 // Nope nothing. Sleep & wait
161 // Whoever releases this calls sleepers.decrementAndGet()
165 threadStates[index] = ThreadState.DISPOSED;
170 threadStates[index] = ThreadState.SLEEP;
175 threadStates[index] = ThreadState.DISPOSED;
180 threadStates[index] = ThreadState.RUN;
186 } catch (InterruptedException e) {
190 throw new RuntimeException("Querying was interrupted.", e);
196 public boolean runSynchronized() {
198 boolean didExecute = false;
200 for(int performer=0;performer<THREADS;performer++) {
201 if(!delayQueues[index * THREADS + performer].isEmpty()) {
202 synchronized(executors[performer]) {
203 threadLocks[performer].lock();
204 queues[performer].addAll(delayQueues[index * THREADS + performer]);
205 delayQueues[index * THREADS + performer].clear();
206 executors[performer].notify();
207 threadLocks[performer].unlock();
212 if(tasks.isEmpty()) {
213 ArrayList<SessionTask> finished = newTasks(false, tasks);
214 if(finished == null) return didExecute;
217 while(!tasks.isEmpty()) {
219 SessionTask task = tasks.remove(tasks.size() - 1);
221 if(task.syncCaller == index) {
222 ownSyncTasks[index].add(task);
237 QuerySupport support = this.querySupport;
243 boolean finished = newTasks(true, tasks) == null;
248 while(!tasks.isEmpty()) {
250 SessionTask task = tasks.remove(tasks.size()-1);
255 for(int performer=0;performer<THREADS;performer++) {
256 if(!delayQueues[index * THREADS + performer].isEmpty()) {
257 synchronized(executors[performer]) {
258 threadLocks[performer].lock();
259 queues[performer].addAll(delayQueues[index * THREADS + performer]);
260 delayQueues[index * THREADS + performer].clear();
261 executors[performer].notify();
262 threadLocks[performer].unlock();
269 } catch (Throwable t) {
271 Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);