]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Some fixes/cleanup for cluster table size caching logic.
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
1 package fi.vtt.simantics.procore.internal;
2
3 import java.io.IOException;
4 import java.util.Collection;
5 import java.util.LinkedList;
6
7 import org.simantics.db.Resource;
8 import org.simantics.db.common.utils.Logger;
9 import org.simantics.db.exception.CancelTransactionException;
10 import org.simantics.db.exception.DatabaseException;
11 import org.simantics.db.impl.graph.WriteGraphImpl;
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;
13 import org.simantics.db.impl.internal.ResourceData;
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
16 import org.simantics.db.request.WriteTraits;
17 import org.simantics.utils.datastructures.Pair;
18
19 public class SessionRequestManager {
20
21         private static boolean DEBUG = false;
22         
23         enum State {
24                 
25                 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
26                 
27         }
28         
29         final SessionImplSocket session;
30         final fi.vtt.simantics.procore.internal.State transactionState;
31         final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
32         final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
33
34         State state = State.INIT;
35
36         private boolean legal(State current, State target) {
37                 
38                 if(State.IDLE == current) {
39                         
40                         return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
41                         
42                 } else if (State.WRITE == current) {
43                         
44                         return State.WRITE_UPDATE == target;
45
46                 } else if (State.READ == current) {
47                         
48                         return State.READ_UPDATE == target;
49
50                 } else if (State.ERROR == current) {
51                         
52                         return State.IDLE == target;
53                         
54                 } else {
55                         
56                         return State.IDLE == target;
57                         
58                 }
59                 
60         }
61         
62         private synchronized void setState(int thread, State state, SessionTask task) {
63
64                 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
65                 if(this.state == state) return;
66                 
67                 assert(legal(this.state, state));
68                 
69                 if(State.READ == state) {
70                         
71                         assert(State.IDLE == this.state);
72                         
73                 }
74                 
75                 State oldState = this.state;
76                 this.state = state;
77                 
78                 if(State.IDLE == oldState && State.IDLE != state) {
79                     session.incAsync();
80                 }
81                 
82                 /*
83                  * We are now executing in new state.
84                  * The methods below SHALL NOT start any task
85                  * immediately. All tasks shall be scheduled.
86                  * 
87                  */
88                 if(State.READ == state) startRead(thread, (SessionRead)task);
89                 else if(State.WRITE == state) startWrite(thread, task);
90                 else if(State.READ == oldState) startReadUpdate(thread);
91                 else if(State.WRITE == oldState) startWriteUpdate(thread);
92
93                 // Check the need for new transaction
94                 if(state == State.IDLE) {
95
96                         closeRandomAccessValues();
97
98                         if(!writes.isEmpty()) {
99                                 setState(thread, State.WRITE, writes.poll());
100                         } else if (!reads.isEmpty()) {
101                                 setState(thread, State.READ, reads.poll());
102                         }
103
104             session.decAsync();
105
106                 }
107                 
108         }
109         
110         public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
111                 this.session = session;
112                 this.transactionState = transactionState;
113         }
114         
115         public synchronized void startRead(int thread, final SessionRead task) {
116                 
117                 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
118
119                         @Override
120             public void run(int thread) {
121                 try {
122                     transactionState.startReadTransaction(thread);
123                     task.run(thread);
124                 } catch (Throwable t) {
125                     Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
126                     if(task.throwable != null)
127                         task.throwable.set(t);
128                     state = State.ERROR;
129                 } finally {
130                     if(task.notify != null)
131                         task.notify.release();
132                 }
133             }
134                         
135                 });
136                 
137         }
138         
139         public synchronized void startReadUpdate(int thread) {
140                 
141                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
142
143                         @Override
144                         public void run(int thread) {
145
146                                 session.fireFinishReadTransaction();
147
148                                 try {
149                                         transactionState.stopReadTransaction();
150                                 } catch (DatabaseException e) {
151                                         e.printStackTrace();
152                                 }
153                                 
154                         }
155                         
156                 });
157                 
158         }
159
160 //      public synchronized void stopRead(int thread) {
161 //              
162 //              try {
163 //                      transactionState.stopReadTransaction();
164 //              } catch (DatabaseException e) {
165 //                      e.printStackTrace();
166 //              }
167 //              
168 //      }
169
170         public synchronized void startWrite(int thread, final SessionTask task) {
171                 
172                 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
173
174                         @Override
175                         public void run(int thread) {
176
177                         try {
178                                 transactionState.startWriteTransaction(thread);
179                         } catch (Throwable t) {
180                             DatabaseException e = new DatabaseException("Write transaction could not be started", t);
181                             Logger.defaultLogError(e);
182                             return;
183                         }
184                                 task.run(thread);
185                                 
186                         }
187                         
188                 });
189                 
190         }
191
192         public synchronized void startWriteUpdate(int thread) {
193                 
194                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
195
196                         @Override
197                         public void run(int thread) {
198
199                                 // Support for DelayedWriteRequest cancels during the
200                                 // read-only part of the request.
201                                 WriteStateBase<?> delayedState = session.delayedWriteState;
202                                 session.delayedWriteState = null;
203                                 if (delayedState != null) {
204                                         if (session.writeState != null)
205                                                 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
206
207                         if (delayedState.isExcepted()) {
208                         // There can never be any changes in the cluster stream.
209                         // In this case, don't bother rolling back.
210                             boolean empty = session.clusterStream.reallyFlush(); 
211                         assert(empty);
212                             transactionState.stopWriteTransaction(session.clusterStream);
213                         } else {
214                             throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
215                         }
216                         session.clientChanges = new ClientChangesImpl(session);
217                                         delayedState.finish();
218                                         return;
219                                 }
220
221                                 // The session could have been terminated
222                                 if(!session.state.isAlive()) return;
223
224                                 WriteState<?> writeState = session.writeState;
225                                 WriteGraphImpl graph = writeState.getGraph();
226
227                                 if(writeState.isExcepted()) {
228                                     
229                                     if(!(writeState.exception instanceof CancelTransactionException))
230                                         writeState.exception.printStackTrace();
231                                     
232                                     transactionState.cancelWriteTransaction(graph);
233
234                                 } else {
235
236                                         session.handleUpdatesAndMetadata(graph);
237
238 //                    long start = System.nanoTime();
239                                         transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
240 //                                      long duration = System.nanoTime() - start;
241 //                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
242
243                                 }
244
245                                 session.clientChanges = new ClientChangesImpl(session);
246
247                                 WriteState<?> state = session.writeState;
248                                 state.finish();
249                                 session.writeState = null;
250
251                         }
252
253                 });
254
255         }
256
257 //      public synchronized void stopWrite(int thread) {
258 //              
259 //        session.clientChanges = new ClientChangesImpl(session);
260 //
261 //        WriteState<?> state = session.writeState;
262 //
263 //        System.err.println("D");
264 //        state.finish();
265 //        System.err.println("E");
266 //        
267 //        session.writeState = null;
268 //        
269 //      }
270
271         public synchronized void ceased(int thread) {
272
273                 if(State.WRITE == state) {
274
275                         setState(thread, State.WRITE_UPDATE, null);
276                         
277                 } else if (State.WRITE_UPDATE == state) {
278
279                         setState(thread, State.IDLE, null);
280
281                 } else if (State.READ_UPDATE == state) {
282
283                         setState(thread, State.IDLE, null);
284                         
285                 } else if (State.READ == state) {
286
287                         if (!reads.isEmpty()) {
288
289                                 final SessionRead read = reads.poll();
290                                 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
291
292                                         @Override
293                                         public void run(int thread) {
294                                                 read.run(thread);
295                                                 if(read.notify != null) read.notify.release();
296                                         }
297                                         
298                                 });     
299                                 
300                         } else {
301                                 
302                                 setState(thread, State.READ_UPDATE, null);
303                                 
304                         }
305
306                 } else if (State.INIT == state) {
307                         
308                         assert(reads.isEmpty());
309                         assert(writes.isEmpty());
310                         setState(thread, State.IDLE, null);
311                         
312                 } else if (State.ERROR == state) {
313                         
314                         setState(thread, State.IDLE, null);
315                         
316                 } else {
317
318                     throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); 
319                         
320                 }
321                 
322         }
323         
324     public synchronized void scheduleRead(final SessionRead task) {
325
326                 assert(State.INIT != state);
327                 
328                 if(State.READ == state) {
329                         session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
330
331                                 @Override
332                                 public void run(int thread) {
333                                         try {
334                                                 task.run(thread);
335                                         } finally {
336                                                 if (task.notify != null) task.notify.release();
337                                         }
338                                 }
339                                 
340                         });
341                 } else if (State.IDLE == state) {
342                         setState(Integer.MIN_VALUE, State.READ, task);
343                 } else {
344                         reads.offer(task);
345                 }
346                 
347         }
348
349     public int lastUpdateIndex() {
350         return writes.size();
351     }
352     
353         public synchronized void scheduleWrite(SessionTask task) {
354
355                 scheduleWrite(task, null);
356
357         }
358
359         /**
360          * @param task the task to schedule 
361          * @param combine <code>true</code> or <code>false</code> to explicitly
362          *        specify whether to combine the write with the previous operation
363          *        or <code>null</code> to use the default logic, i.e. combine when
364          *        this schedule is performed from within update processing (state ==
365          *        State.WRITE_UPDATE).
366          */
367         public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
368
369                 boolean inUpdate = state == State.WRITE_UPDATE;
370
371                 assert(State.INIT != state);
372                 //task.combine = combine != null ? combine : inUpdate;
373                 if(State.IDLE == state) {
374                         setState(Integer.MIN_VALUE, State.WRITE, task);
375                 } else {
376                         if(inUpdate) {
377                                 int index = lastUpdateIndex();
378                                 if(index == writes.size()) writes.offer(task);
379                                 else writes.add(index, task);
380                         } else {
381                                 writes.offer(task);
382                         }
383                 }
384
385         }
386
387         /**
388          * Unregisters and closes all currently registered ResourceData random
389          * access value file handles. Any I/O exceptions occurring during closing
390          * are logged into {@link Logger}.
391          */
392         private void closeRandomAccessValues() {
393                 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
394                 if (ravs == null)
395                         return;
396                 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
397                 for (Pair<Resource, ResourceData> value : values) {
398                         try {
399                                 value.second.binaryFile.close();
400                         } catch (IOException e) {
401                                 Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
402                         }
403                 }
404         }
405
406 }