]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6 import java.util.concurrent.locks.Condition;
7 import java.util.concurrent.locks.ReentrantLock;
8
9 import org.simantics.db.Session;
10 import org.simantics.db.common.SessionThread;
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
14
15 class QueryThread extends Thread implements SessionThread {
16
17         boolean disposed = false;
18         private Semaphore exited = new Semaphore(0);
19
20         final int index;
21
22         private Session session;
23         private QuerySupport querySupport;
24
25         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
26         final private ArrayList<SessionTask> own;
27         final private ArrayList<SessionTask> ownSync;
28         final private ArrayList<SessionTask> queue;
29         final private ReentrantLock lock;
30         final private Condition condition;
31         final private Object querySupportLock;
32         final private int THREADS;
33         final private AtomicInteger sleepers;
34         final private ThreadState[] threadStates;
35         final private ArrayList<SessionTask>[] delayQueues;
36         final private QueryThread[] executors;
37         final private ReentrantLock[] threadLocks;
38         final private ArrayList<SessionTask>[] queues;
39         final private ArrayList<SessionTask>[] ownSyncTasks;
40
41         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
42                 super(QueryProcessor.QueryThreadGroup, null, name);
43                 this.session = session;
44                 this.index = index;
45                 own = processor.ownTasks[index];
46                 ownSync = processor.ownSyncTasks[index];
47                 queue = processor.queues[index];
48                 lock = processor.threadLocks[index];
49                 condition = processor.threadConditions[index];
50                 querySupportLock = processor.querySupportLock;
51                 THREADS = processor.THREADS;
52                 sleepers = processor.sleepers;
53                 querySupport = processor.querySupport;
54                 threadStates = processor.threadStates;
55                 delayQueues = processor.delayQueues;
56                 executors = processor.executors;
57                 threadLocks = processor.threadLocks;
58                 queues = processor.queues;
59                 ownSyncTasks = processor.ownSyncTasks;
60         }
61
62         synchronized void dispose() {
63
64 //              System.err.println("qt dispose");
65
66                 disposed = true;
67                 lock.lock();
68                 condition.signalAll();
69                 lock.unlock();
70                 
71                 try {
72                         exited.acquire();
73                 } catch (InterruptedException e) {
74                         Logger.defaultLogError(e);
75                 }
76
77                 session = null;
78                 querySupport = null;
79
80
81 //              System.err.println("qt disposed");
82
83         }
84
85         boolean isDisposed() { 
86
87                 return disposed;
88
89         }
90
91         public Session getSession() {
92
93                 return session;
94
95         }
96
97         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
98
99                 try {
100
101                         while(true) {
102
103                                 // Perform own tasks first
104                                 if(tasks.addAll(own)) {
105                                         own.clear();
106                                 } else if (doWait && !ownSync.isEmpty()) {
107                                         tasks.add(ownSync.remove(ownSync.size()-1));
108                                 }
109
110                                 // Try some queued tasks
111                                 lock.lock();
112                                 if(tasks.addAll(queue)) {
113                                         queue.clear();
114                                         lock.unlock();
115                                         return tasks;
116                                 } else {
117                                         lock.unlock();
118                                 }
119
120                                 // Return tasks if some were found
121                                 if(!tasks.isEmpty()) return tasks;
122                                 if(!doWait) return null;
123
124                                 synchronized (querySupportLock) {
125
126                                         lock.lock();
127                                         
128                                         // Just maybe someone inserted tasks and notified just before synchronized block
129                                         if(tasks.addAll(queue)) {
130                                                 queue.clear();
131                                                 lock.unlock();
132                                                 return tasks;
133                                         }
134
135                                         // We are the last one awake
136                                         if(sleepers.incrementAndGet() == THREADS) {
137
138                                                 // Do not indicate sleeping yet
139                                                 sleepers.decrementAndGet();
140                                                 // Ceased can create new own tasks
141                                                 if(querySupport == null) System.err.println("null qs");
142                                                 querySupport.ceased(index);
143
144                                                 if(tasks.addAll(own)) {
145                                                         own.clear();
146                                                 }
147                                                 //                                      System.err.println("tasks after ceased: " + tasks.size());
148                                                 if(!tasks.isEmpty()) {
149                                                         lock.unlock();
150                                                         return tasks;
151                                                 }
152
153                                                 // OK, now we are going to sleep
154                                                 sleepers.incrementAndGet();
155
156                                         }
157
158                                 }
159
160                                 // Nope nothing. Sleep & wait
161                                 // Whoever releases this calls sleepers.decrementAndGet()
162
163                                 // We are done
164                                 if(isDisposed()) {
165                                         threadStates[index] = ThreadState.DISPOSED;
166                                         lock.unlock();
167                                         return null;
168                                 }
169
170                                 threadStates[index] = ThreadState.SLEEP;
171                                 condition.await();
172
173                                 // We are done
174                                 if(isDisposed()) {
175                                         threadStates[index] = ThreadState.DISPOSED;
176                                         lock.unlock();
177                                         return null;
178                                 }
179
180                                 threadStates[index] = ThreadState.RUN;
181
182                                 lock.unlock();
183
184                         }
185
186                 } catch (InterruptedException e) {
187
188                         e.printStackTrace();
189
190                         throw new RuntimeException("Querying was interrupted.", e);
191
192                 }
193
194         }
195
196         public boolean runSynchronized() {
197
198                 boolean didExecute = false;
199
200                 for(int performer=0;performer<THREADS;performer++) {
201                         if(!delayQueues[index * THREADS + performer].isEmpty()) {
202                                 synchronized(executors[performer]) {
203                                         threadLocks[performer].lock();
204                                         queues[performer].addAll(delayQueues[index * THREADS + performer]);
205                                         delayQueues[index * THREADS + performer].clear();
206                                         executors[performer].notify();
207                                         threadLocks[performer].unlock();
208                                 }
209                         }
210                 }
211
212                 if(tasks.isEmpty()) {
213                         ArrayList<SessionTask> finished = newTasks(false, tasks);
214                         if(finished == null) return didExecute; 
215                 }
216
217                 while(!tasks.isEmpty()) {
218
219                         SessionTask task = tasks.remove(tasks.size() - 1);
220
221                         if(task.syncCaller == index) {
222                                 ownSyncTasks[index].add(task);
223                         } else {
224                                 task.run(index);
225                                 didExecute = true;
226                         }
227
228                 }
229
230                 return didExecute;
231
232         }
233
234         @Override
235         public void run() {
236
237                 QuerySupport support = this.querySupport;
238
239                 try {
240
241                         while (true) {
242
243                                 boolean finished = newTasks(true, tasks) == null;
244                                 if(finished) {
245                                         return;
246                                 }
247                                 
248                                 while(!tasks.isEmpty()) {
249
250                                         SessionTask task = tasks.remove(tasks.size()-1);
251                                         task.run(index);
252
253                                 }
254
255                                 for(int performer=0;performer<THREADS;performer++) {
256                                         if(!delayQueues[index * THREADS + performer].isEmpty()) {
257                                                 synchronized(executors[performer]) {
258                                                         threadLocks[performer].lock();
259                                                         queues[performer].addAll(delayQueues[index * THREADS + performer]);
260                                                         delayQueues[index * THREADS + performer].clear();
261                                                         executors[performer].notify();
262                                                         threadLocks[performer].unlock();
263                                                 }
264                                         }
265                                 }
266
267                         }
268
269                 } catch (Throwable t) {
270
271                         Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
272                         support.exit(t);
273
274                 } finally {
275                         
276                         exited.release();
277
278                 }
279
280         }
281
282 }