]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Still working for multiple readers
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
6
7 import org.simantics.db.Disposable;
8 import org.simantics.db.Resource;
9 import org.simantics.db.common.utils.Logger;
10 import org.simantics.db.exception.CancelTransactionException;
11 import org.simantics.db.exception.DatabaseException;
12 import org.simantics.db.impl.graph.WriteGraphImpl;
13 import org.simantics.db.impl.internal.RandomAccessValueSupport;
14 import org.simantics.db.impl.internal.ResourceData;
15 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
16 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
17 import org.simantics.db.request.WriteTraits;
18 import org.simantics.utils.datastructures.Pair;
19
20 public class SessionRequestManager {
21
22         private static boolean DEBUG = false;
23         
24         enum State {
25                 
26                 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
27                 
28         }
29         
30         final SessionImplSocket session;
31         final fi.vtt.simantics.procore.internal.State transactionState;
32         final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
33         final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
34
35         State state = State.INIT;
36
37         private boolean legal(State current, State target) {
38                 
39                 if(State.IDLE == current) {
40                         
41                         return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
42                         
43                 } else if (State.WRITE == current) {
44                         
45                         return State.WRITE_UPDATE == target;
46
47                 } else if (State.READ == current) {
48                         
49                         return State.READ_UPDATE == target;
50
51                 } else if (State.ERROR == current) {
52                         
53                         return State.IDLE == target;
54                         
55                 } else {
56                         
57                         return State.IDLE == target;
58                         
59                 }
60                 
61         }
62         
63         private synchronized void setState(int thread, State state, SessionTask task) {
64
65                 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
66                 if(this.state == state) return;
67                 
68                 assert(legal(this.state, state));
69                 
70                 if(State.READ == state) {
71                         
72                         assert(State.IDLE == this.state);
73                         
74                 }
75                 
76                 State oldState = this.state;
77                 this.state = state;
78                 
79                 if(State.IDLE == oldState && State.IDLE != state) {
80                     session.incAsync();
81                 }
82                 
83                 /*
84                  * We are now executing in new state.
85                  * The methods below SHALL NOT start any task
86                  * immediately. All tasks shall be scheduled.
87                  * 
88                  */
89                 if(State.READ == state) startRead(thread, (SessionRead)task);
90                 else if(State.WRITE == state) startWrite(thread, task);
91                 else if(State.READ == oldState) startReadUpdate(thread);
92                 else if(State.WRITE == oldState) startWriteUpdate(thread);
93
94                 // Check the need for new transaction
95                 if(state == State.IDLE) {
96
97                         closeRandomAccessValues();
98
99                         if(!writes.isEmpty()) {
100                                 setState(thread, State.WRITE, writes.poll());
101                         } else if (!reads.isEmpty()) {
102                                 setState(thread, State.READ, reads.poll());
103                         }
104
105             session.decAsync();
106
107                 }
108                 
109         }
110         
111         public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
112                 this.session = session;
113                 this.transactionState = transactionState;
114         }
115         
116         public synchronized void startRead(int thread, final SessionRead task) {
117                 
118                 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
119
120                         @Override
121             public void run(int thread) {
122                 try {
123                     transactionState.startReadTransaction(thread);
124                     task.run(thread);
125                 } catch (Throwable t) {
126                     Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
127                     if(task.throwable != null)
128                         task.throwable.set(t);
129                     state = State.ERROR;
130                 } finally {
131                     if(task.notify != null)
132                         task.notify.release();
133                 }
134             }
135                         
136                 });
137                 
138         }
139         
140         public synchronized void startReadUpdate(int thread) {
141                 
142                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
143
144                         @Override
145                         public void run(int thread) {
146
147                                 session.fireFinishReadTransaction();
148
149                                 try {
150                                         transactionState.stopReadTransaction();
151                                 } catch (DatabaseException e) {
152                                         e.printStackTrace();
153                                 }
154                                 
155                         }
156                         
157                 });
158                 
159         }
160
161 //      public synchronized void stopRead(int thread) {
162 //              
163 //              try {
164 //                      transactionState.stopReadTransaction();
165 //              } catch (DatabaseException e) {
166 //                      e.printStackTrace();
167 //              }
168 //              
169 //      }
170
171         public synchronized void startWrite(int thread, final SessionTask task) {
172                 
173                 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
174
175                         @Override
176                         public void run(int thread) {
177
178                         try {
179                                 transactionState.startWriteTransaction(thread);
180                         } catch (Throwable t) {
181                             DatabaseException e = new DatabaseException("Write transaction could not be started", t);
182                             Logger.defaultLogError(e);
183                             return;
184                         }
185                                 task.run(thread);
186                                 
187                         }
188                         
189                 });
190                 
191         }
192
193         public synchronized void startWriteUpdate(int thread) {
194                 
195                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
196
197                         @Override
198                         public void run(int thread) {
199
200                                 // Support for DelayedWriteRequest cancels during the
201                                 // read-only part of the request.
202                                 WriteStateBase<?> delayedState = session.delayedWriteState;
203                                 session.delayedWriteState = null;
204                                 if (delayedState != null) {
205                                         if (session.writeState != null)
206                                                 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
207
208                         if (delayedState.isExcepted()) {
209                         // There can never be any changes in the cluster stream.
210                         // In this case, don't bother rolling back.
211                             boolean empty = session.clusterStream.reallyFlush(); 
212                         assert(empty);
213                             transactionState.stopWriteTransaction(session.clusterStream);
214                         } else {
215                             throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
216                         }
217                         Disposable.safeDispose(session.clientChanges);
218                         session.clientChanges = new ClientChangesImpl(session);
219                                         delayedState.finish();
220                                         return;
221                                 }
222
223                                 // The session could have been terminated
224                                 if(!session.state.isAlive()) return;
225
226                                 WriteState<?> writeState = session.writeState;
227                                 
228                                 assert(writeState != null);
229                                 
230                                 WriteGraphImpl graph = writeState.getGraph();
231
232                                 if(writeState.isExcepted()) {
233                                     
234                                     if(!(writeState.exception instanceof CancelTransactionException))
235                                         writeState.exception.printStackTrace();
236                                     
237                                     transactionState.cancelWriteTransaction(graph);
238
239                                 } else {
240
241                                         session.handleUpdatesAndMetadata(graph);
242
243 //                    long start = System.nanoTime();
244                                         transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
245 //                                      long duration = System.nanoTime() - start;
246 //                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
247
248                                 }
249
250                                 Disposable.safeDispose(session.clientChanges);
251                                 session.clientChanges = new ClientChangesImpl(session);
252
253                                 WriteState<?> state = session.writeState;
254                                 state.finish();
255                                 session.writeState = null;
256
257                         }
258
259                 });
260
261         }
262
263 //      public synchronized void stopWrite(int thread) {
264 //              
265 //        session.clientChanges = new ClientChangesImpl(session);
266 //
267 //        WriteState<?> state = session.writeState;
268 //
269 //        System.err.println("D");
270 //        state.finish();
271 //        System.err.println("E");
272 //        
273 //        session.writeState = null;
274 //        
275 //      }
276
277         public synchronized void ceased(int thread) {
278
279                 if(State.WRITE == state) {
280
281                         setState(thread, State.WRITE_UPDATE, null);
282                         
283                 } else if (State.WRITE_UPDATE == state) {
284
285                         setState(thread, State.IDLE, null);
286
287                 } else if (State.READ_UPDATE == state) {
288
289                         setState(thread, State.IDLE, null);
290                         
291                 } else if (State.READ == state) {
292
293                         if (!reads.isEmpty()) {
294
295                                 final SessionRead read = reads.poll();
296                                 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
297
298                                         @Override
299                                         public void run(int thread) {
300                                                 read.run(thread);
301                                                 if(read.notify != null) read.notify.release();
302                                         }
303                                         
304                                 });     
305                                 
306                         } else {
307                                 
308                                 setState(thread, State.READ_UPDATE, null);
309                                 
310                         }
311
312                 } else if (State.INIT == state) {
313                         
314                         assert(reads.isEmpty());
315                         assert(writes.isEmpty());
316                         setState(thread, State.IDLE, null);
317                         
318                 } else if (State.ERROR == state) {
319                         
320                         setState(thread, State.IDLE, null);
321                         
322                 } else {
323
324                         // Spurious wakeup
325                         
326                     //throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); 
327                         
328                 }
329                 
330         }
331         
332     public synchronized void scheduleRead(final SessionRead task) {
333
334                 assert(State.INIT != state);
335                 
336                 if(State.READ == state) {
337                         session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
338
339                                 @Override
340                                 public void run(int thread) {
341                                         try {
342                                                 task.run(thread);
343                                         } finally {
344                                                 if (task.notify != null) task.notify.release();
345                                         }
346                                 }
347                                 
348                         });
349                 } else if (State.IDLE == state) {
350                         setState(Integer.MIN_VALUE, State.READ, task);
351                 } else {
352                         reads.offer(task);
353                 }
354                 
355         }
356
357     public int lastUpdateIndex() {
358         return writes.size();
359     }
360     
361         public synchronized void scheduleWrite(SessionTask task) {
362
363                 scheduleWrite(task, null);
364
365         }
366
367         /**
368          * @param task the task to schedule 
369          * @param combine <code>true</code> or <code>false</code> to explicitly
370          *        specify whether to combine the write with the previous operation
371          *        or <code>null</code> to use the default logic, i.e. combine when
372          *        this schedule is performed from within update processing (state ==
373          *        State.WRITE_UPDATE).
374          */
375         public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
376
377                 boolean inUpdate = state == State.WRITE_UPDATE;
378
379                 //System.err.println("schedule write " + task);
380                 
381                 assert(State.INIT != state);
382                 //task.combine = combine != null ? combine : inUpdate;
383                 if(State.IDLE == state) {
384                         setState(Integer.MIN_VALUE, State.WRITE, task);
385                 } else {
386                         if(inUpdate) {
387                                 int index = lastUpdateIndex();
388                                 if(index == writes.size()) writes.offer(task);
389                                 else writes.add(index, task);
390                         } else {
391                                 writes.offer(task);
392                         }
393                 }
394
395         }
396
397         /**
398          * Unregisters and closes all currently registered ResourceData random
399          * access value file handles. Any I/O exceptions occurring during closing
400          * are logged into {@link Logger}.
401          */
402         private void closeRandomAccessValues() {
403                 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
404                 if (ravs == null)
405                         return;
406                 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
407                 for (Pair<Resource, ResourceData> value : values) {
408                         try {
409                                 value.second.binaryFile.close();
410                         } catch (IOException e) {
411                                 Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
412                         }
413                 }
414         }
415
416 }