]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryThread.java
1 package org.simantics.db.impl.query;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.Semaphore;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import org.simantics.db.Session;
8 import org.simantics.db.common.SessionThread;
9 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
10 import org.simantics.db.impl.query.QueryProcessor.ThreadState;
11 import org.slf4j.LoggerFactory;
12
13 class QueryThread extends Thread implements SessionThread {
14
15         private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
16
17         boolean disposed = false;
18         private Semaphore exited = new Semaphore(0);
19
20         final int index;
21
22         private Session session;
23         private QuerySupport querySupport;
24         private final QueryProcessor processor;
25
26         final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
27 //      final private ArrayList<SessionTask> own;
28 //      final private ArrayList<SessionTask> ownSync;
29 //      final private ArrayList<SessionTask> queue;
30 //      final private ReentrantLock lock;
31 //      final private Condition condition;
32         final private Object querySupportLock;
33         final private int THREADS;
34         final private AtomicInteger sleepers;
35         final private ThreadState[] threadStates;
36 //      final private ArrayList<SessionTask>[] delayQueues;
37 //      final private QueryThread[] executors;
38 //      final private ReentrantLock[] threadLocks;
39 //      final private ArrayList<SessionTask>[] queues;
40 //      final private ArrayList<SessionTask>[] ownSyncTasks;
41
42         public QueryThread(Session session, QueryProcessor processor, int index, String name) {
43                 super(QueryProcessor.QueryThreadGroup, null, name);
44                 this.session = session;
45                 this.processor = processor;
46                 this.index = index;
47 //              own = processor.ownTasks[index];
48 //              ownSync = processor.ownSyncTasks[index];
49 //              queue = processor.queues[index];
50 //              lock = processor.threadLocks[index];
51 //              condition = processor.threadConditions[index];
52                 querySupportLock = processor.querySupportLock;
53                 THREADS = processor.THREADS;
54                 sleepers = processor.sleepers;
55                 querySupport = processor.querySupport;
56                 threadStates = processor.threadStates;
57 //              delayQueues = processor.delayQueues;
58 //              executors = processor.executors;
59 //              threadLocks = processor.threadLocks;
60 //              queues = processor.queues;
61 //              ownSyncTasks = processor.ownSyncTasks;
62         }
63
64         synchronized void dispose() {
65
66 //              System.err.println("qt dispose");
67
68                 disposed = true;
69 //              lock.lock();
70 //              condition.signalAll();
71 //              lock.unlock();
72                 
73                 try {
74                         exited.acquire();
75                 } catch (InterruptedException e) {
76                         LOGGER.error("dispose was interrupted", e);
77                 }
78
79                 session = null;
80                 querySupport = null;
81
82 //              System.err.println("qt disposed");
83
84         }
85
86         boolean isDisposed() { 
87
88                 return disposed;
89
90         }
91
92         public Session getSession() {
93
94                 return session;
95
96         }
97
98         private boolean pumpTask() {
99                 if(!processor.freeScheduling.isEmpty()) {
100                         tasks.add(processor.freeScheduling.removeFirst());
101                         return true;
102                 }
103                 return false;
104         }
105         
106         ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
107
108                 try {
109
110                         while(true) {
111
112                                 // Return tasks if some were found
113                                 if(!tasks.isEmpty()) return tasks;
114                                 
115                                 if(!doWait) return null;
116
117                                 synchronized (querySupportLock) {
118
119                                         if(pumpTask())
120                                                 return tasks;
121
122 //                                      lock.lock();
123
124                                         // We are the last one awake
125                                         if(sleepers.incrementAndGet() == THREADS) {
126
127                                                 // Do not indicate sleeping yet
128                                                 sleepers.decrementAndGet();
129                                                 // Ceased can create new own tasks
130                                                 if(querySupport == null) System.err.println("null qs");
131                                                 querySupport.ceased(index);
132
133                                                 if(pumpTask()) {
134 //                                                      lock.unlock();
135                                                         return tasks;
136                                                 }
137
138                                                 // OK, now we are going to sleep
139                                                 sleepers.incrementAndGet();
140
141                                         }
142
143                                 }
144
145                                 // Nope nothing. Sleep & wait
146                                 // Whoever releases this calls sleepers.decrementAndGet()
147
148                                 // We are done
149                                 if(isDisposed()) {
150                                         threadStates[index] = ThreadState.DISPOSED;
151 //                                      lock.unlock();
152                                         return null;
153                                 }
154
155                                 
156                                 threadStates[index] = ThreadState.SLEEP;
157                                 
158                                 synchronized (querySupportLock) {
159                                         querySupportLock.wait(100);
160                                         
161                                 }
162                                 
163 //                              boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
164 //                              if(!woken) {
165 //                                      synchronized (querySupportLock) {
166 //                                              if(!processor.freeScheduling.isEmpty())
167 //                                                      System.err.println("some tasks are available!");
168 //                                      }
169 //                              }
170
171                                 sleepers.decrementAndGet();
172
173                                 // We are done
174                                 if(isDisposed()) {
175                                         threadStates[index] = ThreadState.DISPOSED;
176                                         //lock.unlock();
177                                         return null;
178                                 }
179
180                                 threadStates[index] = ThreadState.RUN;
181
182                                 //lock.unlock();
183
184                         }
185
186                 } catch (InterruptedException e) {
187
188                         LOGGER.error("Query handling (newTasks) was interrupted", e);
189                         throw new RuntimeException("Querying was interrupted.", e);
190
191                 }
192
193         }
194
195         public boolean runSynchronized() {
196
197                 boolean didExecute = false;
198
199 //              for(int performer=0;performer<THREADS;performer++) {
200 //                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
201 //                              synchronized(executors[performer]) {
202 //                                      threadLocks[performer].lock();
203 //                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
204 //                                      delayQueues[index * THREADS + performer].clear();
205 //                                      executors[performer].notify();
206 //                                      threadLocks[performer].unlock();
207 //                              }
208 //                      }
209 //              }
210
211                 if(tasks.isEmpty()) {
212                         ArrayList<SessionTask> finished = newTasks(false, tasks);
213                         if(finished == null) return didExecute; 
214                 }
215
216                 while(!tasks.isEmpty()) {
217
218                         SessionTask task = tasks.remove(tasks.size() - 1);
219
220 //                      if(task.syncCaller == index) {
221 //                              ownSyncTasks[index].add(task);
222 //                      } else {
223                                 task.run(index);
224 //                              System.err.println("QT(s) " + index + " runs " + task);
225                                 didExecute = true;
226 //                      }
227
228                 }
229
230                 return didExecute;
231
232         }
233
234         @Override
235         public void run() {
236
237                 processor.thread.set(index);
238                 
239                 QuerySupport support = this.querySupport;
240
241                 try {
242
243                         while (true) {
244
245                                 boolean finished = newTasks(true, tasks) == null;
246                                 if(finished) {
247                                         return;
248                                 }
249                                 
250                                 while(!tasks.isEmpty()) {
251
252                                         SessionTask task = tasks.remove(tasks.size()-1);
253 //                                      System.err.println("QT " + index + " runs " + task);
254                                         task.run(index);
255
256                                 }
257
258 //                              for(int performer=0;performer<THREADS;performer++) {
259 //                                      if(!delayQueues[index * THREADS + performer].isEmpty()) {
260 //                                              synchronized(executors[performer]) {
261 //                                                      threadLocks[performer].lock();
262 //                                                      queues[performer].addAll(delayQueues[index * THREADS + performer]);
263 //                                                      delayQueues[index * THREADS + performer].clear();
264 //                                                      executors[performer].notify();
265 //                                                      threadLocks[performer].unlock();
266 //                                              }
267 //                                      }
268 //                              }
269
270                         }
271
272                 } catch (Throwable t) {
273
274                         LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
275                         support.exit(t);
276
277                 } finally {
278                         
279                         exited.release();
280
281                 }
282
283         }
284
285 }