]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;\r
2 \r
3 import java.util.ArrayList;\r
4 import java.util.concurrent.Semaphore;\r
5 import java.util.concurrent.atomic.AtomicInteger;\r
6 import java.util.concurrent.locks.Condition;\r
7 import java.util.concurrent.locks.ReentrantLock;\r
8 \r
9 import org.simantics.db.Session;\r
10 import org.simantics.db.common.SessionThread;\r
11 import org.simantics.db.common.utils.Logger;\r
12 import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
13 import org.simantics.db.impl.query.QueryProcessor.ThreadState;\r
14 \r
15 class QueryThread extends Thread implements SessionThread {\r
16 \r
17         boolean disposed = false;\r
18         private Semaphore exited = new Semaphore(0);\r
19 \r
20         final int index;\r
21 \r
22         private Session session;\r
23         private QuerySupport querySupport;\r
24 \r
25         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();\r
26         final private ArrayList<SessionTask> own;\r
27         final private ArrayList<SessionTask> ownSync;\r
28         final private ArrayList<SessionTask> queue;\r
29         final private ReentrantLock lock;\r
30         final private Condition condition;\r
31         final private Object querySupportLock;\r
32         final private int THREADS;\r
33         final private AtomicInteger sleepers;\r
34         final private ThreadState[] threadStates;\r
35         final private ArrayList<SessionTask>[] delayQueues;\r
36         final private QueryThread[] executors;\r
37         final private ReentrantLock[] threadLocks;\r
38         final private ArrayList<SessionTask>[] queues;\r
39         final private ArrayList<SessionTask>[] ownSyncTasks;\r
40 \r
41         public QueryThread(Session session, QueryProcessor processor, int index, String name) {\r
42                 super(QueryProcessor.QueryThreadGroup, null, name);\r
43                 this.session = session;\r
44                 this.index = index;\r
45                 own = processor.ownTasks[index];\r
46                 ownSync = processor.ownSyncTasks[index];\r
47                 queue = processor.queues[index];\r
48                 lock = processor.threadLocks[index];\r
49                 condition = processor.threadConditions[index];\r
50                 querySupportLock = processor.querySupportLock;\r
51                 THREADS = processor.THREADS;\r
52                 sleepers = processor.sleepers;\r
53                 querySupport = processor.querySupport;\r
54                 threadStates = processor.threadStates;\r
55                 delayQueues = processor.delayQueues;\r
56                 executors = processor.executors;\r
57                 threadLocks = processor.threadLocks;\r
58                 queues = processor.queues;\r
59                 ownSyncTasks = processor.ownSyncTasks;\r
60         }\r
61 \r
62         synchronized void dispose() {\r
63 \r
64 //              System.err.println("qt dispose");\r
65 \r
66                 disposed = true;\r
67                 lock.lock();\r
68                 condition.signalAll();\r
69                 lock.unlock();\r
70                 \r
71                 try {\r
72                         exited.acquire();\r
73                 } catch (InterruptedException e) {\r
74                         Logger.defaultLogError(e);\r
75                 }\r
76 \r
77                 session = null;\r
78                 querySupport = null;\r
79 \r
80 \r
81 //              System.err.println("qt disposed");\r
82 \r
83         }\r
84 \r
85         boolean isDisposed() { \r
86 \r
87                 return disposed;\r
88 \r
89         }\r
90 \r
91         public Session getSession() {\r
92 \r
93                 return session;\r
94 \r
95         }\r
96 \r
97         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {\r
98 \r
99                 try {\r
100 \r
101                         while(true) {\r
102 \r
103                                 // Perform own tasks first\r
104                                 if(tasks.addAll(own)) {\r
105                                         own.clear();\r
106                                 } else if (doWait && !ownSync.isEmpty()) {\r
107                                         tasks.add(ownSync.remove(ownSync.size()-1));\r
108                                 }\r
109 \r
110                                 // Try some queued tasks\r
111                                 lock.lock();\r
112                                 if(tasks.addAll(queue)) {\r
113                                         queue.clear();\r
114                                         lock.unlock();\r
115                                         return tasks;\r
116                                 } else {\r
117                                         lock.unlock();\r
118                                 }\r
119 \r
120                                 // Return tasks if some were found\r
121                                 if(!tasks.isEmpty()) return tasks;\r
122                                 if(!doWait) return null;\r
123 \r
124                                 synchronized (querySupportLock) {\r
125 \r
126                                         lock.lock();\r
127                                         \r
128                                         // Just maybe someone inserted tasks and notified just before synchronized block\r
129                                         if(tasks.addAll(queue)) {\r
130                                                 queue.clear();\r
131                                                 lock.unlock();\r
132                                                 return tasks;\r
133                                         }\r
134 \r
135                                         // We are the last one awake\r
136                                         if(sleepers.incrementAndGet() == THREADS) {\r
137 \r
138                                                 // Do not indicate sleeping yet\r
139                                                 sleepers.decrementAndGet();\r
140                                                 // Ceased can create new own tasks\r
141                                                 if(querySupport == null) System.err.println("null qs");\r
142                                                 querySupport.ceased(index);\r
143 \r
144                                                 if(tasks.addAll(own)) {\r
145                                                         own.clear();\r
146                                                 }\r
147                                                 //                                      System.err.println("tasks after ceased: " + tasks.size());\r
148                                                 if(!tasks.isEmpty()) {\r
149                                                         lock.unlock();\r
150                                                         return tasks;\r
151                                                 }\r
152 \r
153                                                 // OK, now we are going to sleep\r
154                                                 sleepers.incrementAndGet();\r
155 \r
156                                         }\r
157 \r
158                                 }\r
159 \r
160                                 // Nope nothing. Sleep & wait\r
161                                 // Whoever releases this calls sleepers.decrementAndGet()\r
162 \r
163                                 // We are done\r
164                                 if(isDisposed()) {\r
165                                         threadStates[index] = ThreadState.DISPOSED;\r
166                                         lock.unlock();\r
167                                         return null;\r
168                                 }\r
169 \r
170                                 threadStates[index] = ThreadState.SLEEP;\r
171                                 condition.await();\r
172 \r
173                                 // We are done\r
174                                 if(isDisposed()) {\r
175                                         threadStates[index] = ThreadState.DISPOSED;\r
176                                         lock.unlock();\r
177                                         return null;\r
178                                 }\r
179 \r
180                                 threadStates[index] = ThreadState.RUN;\r
181 \r
182                                 lock.unlock();\r
183 \r
184                         }\r
185 \r
186                 } catch (InterruptedException e) {\r
187 \r
188                         e.printStackTrace();\r
189 \r
190                         throw new RuntimeException("Querying was interrupted.", e);\r
191 \r
192                 }\r
193 \r
194         }\r
195 \r
196         public boolean runSynchronized() {\r
197 \r
198                 boolean didExecute = false;\r
199 \r
200                 for(int performer=0;performer<THREADS;performer++) {\r
201                         if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
202                                 synchronized(executors[performer]) {\r
203                                         threadLocks[performer].lock();\r
204                                         queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
205                                         delayQueues[index * THREADS + performer].clear();\r
206                                         executors[performer].notify();\r
207                                         threadLocks[performer].unlock();\r
208                                 }\r
209                         }\r
210                 }\r
211 \r
212                 if(tasks.isEmpty()) {\r
213                         ArrayList<SessionTask> finished = newTasks(false, tasks);\r
214                         if(finished == null) return didExecute; \r
215                 }\r
216 \r
217                 while(!tasks.isEmpty()) {\r
218 \r
219                         SessionTask task = tasks.remove(tasks.size() - 1);\r
220 \r
221                         if(task.syncCaller == index) {\r
222                                 ownSyncTasks[index].add(task);\r
223                         } else {\r
224                                 task.run(index);\r
225                                 didExecute = true;\r
226                         }\r
227 \r
228                 }\r
229 \r
230                 return didExecute;\r
231 \r
232         }\r
233 \r
234         @Override\r
235         public void run() {\r
236 \r
237                 QuerySupport support = this.querySupport;\r
238 \r
239                 try {\r
240 \r
241                         while (true) {\r
242 \r
243                                 boolean finished = newTasks(true, tasks) == null;\r
244                                 if(finished) {\r
245                                         return;\r
246                                 }\r
247                                 \r
248                                 while(!tasks.isEmpty()) {\r
249 \r
250                                         SessionTask task = tasks.remove(tasks.size()-1);\r
251                                         task.run(index);\r
252 \r
253                                 }\r
254 \r
255                                 for(int performer=0;performer<THREADS;performer++) {\r
256                                         if(!delayQueues[index * THREADS + performer].isEmpty()) {\r
257                                                 synchronized(executors[performer]) {\r
258                                                         threadLocks[performer].lock();\r
259                                                         queues[performer].addAll(delayQueues[index * THREADS + performer]);\r
260                                                         delayQueues[index * THREADS + performer].clear();\r
261                                                         executors[performer].notify();\r
262                                                         threadLocks[performer].unlock();\r
263                                                 }\r
264                                         }\r
265                                 }\r
266 \r
267                         }\r
268 \r
269                 } catch (Throwable t) {\r
270 \r
271                         Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);\r
272                         support.exit(t);\r
273 \r
274                 } finally {\r
275                         \r
276                         exited.release();\r
277 \r
278                 }\r
279 \r
280         }\r
281 \r
282 }