gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Fix livelock situation in QueryProcessor
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import org.simantics.db.Session;
8 import org.simantics.db.common.SessionThread;
9 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
10 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
11 import org.slf4j.LoggerFactory;
12
13 class QueryThread extends Thread implements SessionThread {
14
15         private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
16
17         boolean disposed = false;
18         private Semaphore exited = new Semaphore(0);
19
20         final int index;
21
22         private Session session;
23         private QuerySupport querySupport;
24         private final QueryProcessor processor;
25         private final Semaphore requests;
26
27         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 //      final private ArrayList<SessionTask> own;
29 //      final private ArrayList<SessionTask> ownSync;
30 //      final private ArrayList<SessionTask> queue;
31 //      final private ReentrantLock lock;
32 //      final private Condition condition;
33         final private Object querySupportLock;
34         final private int THREADS;
35         final private AtomicInteger sleepers;
36         final private ThreadState[] threadStates;
37 //      final private ArrayList<SessionTask>[] delayQueues;
38 //      final private QueryThread[] executors;
39 //      final private ReentrantLock[] threadLocks;
40 //      final private ArrayList<SessionTask>[] queues;
41 //      final private ArrayList<SessionTask>[] ownSyncTasks;
42
43         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
44                 super(QueryProcessor.QueryThreadGroup, null, name);
45                 this.session = session;
46                 this.processor = processor;
47                 this.index = index;
48 //              own = processor.ownTasks[index];
49 //              ownSync = processor.ownSyncTasks[index];
50 //              queue = processor.queues[index];
51 //              lock = processor.threadLocks[index];
52 //              condition = processor.threadConditions[index];
53                 querySupportLock = processor.querySupportLock;
54                 THREADS = processor.THREADS;
55                 sleepers = processor.sleepers;
56                 querySupport = processor.querySupport;
57                 threadStates = processor.threadStates;
58                 requests = processor.requests;
59 //              delayQueues = processor.delayQueues;
60 //              executors = processor.executors;
61 //              threadLocks = processor.threadLocks;
62 //              queues = processor.queues;
63 //              ownSyncTasks = processor.ownSyncTasks;
64         }
65
66         synchronized void dispose() {
67
68 //              System.err.println("qt dispose");
69
70                 disposed = true;
71 //              lock.lock();
72 //              condition.signalAll();
73 //              lock.unlock();
74                 
75                 try {
76                         exited.acquire();
77                 } catch (InterruptedException e) {
78                         LOGGER.error("dispose was interrupted", e);
79                 }
80
81                 session = null;
82                 querySupport = null;
83
84 //              System.err.println("qt disposed");
85
86         }
87
88         boolean isDisposed() { 
89
90                 return disposed;
91
92         }
93
94         public Session getSession() {
95
96                 return session;
97
98         }
99
100         private boolean pumpTask() {
101                 if(!processor.freeScheduling.isEmpty()) {
102                         tasks.add(processor.freeScheduling.removeFirst());
103                         processor.queueLength.decrementAndGet();
104                         return true;
105                 }
106                 return false;
107         }
108         
109         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
110
111                 try {
112
113                         while(true) {
114
115                                 // Return tasks if some were found
116                                 if(!tasks.isEmpty()) return tasks;
117                                 
118                                 if(!doWait) return null;
119
120                                 synchronized (querySupportLock) {
121
122                                         if(pumpTask())
123                                                 return tasks;
124
125 //                                      lock.lock();
126
127                                         // We are the last one awake
128                                         if(sleepers.incrementAndGet() == THREADS) {
129
130                                                 // Do not indicate sleeping yet
131                                                 sleepers.decrementAndGet();
132                                                 // Ceased can create new own tasks
133                                                 if(querySupport == null) System.err.println("null qs");
134                                                 querySupport.ceased(index);
135
136                                                 if(pumpTask()) {
137 //                                                      lock.unlock();
138                                                         return tasks;
139                                                 }
140
141                                                 // OK, now we are going to sleep
142                                                 sleepers.incrementAndGet();
143
144                                         }
145
146                                 }
147
148                                 // Nope nothing. Sleep & wait
149                                 // Whoever releases this calls sleepers.decrementAndGet()
150
151                                 // We are done
152                                 if(isDisposed()) {
153                                         threadStates[index] = ThreadState.DISPOSED;
154 //                                      lock.unlock();
155                                         return null;
156                                 }
157
158                                 
159                                 threadStates[index] = ThreadState.SLEEP;
160                                 
161                                 requests.acquire();
162
163                                 sleepers.decrementAndGet();
164
165                                 // We are done
166                                 if(isDisposed()) {
167                                         threadStates[index] = ThreadState.DISPOSED;
168                                         //lock.unlock();
169                                         return null;
170                                 }
171
172                                 threadStates[index] = ThreadState.RUN;
173
174                                 //lock.unlock();
175
176                         }
177
178                 } catch (InterruptedException e) {
179
180                         LOGGER.error("Query handling (newTasks) was interrupted", e);
181                         throw new RuntimeException("Querying was interrupted.", e);
182
183                 }
184
185         }
186
187         public boolean runSynchronized() {
188
189                 boolean didExecute = false;
190
191 //              for(int performer=0;performer<THREADS;performer++) {
192 //                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
193 //                              synchronized(executors[performer]) {
194 //                                      threadLocks[performer].lock();
195 //                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
196 //                                      delayQueues[index * THREADS + performer].clear();
197 //                                      executors[performer].notify();
198 //                                      threadLocks[performer].unlock();
199 //                              }
200 //                      }
201 //              }
202
203                 if(tasks.isEmpty()) {
204                         ArrayList<SessionTask> finished = newTasks(false, tasks);
205                         if(finished == null) return didExecute; 
206                 }
207
208                 while(!tasks.isEmpty()) {
209
210                         SessionTask task = tasks.remove(tasks.size() - 1);
211
212 //                      if(task.syncCaller == index) {
213 //                              ownSyncTasks[index].add(task);
214 //                      } else {
215                                 task.run(index);
216 //                              System.err.println("QT(s) " + index + " runs " + task);
217                                 didExecute = true;
218 //                      }
219
220                 }
221
222                 return didExecute;
223
224         }
225
226         @Override
227         public void run() {
228
229                 processor.thread.set(index);
230                 
231                 QuerySupport support = this.querySupport;
232
233                 try {
234
235                         while (true) {
236
237                                 boolean finished = newTasks(true, tasks) == null;
238                                 if(finished) {
239                                         return;
240                                 }
241                                 
242                                 while(!tasks.isEmpty()) {
243
244                                         SessionTask task = tasks.remove(tasks.size()-1);
245 //                                      System.err.println("QT " + index + " runs " + task);
246                                         task.run(index);
247
248                                 }
249
250 //                              for(int performer=0;performer<THREADS;performer++) {
251 //                                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
252 //                                              synchronized(executors[performer]) {
253 //                                                      threadLocks[performer].lock();
254 //                                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
255 //                                                      delayQueues[index * THREADS + performer].clear();
256 //                                                      executors[performer].notify();
257 //                                                      threadLocks[performer].unlock();
258 //                                              }
259 //                                      }
260 //                              }
261
262                         }
263
264                 } catch (Throwable t) {
265
266                         LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
267                         support.exit(t);
268
269                 } finally {
270                         
271                         exited.release();
272
273                 }
274
275         }
276
277 }