]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
f7c56a213d83ba92844fae162b002d713fe8931d
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.TimeUnit;
6 import java.util.concurrent.atomic.AtomicInteger;
7 import java.util.concurrent.locks.Condition;
8 import java.util.concurrent.locks.ReentrantLock;
9
10 import org.simantics.db.Session;
11 import org.simantics.db.common.SessionThread;
12 import org.simantics.db.common.utils.Logger;
13 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
14 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
15
16 class QueryThread extends Thread implements SessionThread {
17
18         boolean disposed = false;
19         private Semaphore exited = new Semaphore(0);
20
21         final int index;
22
23         private Session session;
24         private QuerySupport querySupport;
25         final private QueryProcessor processor;
26
27         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
28 //      final private ArrayList<SessionTask> own;
29 //      final private ArrayList<SessionTask> ownSync;
30 //      final private ArrayList<SessionTask> queue;
31 //      final private ReentrantLock lock;
32 //      final private Condition condition;
33         final private Object querySupportLock;
34         final private int THREADS;
35         final private AtomicInteger sleepers;
36         final private ThreadState[] threadStates;
37 //      final private ArrayList<SessionTask>[] delayQueues;
38 //      final private QueryThread[] executors;
39 //      final private ReentrantLock[] threadLocks;
40 //      final private ArrayList<SessionTask>[] queues;
41 //      final private ArrayList<SessionTask>[] ownSyncTasks;
42
43         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
44                 super(QueryProcessor.QueryThreadGroup, null, name);
45                 this.session = session;
46                 this.processor = processor;
47                 this.index = index;
48 //              own = processor.ownTasks[index];
49 //              ownSync = processor.ownSyncTasks[index];
50 //              queue = processor.queues[index];
51 //              lock = processor.threadLocks[index];
52 //              condition = processor.threadConditions[index];
53                 querySupportLock = processor.querySupportLock;
54                 THREADS = processor.THREADS;
55                 sleepers = processor.sleepers;
56                 querySupport = processor.querySupport;
57                 threadStates = processor.threadStates;
58 //              delayQueues = processor.delayQueues;
59 //              executors = processor.executors;
60 //              threadLocks = processor.threadLocks;
61 //              queues = processor.queues;
62 //              ownSyncTasks = processor.ownSyncTasks;
63         }
64
65         synchronized void dispose() {
66
67 //              System.err.println("qt dispose");
68
69                 disposed = true;
70 //              lock.lock();
71 //              condition.signalAll();
72 //              lock.unlock();
73                 
74                 try {
75                         exited.acquire();
76                 } catch (InterruptedException e) {
77                         Logger.defaultLogError(e);
78                 }
79
80                 session = null;
81                 querySupport = null;
82
83
84 //              System.err.println("qt disposed");
85
86         }
87
88         boolean isDisposed() { 
89
90                 return disposed;
91
92         }
93
94         public Session getSession() {
95
96                 return session;
97
98         }
99
100         private boolean pumpTask() {
101                 if(!processor.freeScheduling.isEmpty()) {
102                         tasks.add(processor.freeScheduling.removeFirst());
103                         return true;
104                 }
105                 return false;
106         }
107         
108         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
109
110                 try {
111
112                         while(true) {
113
114                                 // Return tasks if some were found
115                                 if(!tasks.isEmpty()) return tasks;
116                                 
117                                 if(!doWait) return null;
118
119                                 synchronized (querySupportLock) {
120
121                                         if(pumpTask())
122                                                 return tasks;
123
124 //                                      lock.lock();
125
126                                         // We are the last one awake
127                                         if(sleepers.incrementAndGet() == THREADS) {
128
129                                                 // Do not indicate sleeping yet
130                                                 sleepers.decrementAndGet();
131                                                 // Ceased can create new own tasks
132                                                 if(querySupport == null) System.err.println("null qs");
133                                                 querySupport.ceased(index);
134
135                                                 if(pumpTask()) {
136 //                                                      lock.unlock();
137                                                         return tasks;
138                                                 }
139
140                                                 // OK, now we are going to sleep
141                                                 sleepers.incrementAndGet();
142
143                                         }
144
145                                 }
146
147                                 // Nope nothing. Sleep & wait
148                                 // Whoever releases this calls sleepers.decrementAndGet()
149
150                                 // We are done
151                                 if(isDisposed()) {
152                                         threadStates[index] = ThreadState.DISPOSED;
153 //                                      lock.unlock();
154                                         return null;
155                                 }
156
157                                 
158                                 threadStates[index] = ThreadState.SLEEP;
159                                 
160                                 synchronized (querySupportLock) {
161                                         querySupportLock.wait(100);
162                                         
163                                 }
164                                 
165 //                              boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
166 //                              if(!woken) {
167 //                                      synchronized (querySupportLock) {
168 //                                              if(!processor.freeScheduling.isEmpty())
169 //                                                      System.err.println("some tasks are available!");
170 //                                      }
171 //                              }
172
173                                 sleepers.decrementAndGet();
174
175                                 // We are done
176                                 if(isDisposed()) {
177                                         threadStates[index] = ThreadState.DISPOSED;
178                                         //lock.unlock();
179                                         return null;
180                                 }
181
182                                 threadStates[index] = ThreadState.RUN;
183
184                                 //lock.unlock();
185
186                         }
187
188                 } catch (InterruptedException e) {
189
190                         e.printStackTrace();
191
192                         throw new RuntimeException("Querying was interrupted.", e);
193
194                 }
195
196         }
197
198         public boolean runSynchronized() {
199
200                 boolean didExecute = false;
201
202 //              for(int performer=0;performer<THREADS;performer++) {
203 //                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
204 //                              synchronized(executors[performer]) {
205 //                                      threadLocks[performer].lock();
206 //                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
207 //                                      delayQueues[index * THREADS + performer].clear();
208 //                                      executors[performer].notify();
209 //                                      threadLocks[performer].unlock();
210 //                              }
211 //                      }
212 //              }
213
214                 if(tasks.isEmpty()) {
215                         ArrayList<SessionTask> finished = newTasks(false, tasks);
216                         if(finished == null) return didExecute; 
217                 }
218
219                 while(!tasks.isEmpty()) {
220
221                         SessionTask task = tasks.remove(tasks.size() - 1);
222
223 //                      if(task.syncCaller == index) {
224 //                              ownSyncTasks[index].add(task);
225 //                      } else {
226                                 task.run(index);
227 //                              System.err.println("QT(s) " + index + " runs " + task);
228                                 didExecute = true;
229 //                      }
230
231                 }
232
233                 return didExecute;
234
235         }
236
237         @Override
238         public void run() {
239
240                 processor.thread.set(index);
241                 
242                 QuerySupport support = this.querySupport;
243
244                 try {
245
246                         while (true) {
247
248                                 boolean finished = newTasks(true, tasks) == null;
249                                 if(finished) {
250                                         return;
251                                 }
252                                 
253                                 while(!tasks.isEmpty()) {
254
255                                         SessionTask task = tasks.remove(tasks.size()-1);
256 //                                      System.err.println("QT " + index + " runs " + task);
257                                         task.run(index);
258
259                                 }
260
261 //                              for(int performer=0;performer<THREADS;performer++) {
262 //                                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
263 //                                              synchronized(executors[performer]) {
264 //                                                      threadLocks[performer].lock();
265 //                                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
266 //                                                      delayQueues[index * THREADS + performer].clear();
267 //                                                      executors[performer].notify();
268 //                                                      threadLocks[performer].unlock();
269 //                                              }
270 //                                      }
271 //                              }
272
273                         }
274
275                 } catch (Throwable t) {
276
277                         Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
278                         support.exit(t);
279
280                 } finally {
281                         
282                         exited.release();
283
284                 }
285
286         }
287
288 }