]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Do not wait forever for querythreads to dispose
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.TimeUnit;
6 import java.util.concurrent.atomic.AtomicInteger;
7
8 import org.simantics.db.Session;
9 import org.simantics.db.common.SessionThread;
10 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
11 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
12 import org.slf4j.LoggerFactory;
13
14 class QueryThread extends Thread implements SessionThread {
15
16         private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
17
18         boolean disposed = false;
19         private Semaphore exited = new Semaphore(0);
20
21         final int index;
22
23         private Session session;
24         private QuerySupport querySupport;
25         private final QueryProcessor processor;
26
27         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 //      final private ArrayList<SessionTask> own;
29 //      final private ArrayList<SessionTask> ownSync;
30 //      final private ArrayList<SessionTask> queue;
31 //      final private ReentrantLock lock;
32 //      final private Condition condition;
33         final private Object querySupportLock;
34         final private int THREADS;
35         final private AtomicInteger sleepers;
36         final private ThreadState[] threadStates;
37
38         private SessionTask currentTask;
39 //      final private ArrayList<SessionTask>[] delayQueues;
40 //      final private QueryThread[] executors;
41 //      final private ReentrantLock[] threadLocks;
42 //      final private ArrayList<SessionTask>[] queues;
43 //      final private ArrayList<SessionTask>[] ownSyncTasks;
44
45         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
46                 super(QueryProcessor.QueryThreadGroup, null, name);
47                 this.session = session;
48                 this.processor = processor;
49                 this.index = index;
50 //              own = processor.ownTasks[index];
51 //              ownSync = processor.ownSyncTasks[index];
52 //              queue = processor.queues[index];
53 //              lock = processor.threadLocks[index];
54 //              condition = processor.threadConditions[index];
55                 querySupportLock = processor.querySupportLock;
56                 THREADS = processor.THREADS;
57                 sleepers = processor.sleepers;
58                 querySupport = processor.querySupport;
59                 threadStates = processor.threadStates;
60 //              delayQueues = processor.delayQueues;
61 //              executors = processor.executors;
62 //              threadLocks = processor.threadLocks;
63 //              queues = processor.queues;
64 //              ownSyncTasks = processor.ownSyncTasks;
65         }
66
67         synchronized void dispose() {
68
69 //              System.err.println("qt dispose");
70
71                 disposed = true;
72 //              lock.lock();
73 //              condition.signalAll();
74 //              lock.unlock();
75                 
76                 try {
77                         // we are not willing to wait forever here..
78                         boolean acquired = exited.tryAcquire(60, TimeUnit.SECONDS);
79                         if (!acquired) {
80                                 LOGGER.error("Could not safely dispose query thread {} - we were processing task {} and still have tasks to do {}", index, currentTask.graph != null ? String.valueOf(currentTask) : "SessionTask with null graph", tasks.size());
81                                 interrupt();
82                         }
83                 } catch (InterruptedException e) {
84                         LOGGER.error("dispose was interrupted", e);
85                 }
86
87                 session = null;
88                 querySupport = null;
89
90 //              System.err.println("qt disposed");
91
92         }
93
94         boolean isDisposed() { 
95
96                 return disposed;
97
98         }
99
100         public Session getSession() {
101
102                 return session;
103
104         }
105
106         private boolean pumpTask() {
107                 if(!processor.freeScheduling.isEmpty()) {
108                         tasks.add(processor.freeScheduling.removeFirst());
109                         return true;
110                 }
111                 return false;
112         }
113         
114         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
115
116                 try {
117
118                         while(true) {
119
120                                 // Return tasks if some were found
121                                 if(!tasks.isEmpty()) return tasks;
122                                 
123                                 if(!doWait) return null;
124
125                                 synchronized (querySupportLock) {
126
127                                         if(pumpTask())
128                                                 return tasks;
129
130 //                                      lock.lock();
131
132                                         // We are the last one awake
133                                         if(sleepers.incrementAndGet() == THREADS) {
134
135                                                 // Do not indicate sleeping yet
136                                                 sleepers.decrementAndGet();
137                                                 // Ceased can create new own tasks
138                                                 if(querySupport == null) System.err.println("null qs");
139                                                 querySupport.ceased(index);
140
141                                                 if(pumpTask()) {
142 //                                                      lock.unlock();
143                                                         return tasks;
144                                                 }
145
146                                                 // OK, now we are going to sleep
147                                                 sleepers.incrementAndGet();
148
149                                         }
150
151                                 }
152
153                                 // Nope nothing. Sleep & wait
154                                 // Whoever releases this calls sleepers.decrementAndGet()
155
156                                 // We are done
157                                 if(isDisposed()) {
158                                         threadStates[index] = ThreadState.DISPOSED;
159 //                                      lock.unlock();
160                                         return null;
161                                 }
162
163                                 
164                                 threadStates[index] = ThreadState.SLEEP;
165                                 
166                                 synchronized (querySupportLock) {
167                                         querySupportLock.wait(100);
168                                         
169                                 }
170                                 
171 //                              boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
172 //                              if(!woken) {
173 //                                      synchronized (querySupportLock) {
174 //                                              if(!processor.freeScheduling.isEmpty())
175 //                                                      System.err.println("some tasks are available!");
176 //                                      }
177 //                              }
178
179                                 sleepers.decrementAndGet();
180
181                                 // We are done
182                                 if(isDisposed()) {
183                                         threadStates[index] = ThreadState.DISPOSED;
184                                         //lock.unlock();
185                                         return null;
186                                 }
187
188                                 threadStates[index] = ThreadState.RUN;
189
190                                 //lock.unlock();
191
192                         }
193
194                 } catch (InterruptedException e) {
195
196                         LOGGER.error("Query handling (newTasks) was interrupted", e);
197                         throw new RuntimeException("Querying was interrupted.", e);
198
199                 }
200
201         }
202
203         public boolean runSynchronized() {
204
205                 boolean didExecute = false;
206
207 //              for(int performer=0;performer<THREADS;performer++) {
208 //                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
209 //                              synchronized(executors[performer]) {
210 //                                      threadLocks[performer].lock();
211 //                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
212 //                                      delayQueues[index * THREADS + performer].clear();
213 //                                      executors[performer].notify();
214 //                                      threadLocks[performer].unlock();
215 //                              }
216 //                      }
217 //              }
218
219                 if(tasks.isEmpty()) {
220                         ArrayList<SessionTask> finished = newTasks(false, tasks);
221                         if(finished == null) return didExecute; 
222                 }
223
224                 while(!tasks.isEmpty()) {
225
226                         SessionTask t = currentTask = tasks.remove(tasks.size() - 1);
227
228 //                      if(task.syncCaller == index) {
229 //                              ownSyncTasks[index].add(task);
230 //                      } else {
231                                 t.run(index);
232 //                              System.err.println("QT(s) " + index + " runs " + task);
233                                 didExecute = true;
234                                 currentTask = null;
235 //                      }
236
237                 }
238
239                 return didExecute;
240
241         }
242
243         @Override
244         public void run() {
245
246                 processor.thread.set(index);
247                 
248                 QuerySupport support = this.querySupport;
249
250                 try {
251
252                         while (true) {
253
254                                 boolean finished = newTasks(true, tasks) == null;
255                                 if(finished) {
256                                         return;
257                                 }
258                                 
259                                 while(!tasks.isEmpty()) {
260
261                                         SessionTask t = currentTask = tasks.remove(tasks.size()-1);
262 //                                      System.err.println("QT " + index + " runs " + task);
263                                         t.run(index);
264                                         currentTask = null;
265                                 }
266
267 //                              for(int performer=0;performer<THREADS;performer++) {
268 //                                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
269 //                                              synchronized(executors[performer]) {
270 //                                                      threadLocks[performer].lock();
271 //                                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
272 //                                                      delayQueues[index * THREADS + performer].clear();
273 //                                                      executors[performer].notify();
274 //                                                      threadLocks[performer].unlock();
275 //                                              }
276 //                                      }
277 //                              }
278
279                         }
280
281                 } catch (Throwable t) {
282
283                         LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
284                         support.exit(t);
285
286                 } finally {
287                         
288                         exited.release();
289
290                 }
291
292         }
293
294 }