gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Merge "Multiple reader thread support for db client"
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
6
7 import org.simantics.db.Disposable;
8 import org.simantics.db.Resource;
9 import org.simantics.db.exception.CancelTransactionException;
10 import org.simantics.db.exception.DatabaseException;
11 import org.simantics.db.impl.graph.WriteGraphImpl;
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;
13 import org.simantics.db.impl.internal.ResourceData;
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
16 import org.simantics.db.request.WriteTraits;
17 import org.simantics.utils.datastructures.Pair;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 public class SessionRequestManager {
22
23         private static final Logger LOGGER = LoggerFactory.getLogger(SessionRequestManager.class);
24
25         private static boolean DEBUG = false;
26         
27         enum State {
28                 
29                 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
30                 
31         }
32         
33         final SessionImplSocket session;
34         final fi.vtt.simantics.procore.internal.State transactionState;
35         final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
36         final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
37
38         State state = State.INIT;
39
40         private boolean legal(State current, State target) {
41                 
42                 if(State.IDLE == current) {
43                         
44                         return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
45                         
46                 } else if (State.WRITE == current) {
47                         
48                         return State.WRITE_UPDATE == target;
49
50                 } else if (State.READ == current) {
51                         
52                         return State.READ_UPDATE == target;
53
54                 } else if (State.ERROR == current) {
55                         
56                         return State.IDLE == target;
57                         
58                 } else {
59                         
60                         return State.IDLE == target;
61                         
62                 }
63                 
64         }
65         
66         private synchronized void setState(int thread, State state, SessionTask task) {
67
68                 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
69                 if(this.state == state) return;
70                 
71                 assert(legal(this.state, state));
72                 
73                 if(State.READ == state) {
74                         
75                         assert(State.IDLE == this.state);
76                         
77                 }
78                 
79                 State oldState = this.state;
80                 this.state = state;
81                 
82                 if(State.IDLE == oldState && State.IDLE != state) {
83                     session.incAsync();
84                 }
85                 
86                 /*
87                  * We are now executing in new state.
88                  * The methods below SHALL NOT start any task
89                  * immediately. All tasks shall be scheduled.
90                  * 
91                  */
92                 if(State.READ == state) startRead(thread, (SessionRead)task);
93                 else if(State.WRITE == state) startWrite(thread, task);
94                 else if(State.READ == oldState) startReadUpdate(thread);
95                 else if(State.WRITE == oldState) startWriteUpdate(thread);
96
97                 // Check the need for new transaction
98                 if(state == State.IDLE) {
99
100                         closeRandomAccessValues();
101
102                         if(!writes.isEmpty()) {
103                                 setState(thread, State.WRITE, writes.poll());
104                         } else if (!reads.isEmpty()) {
105                                 setState(thread, State.READ, reads.poll());
106                         }
107
108             session.decAsync();
109
110                 }
111                 
112         }
113         
114         public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
115                 this.session = session;
116                 this.transactionState = transactionState;
117         }
118         
119         public synchronized void startRead(int thread, final SessionRead task) {
120                 
121                 session.queryProvider2.schedule(new SessionTask(true) {
122
123                         @Override
124             public void run(int thread) {
125                 try {
126                     transactionState.startReadTransaction(thread);
127                     task.run(thread);
128                 } catch (Throwable t) {
129                     LOGGER.error("Read transaction could not be started", t);
130                     if(task.throwable != null)
131                         task.throwable.set(t);
132                     state = State.ERROR;
133                 } finally {
134                     if(task.notify != null)
135                         task.notify.release();
136                 }
137             }
138                         
139                 });
140                 
141         }
142         
143         public synchronized void startReadUpdate(int thread) {
144                 
145                 session.queryProvider2.schedule(new SessionTask(true) {
146
147                         @Override
148                         public void run(int thread) {
149
150                                 session.fireFinishReadTransaction();
151
152                                 try {
153                                         transactionState.stopReadTransaction();
154                                 } catch (DatabaseException e) {
155                                         LOGGER.error("Read transaction could not be stopped", e);
156                                 }
157                                 
158                         }
159                         
160                 });
161                 
162         }
163
164         public synchronized void startWrite(int thread, final SessionTask task) {
165                 
166                 session.queryProvider2.schedule(new SessionTask(true) {
167
168                         @Override
169                         public void run(int thread) {
170
171                         try {
172                                 transactionState.startWriteTransaction(thread);
173                         } catch (Throwable t) {
174                             LOGGER.error("Write transaction could not be started", t);
175                             return;
176                         }
177                                 task.run(thread);
178                                 
179                         }
180                         
181                 });
182                 
183         }
184
185         public synchronized void startWriteUpdate(int thread) {
186                 
187                 session.queryProvider2.schedule(new SessionTask(true) {
188
189                         @Override
190                         public void run(int thread) {
191
192                                 // Support for DelayedWriteRequest cancels during the
193                                 // read-only part of the request.
194                                 WriteStateBase<?> delayedState = session.delayedWriteState;
195                                 session.delayedWriteState = null;
196                                 if (delayedState != null) {
197                                         if (session.writeState != null)
198                                                 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
199
200                         if (delayedState.isExcepted()) {
201                         // There can never be any changes in the cluster stream.
202                         // In this case, don't bother rolling back.
203                             boolean empty = session.clusterStream.reallyFlush(); 
204                         assert(empty);
205                             transactionState.stopWriteTransaction(session.clusterStream);
206                         } else {
207                             throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
208                         }
209                         Disposable.safeDispose(session.clientChanges);
210                         session.clientChanges = new ClientChangesImpl(session);
211                                         delayedState.finish();
212                                         return;
213                                 }
214
215                                 // The session could have been terminated
216                                 if(!session.state.isAlive()) return;
217
218                                 WriteState<?> writeState = session.writeState;
219                                 
220                                 assert(writeState != null);
221                                 
222                                 WriteGraphImpl graph = writeState.getGraph();
223
224                                 if(writeState.isExcepted()) {
225
226                                     if(!(writeState.exception instanceof CancelTransactionException))
227                                         LOGGER.error("Write request failed", writeState.exception);
228
229                                     transactionState.cancelWriteTransaction(graph);
230
231                                 } else {
232
233                                         session.handleUpdatesAndMetadata(graph);
234
235 //                    long start = System.nanoTime();
236                                         transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
237 //                                      long duration = System.nanoTime() - start;
238 //                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
239
240                                 }
241
242                                 Disposable.safeDispose(session.clientChanges);
243                                 session.clientChanges = new ClientChangesImpl(session);
244
245                                 WriteState<?> state = session.writeState;
246                                 state.finish();
247                                 session.writeState = null;
248
249                         }
250
251                 });
252
253         }
254
255         public synchronized void ceased(int thread) {
256
257                 if(State.WRITE == state) {
258
259                         setState(thread, State.WRITE_UPDATE, null);
260                         
261                 } else if (State.WRITE_UPDATE == state) {
262
263                         setState(thread, State.IDLE, null);
264
265                 } else if (State.READ_UPDATE == state) {
266
267                         setState(thread, State.IDLE, null);
268                         
269                 } else if (State.READ == state) {
270
271                         if (!reads.isEmpty()) {
272
273                                 final SessionRead read = reads.poll();
274                                 session.queryProvider2.schedule(new SessionTask(true) {
275
276                                         @Override
277                                         public void run(int thread) {
278                                                 read.run(thread);
279                                                 if(read.notify != null) read.notify.release();
280                                         }
281                                         
282                                 });     
283                                 
284                         } else {
285                                 
286                                 setState(thread, State.READ_UPDATE, null);
287                                 
288                         }
289
290                 } else if (State.INIT == state) {
291                         
292                         assert(reads.isEmpty());
293                         assert(writes.isEmpty());
294                         setState(thread, State.IDLE, null);
295                         
296                 } else if (State.ERROR == state) {
297                         
298                         setState(thread, State.IDLE, null);
299                         
300                 } else {
301
302                         // Spurious wakeup
303                         
304                     //throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); 
305                         
306                 }
307                 
308         }
309         
310     public synchronized void scheduleRead(final SessionRead task) {
311
312                 assert(State.INIT != state);
313                 
314                 if(State.READ == state) {
315                         session.queryProvider2.schedule(new SessionTask(true) {
316
317                                 @Override
318                                 public void run(int thread) {
319                                         try {
320                                                 task.run(thread);
321                                         } finally {
322                                                 if (task.notify != null) task.notify.release();
323                                         }
324                                 }
325                                 
326                         });
327                 } else if (State.IDLE == state) {
328                         setState(Integer.MIN_VALUE, State.READ, task);
329                 } else {
330                         reads.offer(task);
331                 }
332                 
333         }
334
335     public int lastUpdateIndex() {
336         return writes.size();
337     }
338     
339         public synchronized void scheduleWrite(SessionTask task) {
340
341                 scheduleWrite(task, null);
342
343         }
344
345         /**
346          * @param task the task to schedule 
347          * @param combine <code>true</code> or <code>false</code> to explicitly
348          *        specify whether to combine the write with the previous operation
349          *        or <code>null</code> to use the default logic, i.e. combine when
350          *        this schedule is performed from within update processing (state ==
351          *        State.WRITE_UPDATE).
352          */
353         public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
354
355                 boolean inUpdate = state == State.WRITE_UPDATE;
356
357                 //System.err.println("schedule write " + task);
358                 
359                 assert(State.INIT != state);
360                 //task.combine = combine != null ? combine : inUpdate;
361                 if(State.IDLE == state) {
362                         setState(Integer.MIN_VALUE, State.WRITE, task);
363                 } else {
364                         if(inUpdate) {
365                                 int index = lastUpdateIndex();
366                                 if(index == writes.size()) writes.offer(task);
367                                 else writes.add(index, task);
368                         } else {
369                                 writes.offer(task);
370                         }
371                 }
372
373         }
374
375         /**
376          * Unregisters and closes all currently registered ResourceData random
377          * access value file handles. Any I/O exceptions occurring during closing
378          * are logged into {@link Logger}.
379          */
380         private void closeRandomAccessValues() {
381                 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
382                 if (ravs == null)
383                         return;
384                 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
385                 for (Pair<Resource, ResourceData> value : values) {
386                         try {
387                                 value.second.binaryFile.close();
388                         } catch (IOException e) {
389                                 LOGGER.error("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
390                         }
391                 }
392         }
393
394 }