]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Still working for multiple readers
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6 import java.util.concurrent.locks.Condition;
7 import java.util.concurrent.locks.ReentrantLock;
8
9 import org.simantics.db.Session;
10 import org.simantics.db.common.SessionThread;
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
14
15 class QueryThread extends Thread implements SessionThread {
16
17         boolean disposed = false;
18         private Semaphore exited = new Semaphore(0);
19
20         final int index;
21
22         private Session session;
23         private QuerySupport querySupport;
24         final private QueryProcessor processor;
25
26         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
27 //      final private ArrayList<SessionTask> own;
28 //      final private ArrayList<SessionTask> ownSync;
29 //      final private ArrayList<SessionTask> queue;
30         final private ReentrantLock lock;
31         final private Condition condition;
32         final private Object querySupportLock;
33         final private int THREADS;
34         final private AtomicInteger sleepers;
35         final private ThreadState[] threadStates;
36 //      final private ArrayList<SessionTask>[] delayQueues;
37         final private QueryThread[] executors;
38         final private ReentrantLock[] threadLocks;
39 //      final private ArrayList<SessionTask>[] queues;
40 //      final private ArrayList<SessionTask>[] ownSyncTasks;
41
42         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
43                 super(QueryProcessor.QueryThreadGroup, null, name);
44                 this.session = session;
45                 this.processor = processor;
46                 this.index = index;
47 //              own = processor.ownTasks[index];
48 //              ownSync = processor.ownSyncTasks[index];
49 //              queue = processor.queues[index];
50                 lock = processor.threadLocks[index];
51                 condition = processor.threadConditions[index];
52                 querySupportLock = processor.querySupportLock;
53                 THREADS = processor.THREADS;
54                 sleepers = processor.sleepers;
55                 querySupport = processor.querySupport;
56                 threadStates = processor.threadStates;
57 //              delayQueues = processor.delayQueues;
58                 executors = processor.executors;
59                 threadLocks = processor.threadLocks;
60 //              queues = processor.queues;
61 //              ownSyncTasks = processor.ownSyncTasks;
62         }
63
64         synchronized void dispose() {
65
66 //              System.err.println("qt dispose");
67
68                 disposed = true;
69                 lock.lock();
70                 condition.signalAll();
71                 lock.unlock();
72                 
73                 try {
74                         exited.acquire();
75                 } catch (InterruptedException e) {
76                         Logger.defaultLogError(e);
77                 }
78
79                 session = null;
80                 querySupport = null;
81
82
83 //              System.err.println("qt disposed");
84
85         }
86
87         boolean isDisposed() { 
88
89                 return disposed;
90
91         }
92
93         public Session getSession() {
94
95                 return session;
96
97         }
98
99         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
100
101                 try {
102
103                         while(true) {
104
105                                 // Perform own tasks first
106 //                              if(tasks.addAll(own)) {
107 //                                      own.clear();
108 //                              } else if (doWait && !ownSync.isEmpty()) {
109 //                                      tasks.add(ownSync.remove(ownSync.size()-1));
110 //                              }
111
112                                 // Try some queued tasks
113 //                              lock.lock();
114 //                              if(tasks.addAll(queue)) {
115 //                                      queue.clear();
116 //                                      lock.unlock();
117 //                                      return tasks;
118 //                              } else {
119 //                                      lock.unlock();
120 //                              }
121
122                                 // Return tasks if some were found
123                                 if(!tasks.isEmpty()) return tasks;
124                                 
125                                 if(!doWait) return null;
126
127                                 synchronized (querySupportLock) {
128
129 //                                      System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")");
130                                         
131                                         if(!processor.freeScheduling.isEmpty()) {
132                                                 tasks.add(processor.freeScheduling.removeFirst());
133                                                 return tasks;
134                                         }
135
136                                         lock.lock();
137                                         
138                                         // Just maybe someone inserted tasks and notified just before synchronized block
139 //                                      if(tasks.addAll(queue)) {
140 //                                              queue.clear();
141 //                                              lock.unlock();
142 //                                              return tasks;
143 //                                      }
144                                         
145 //                                      System.err.println("QT " + index + ", sleepers = " + sleepers);
146
147                                         // We are the last one awake
148                                         if(sleepers.incrementAndGet() == THREADS) {
149
150                                                 // Do not indicate sleeping yet
151                                                 sleepers.decrementAndGet();
152                                                 // Ceased can create new own tasks
153                                                 if(querySupport == null) System.err.println("null qs");
154                                                 querySupport.ceased(index);
155
156 //                                              if(tasks.addAll(own)) {
157 //                                                      own.clear();
158 //                                              }
159                                                 //                                      System.err.println("tasks after ceased: " + tasks.size());
160                                                 if(!tasks.isEmpty()) {
161                                                         lock.unlock();
162                                                         return tasks;
163                                                 }
164
165                                                 // OK, now we are going to sleep
166                                                 sleepers.incrementAndGet();
167
168                                         }
169
170                                 }
171
172                                 // Nope nothing. Sleep & wait
173                                 // Whoever releases this calls sleepers.decrementAndGet()
174
175                                 // We are done
176                                 if(isDisposed()) {
177                                         threadStates[index] = ThreadState.DISPOSED;
178                                         lock.unlock();
179                                         return null;
180                                 }
181
182                                 threadStates[index] = ThreadState.SLEEP;
183                                 condition.await();
184
185                                 sleepers.decrementAndGet();
186
187                                 // We are done
188                                 if(isDisposed()) {
189                                         threadStates[index] = ThreadState.DISPOSED;
190                                         lock.unlock();
191                                         return null;
192                                 }
193
194                                 threadStates[index] = ThreadState.RUN;
195
196                                 lock.unlock();
197
198                         }
199
200                 } catch (InterruptedException e) {
201
202                         e.printStackTrace();
203
204                         throw new RuntimeException("Querying was interrupted.", e);
205
206                 }
207
208         }
209
210         public boolean runSynchronized() {
211
212                 boolean didExecute = false;
213
214 //              for(int performer=0;performer<THREADS;performer++) {
215 //                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
216 //                              synchronized(executors[performer]) {
217 //                                      threadLocks[performer].lock();
218 //                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
219 //                                      delayQueues[index * THREADS + performer].clear();
220 //                                      executors[performer].notify();
221 //                                      threadLocks[performer].unlock();
222 //                              }
223 //                      }
224 //              }
225
226                 if(tasks.isEmpty()) {
227                         ArrayList<SessionTask> finished = newTasks(false, tasks);
228                         if(finished == null) return didExecute; 
229                 }
230
231                 while(!tasks.isEmpty()) {
232
233                         SessionTask task = tasks.remove(tasks.size() - 1);
234
235 //                      if(task.syncCaller == index) {
236 //                              ownSyncTasks[index].add(task);
237 //                      } else {
238                                 task.run(index);
239 //                              System.err.println("QT(s) " + index + " runs " + task);
240                                 didExecute = true;
241 //                      }
242
243                 }
244
245                 return didExecute;
246
247         }
248
249         @Override
250         public void run() {
251
252                 QuerySupport support = this.querySupport;
253
254                 try {
255
256                         while (true) {
257
258                                 boolean finished = newTasks(true, tasks) == null;
259                                 if(finished) {
260                                         return;
261                                 }
262                                 
263                                 while(!tasks.isEmpty()) {
264
265                                         SessionTask task = tasks.remove(tasks.size()-1);
266 //                                      System.err.println("QT " + index + " runs " + task);
267                                         task.run(index);
268
269                                 }
270
271 //                              for(int performer=0;performer<THREADS;performer++) {
272 //                                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
273 //                                              synchronized(executors[performer]) {
274 //                                                      threadLocks[performer].lock();
275 //                                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
276 //                                                      delayQueues[index * THREADS + performer].clear();
277 //                                                      executors[performer].notify();
278 //                                                      threadLocks[performer].unlock();
279 //                                              }
280 //                                      }
281 //                              }
282
283                         }
284
285                 } catch (Throwable t) {
286
287                         Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
288                         support.exit(t);
289
290                 } finally {
291                         
292                         exited.release();
293
294                 }
295
296         }
297
298 }