]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Sync git svn branch with SVN repository r33308.
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
1 package fi.vtt.simantics.procore.internal;\r
2 \r
3 import java.io.IOException;\r
4 import java.util.Collection;\r
5 import java.util.LinkedList;\r
6 \r
7 import org.simantics.db.Resource;\r
8 import org.simantics.db.common.utils.Logger;\r
9 import org.simantics.db.exception.CancelTransactionException;\r
10 import org.simantics.db.exception.DatabaseException;\r
11 import org.simantics.db.impl.graph.WriteGraphImpl;\r
12 import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
13 import org.simantics.db.impl.internal.ResourceData;\r
14 import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
15 import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
16 import org.simantics.db.request.WriteTraits;\r
17 import org.simantics.utils.datastructures.Pair;\r
18 \r
19 public class SessionRequestManager {\r
20 \r
21         private static boolean DEBUG = false;\r
22         \r
23         enum State {\r
24                 \r
25                 INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR\r
26                 \r
27         }\r
28         \r
29         final SessionImplSocket session;\r
30         final fi.vtt.simantics.procore.internal.State transactionState;\r
31         final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();\r
32         final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();\r
33 \r
34         State state = State.INIT;\r
35 \r
36         private boolean legal(State current, State target) {\r
37                 \r
38                 if(State.IDLE == current) {\r
39                         \r
40                         return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;\r
41                         \r
42                 } else if (State.WRITE == current) {\r
43                         \r
44                         return State.WRITE_UPDATE == target;\r
45 \r
46                 } else if (State.READ == current) {\r
47                         \r
48                         return State.READ_UPDATE == target;\r
49 \r
50                 } else if (State.ERROR == current) {\r
51                         \r
52                         return State.IDLE == target;\r
53                         \r
54                 } else {\r
55                         \r
56                         return State.IDLE == target;\r
57                         \r
58                 }\r
59                 \r
60         }\r
61         \r
62         private synchronized void setState(int thread, State state, SessionTask task) {\r
63 \r
64                 if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);\r
65                 if(this.state == state) return;\r
66                 \r
67                 assert(legal(this.state, state));\r
68                 \r
69                 if(State.READ == state) {\r
70                         \r
71                         assert(State.IDLE == this.state);\r
72                         \r
73                 }\r
74                 \r
75                 State oldState = this.state;\r
76                 this.state = state;\r
77                 \r
78                 if(State.IDLE == oldState && State.IDLE != state) {\r
79                     session.incAsync();\r
80                 }\r
81                 \r
82                 /*\r
83                  * We are now executing in new state.\r
84                  * The methods below SHALL NOT start any task\r
85                  * immediately. All tasks shall be scheduled.\r
86                  * \r
87                  */\r
88                 if(State.READ == state) startRead(thread, (SessionRead)task);\r
89                 else if(State.WRITE == state) startWrite(thread, task);\r
90                 else if(State.READ == oldState) startReadUpdate(thread);\r
91                 else if(State.WRITE == oldState) startWriteUpdate(thread);\r
92 \r
93                 // Check the need for new transaction\r
94                 if(state == State.IDLE) {\r
95 \r
96                         closeRandomAccessValues();\r
97 \r
98                         if(!writes.isEmpty()) {\r
99                                 setState(thread, State.WRITE, writes.poll());\r
100                         } else if (!reads.isEmpty()) {\r
101                                 setState(thread, State.READ, reads.poll());\r
102                         }\r
103 \r
104             session.decAsync();\r
105 \r
106                 }\r
107                 \r
108         }\r
109         \r
110         public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {\r
111                 this.session = session;\r
112                 this.transactionState = transactionState;\r
113         }\r
114         \r
115         public synchronized void startRead(int thread, final SessionRead task) {\r
116                 \r
117                 session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {\r
118 \r
119                         @Override\r
120             public void run(int thread) {\r
121                 try {\r
122                     transactionState.startReadTransaction(thread);\r
123                     task.run(thread);\r
124                 } catch (Throwable t) {\r
125                     Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));\r
126                     if(task.throwable != null)\r
127                         task.throwable.set(t);\r
128                     state = State.ERROR;\r
129                 } finally {\r
130                     if(task.notify != null)\r
131                         task.notify.release();\r
132                 }\r
133             }\r
134                         \r
135                 });\r
136                 \r
137         }\r
138         \r
139         public synchronized void startReadUpdate(int thread) {\r
140                 \r
141                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
142 \r
143                         @Override\r
144                         public void run(int thread) {\r
145 \r
146                                 session.fireFinishReadTransaction();\r
147 \r
148                                 try {\r
149                                         transactionState.stopReadTransaction();\r
150                                 } catch (DatabaseException e) {\r
151                                         e.printStackTrace();\r
152                                 }\r
153                                 \r
154                         }\r
155                         \r
156                 });\r
157                 \r
158         }\r
159 \r
160 //      public synchronized void stopRead(int thread) {\r
161 //              \r
162 //              try {\r
163 //                      transactionState.stopReadTransaction();\r
164 //              } catch (DatabaseException e) {\r
165 //                      e.printStackTrace();\r
166 //              }\r
167 //              \r
168 //      }\r
169 \r
170         public synchronized void startWrite(int thread, final SessionTask task) {\r
171                 \r
172                 session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {\r
173 \r
174                         @Override\r
175                         public void run(int thread) {\r
176 \r
177                         try {\r
178                                 transactionState.startWriteTransaction(thread);\r
179                         } catch (Throwable t) {\r
180                             DatabaseException e = new DatabaseException("Write transaction could not be started", t);\r
181                             Logger.defaultLogError(e);\r
182                             return;\r
183                         }\r
184                                 task.run(thread);\r
185                                 \r
186                         }\r
187                         \r
188                 });\r
189                 \r
190         }\r
191 \r
192         public synchronized void startWriteUpdate(int thread) {\r
193                 \r
194                 session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
195 \r
196                         @Override\r
197                         public void run(int thread) {\r
198 \r
199                                 // Support for DelayedWriteRequest cancels during the\r
200                                 // read-only part of the request.\r
201                                 WriteStateBase<?> delayedState = session.delayedWriteState;\r
202                                 session.delayedWriteState = null;\r
203                                 if (delayedState != null) {\r
204                                         if (session.writeState != null)\r
205                                                 throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");\r
206 \r
207                         if (delayedState.isExcepted()) {\r
208                         // There can never be any changes in the cluster stream.\r
209                         // In this case, don't bother rolling back.\r
210                             boolean empty = session.clusterStream.reallyFlush(); \r
211                         assert(empty);\r
212                             transactionState.stopWriteTransaction(session.clusterStream);\r
213                         } else {\r
214                             throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");\r
215                         }\r
216                         session.clientChanges = new ClientChangesImpl(session);\r
217                                         delayedState.finish();\r
218                                         return;\r
219                                 }\r
220 \r
221                                 // The session could have been terminated\r
222                                 if(!session.state.isAlive()) return;\r
223 \r
224                                 WriteState<?> writeState = session.writeState;\r
225                                 WriteGraphImpl graph = writeState.getGraph();\r
226 \r
227                                 if(writeState.isExcepted()) {\r
228                                     \r
229                                     if(!(writeState.exception instanceof CancelTransactionException))\r
230                                         writeState.exception.printStackTrace();\r
231                                     \r
232                                     transactionState.cancelWriteTransaction(graph);\r
233 \r
234                                 } else {\r
235 \r
236                                         session.handleUpdatesAndMetadata(graph);\r
237 \r
238 //                    long start = System.nanoTime();\r
239                                         transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);\r
240 //                                      long duration = System.nanoTime() - start;\r
241 //                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");\r
242 \r
243                                 }\r
244 \r
245                                 session.clientChanges = new ClientChangesImpl(session);\r
246 \r
247                                 WriteState<?> state = session.writeState;\r
248                                 state.finish();\r
249                                 session.writeState = null;\r
250 \r
251                         }\r
252 \r
253                 });\r
254 \r
255         }\r
256 \r
257 //      public synchronized void stopWrite(int thread) {\r
258 //              \r
259 //        session.clientChanges = new ClientChangesImpl(session);\r
260 //\r
261 //        WriteState<?> state = session.writeState;\r
262 //\r
263 //        System.err.println("D");\r
264 //        state.finish();\r
265 //        System.err.println("E");\r
266 //        \r
267 //        session.writeState = null;\r
268 //        \r
269 //      }\r
270 \r
271         public synchronized void ceased(int thread) {\r
272 \r
273                 if(State.WRITE == state) {\r
274 \r
275                         setState(thread, State.WRITE_UPDATE, null);\r
276                         \r
277                 } else if (State.WRITE_UPDATE == state) {\r
278 \r
279                         setState(thread, State.IDLE, null);\r
280 \r
281                 } else if (State.READ_UPDATE == state) {\r
282 \r
283                         setState(thread, State.IDLE, null);\r
284                         \r
285                 } else if (State.READ == state) {\r
286 \r
287                         if (!reads.isEmpty()) {\r
288 \r
289                                 final SessionRead read = reads.poll();\r
290                                 session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {\r
291 \r
292                                         @Override\r
293                                         public void run(int thread) {\r
294                                                 read.run(thread);\r
295                                                 if(read.notify != null) read.notify.release();\r
296                                         }\r
297                                         \r
298                                 });     \r
299                                 \r
300                         } else {\r
301                                 \r
302                                 setState(thread, State.READ_UPDATE, null);\r
303                                 \r
304                         }\r
305 \r
306                 } else if (State.INIT == state) {\r
307                         \r
308                         assert(reads.isEmpty());\r
309                         assert(writes.isEmpty());\r
310                         setState(thread, State.IDLE, null);\r
311                         \r
312                 } else if (State.ERROR == state) {\r
313                         \r
314                         setState(thread, State.IDLE, null);\r
315                         \r
316                 } else {\r
317 \r
318                     throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); \r
319                         \r
320                 }\r
321                 \r
322         }\r
323         \r
324     public synchronized void scheduleRead(final SessionRead task) {\r
325 \r
326                 assert(State.INIT != state);\r
327                 \r
328                 if(State.READ == state) {\r
329                         session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {\r
330 \r
331                                 @Override\r
332                                 public void run(int thread) {\r
333                                         try {\r
334                                                 task.run(thread);\r
335                                         } finally {\r
336                                                 if (task.notify != null) task.notify.release();\r
337                                         }\r
338                                 }\r
339                                 \r
340                         });\r
341                 } else if (State.IDLE == state) {\r
342                         setState(Integer.MIN_VALUE, State.READ, task);\r
343                 } else {\r
344                         reads.offer(task);\r
345                 }\r
346                 \r
347         }\r
348 \r
349     public int lastUpdateIndex() {\r
350         return writes.size();\r
351     }\r
352     \r
353         public synchronized void scheduleWrite(SessionTask task) {\r
354 \r
355                 scheduleWrite(task, null);\r
356 \r
357         }\r
358 \r
359         /**\r
360          * @param task the task to schedule \r
361          * @param combine <code>true</code> or <code>false</code> to explicitly\r
362          *        specify whether to combine the write with the previous operation\r
363          *        or <code>null</code> to use the default logic, i.e. combine when\r
364          *        this schedule is performed from within update processing (state ==\r
365          *        State.WRITE_UPDATE).\r
366          */\r
367         public synchronized void scheduleWrite(SessionTask task, Boolean combine) {\r
368 \r
369                 boolean inUpdate = state == State.WRITE_UPDATE;\r
370 \r
371                 assert(State.INIT != state);\r
372                 //task.combine = combine != null ? combine : inUpdate;\r
373                 if(State.IDLE == state) {\r
374                         setState(Integer.MIN_VALUE, State.WRITE, task);\r
375                 } else {\r
376                         if(inUpdate) {\r
377                                 int index = lastUpdateIndex();\r
378                                 if(index == writes.size()) writes.offer(task);\r
379                                 else writes.add(index, task);\r
380                         } else {\r
381                                 writes.offer(task);\r
382                         }\r
383                 }\r
384 \r
385         }\r
386 \r
387         /**\r
388          * Unregisters and closes all currently registered ResourceData random\r
389          * access value file handles. Any I/O exceptions occurring during closing\r
390          * are logged into {@link Logger}.\r
391          */\r
392         private void closeRandomAccessValues() {\r
393                 RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);\r
394                 if (ravs == null)\r
395                         return;\r
396                 Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();\r
397                 for (Pair<Resource, ResourceData> value : values) {\r
398                         try {\r
399                                 value.second.binaryFile.close();\r
400                         } catch (IOException e) {\r
401                                 Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);\r
402                         }\r
403                 }\r
404         }\r
405 \r
406 }\r