1 package org.simantics.db.impl.query;
\r
3 import java.util.ArrayList;
\r
4 import java.util.concurrent.Semaphore;
\r
5 import java.util.concurrent.atomic.AtomicInteger;
\r
6 import java.util.concurrent.locks.Condition;
\r
7 import java.util.concurrent.locks.ReentrantLock;
\r
9 import org.simantics.db.Session;
\r
10 import org.simantics.db.common.SessionThread;
\r
11 import org.simantics.db.common.utils.Logger;
\r
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
\r
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
\r
15 class QueryThread extends Thread implements SessionThread {
\r
17 boolean disposed = false;
\r
18 private Semaphore exited = new Semaphore(0);
\r
22 private Session session;
\r
23 private QuerySupport querySupport;
\r
25 final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
\r
26 final private ArrayList<SessionTask> own;
\r
27 final private ArrayList<SessionTask> ownSync;
\r
28 final private ArrayList<SessionTask> queue;
\r
29 final private ReentrantLock lock;
\r
30 final private Condition condition;
\r
31 final private Object querySupportLock;
\r
32 final private int THREADS;
\r
33 final private AtomicInteger sleepers;
\r
34 final private ThreadState[] threadStates;
\r
35 final private ArrayList<SessionTask>[] delayQueues;
\r
36 final private QueryThread[] executors;
\r
37 final private ReentrantLock[] threadLocks;
\r
38 final private ArrayList<SessionTask>[] queues;
\r
39 final private ArrayList<SessionTask>[] ownSyncTasks;
\r
41 public QueryThread(Session session, QueryProcessor processor, int index, String name) {
\r
42 super(QueryProcessor.QueryThreadGroup, null, name);
\r
43 this.session = session;
\r
45 own = processor.ownTasks[index];
\r
46 ownSync = processor.ownSyncTasks[index];
\r
47 queue = processor.queues[index];
\r
48 lock = processor.threadLocks[index];
\r
49 condition = processor.threadConditions[index];
\r
50 querySupportLock = processor.querySupportLock;
\r
51 THREADS = processor.THREADS;
\r
52 sleepers = processor.sleepers;
\r
53 querySupport = processor.querySupport;
\r
54 threadStates = processor.threadStates;
\r
55 delayQueues = processor.delayQueues;
\r
56 executors = processor.executors;
\r
57 threadLocks = processor.threadLocks;
\r
58 queues = processor.queues;
\r
59 ownSyncTasks = processor.ownSyncTasks;
\r
62 synchronized void dispose() {
\r
64 // System.err.println("qt dispose");
\r
68 condition.signalAll();
\r
73 } catch (InterruptedException e) {
\r
74 Logger.defaultLogError(e);
\r
78 querySupport = null;
\r
81 // System.err.println("qt disposed");
\r
85 boolean isDisposed() {
\r
91 public Session getSession() {
\r
97 ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
\r
103 // Perform own tasks first
\r
104 if(tasks.addAll(own)) {
\r
106 } else if (doWait && !ownSync.isEmpty()) {
\r
107 tasks.add(ownSync.remove(ownSync.size()-1));
\r
110 // Try some queued tasks
\r
112 if(tasks.addAll(queue)) {
\r
120 // Return tasks if some were found
\r
121 if(!tasks.isEmpty()) return tasks;
\r
122 if(!doWait) return null;
\r
124 synchronized (querySupportLock) {
\r
128 // Just maybe someone inserted tasks and notified just before synchronized block
\r
129 if(tasks.addAll(queue)) {
\r
135 // We are the last one awake
\r
136 if(sleepers.incrementAndGet() == THREADS) {
\r
138 // Do not indicate sleeping yet
\r
139 sleepers.decrementAndGet();
\r
140 // Ceased can create new own tasks
\r
141 if(querySupport == null) System.err.println("null qs");
\r
142 querySupport.ceased(index);
\r
144 if(tasks.addAll(own)) {
\r
147 // System.err.println("tasks after ceased: " + tasks.size());
\r
148 if(!tasks.isEmpty()) {
\r
153 // OK, now we are going to sleep
\r
154 sleepers.incrementAndGet();
\r
160 // Nope nothing. Sleep & wait
\r
161 // Whoever releases this calls sleepers.decrementAndGet()
\r
165 threadStates[index] = ThreadState.DISPOSED;
\r
170 threadStates[index] = ThreadState.SLEEP;
\r
175 threadStates[index] = ThreadState.DISPOSED;
\r
180 threadStates[index] = ThreadState.RUN;
\r
186 } catch (InterruptedException e) {
\r
188 e.printStackTrace();
\r
190 throw new RuntimeException("Querying was interrupted.", e);
\r
196 public boolean runSynchronized() {
\r
198 boolean didExecute = false;
\r
200 for(int performer=0;performer<THREADS;performer++) {
\r
201 if(!delayQueues[index * THREADS + performer].isEmpty()) {
\r
202 synchronized(executors[performer]) {
\r
203 threadLocks[performer].lock();
\r
204 queues[performer].addAll(delayQueues[index * THREADS + performer]);
\r
205 delayQueues[index * THREADS + performer].clear();
\r
206 executors[performer].notify();
\r
207 threadLocks[performer].unlock();
\r
212 if(tasks.isEmpty()) {
\r
213 ArrayList<SessionTask> finished = newTasks(false, tasks);
\r
214 if(finished == null) return didExecute;
\r
217 while(!tasks.isEmpty()) {
\r
219 SessionTask task = tasks.remove(tasks.size() - 1);
\r
221 if(task.syncCaller == index) {
\r
222 ownSyncTasks[index].add(task);
\r
235 public void run() {
\r
237 QuerySupport support = this.querySupport;
\r
243 boolean finished = newTasks(true, tasks) == null;
\r
248 while(!tasks.isEmpty()) {
\r
250 SessionTask task = tasks.remove(tasks.size()-1);
\r
255 for(int performer=0;performer<THREADS;performer++) {
\r
256 if(!delayQueues[index * THREADS + performer].isEmpty()) {
\r
257 synchronized(executors[performer]) {
\r
258 threadLocks[performer].lock();
\r
259 queues[performer].addAll(delayQueues[index * THREADS + performer]);
\r
260 delayQueues[index * THREADS + performer].clear();
\r
261 executors[performer].notify();
\r
262 threadLocks[performer].unlock();
\r
269 } catch (Throwable t) {
\r
271 Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
\r