]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
new file mode 100644 (file)
index 0000000..cb1e2e0
--- /dev/null
@@ -0,0 +1,406 @@
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.io.IOException;\r
+import java.util.Collection;\r
+import java.util.LinkedList;\r
+\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.CancelTransactionException;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.impl.graph.WriteGraphImpl;\r
+import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
+import org.simantics.db.impl.internal.ResourceData;\r
+import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
+import org.simantics.db.request.WriteTraits;\r
+import org.simantics.utils.datastructures.Pair;\r
+\r
+public class SessionRequestManager {\r
+\r
+       private static boolean DEBUG = false;\r
+       \r
+       enum State {\r
+               \r
+               INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR\r
+               \r
+       }\r
+       \r
+       final SessionImplSocket session;\r
+       final fi.vtt.simantics.procore.internal.State transactionState;\r
+       final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();\r
+       final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();\r
+\r
+       State state = State.INIT;\r
+\r
+       private boolean legal(State current, State target) {\r
+               \r
+               if(State.IDLE == current) {\r
+                       \r
+                       return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;\r
+                       \r
+               } else if (State.WRITE == current) {\r
+                       \r
+                       return State.WRITE_UPDATE == target;\r
+\r
+               } else if (State.READ == current) {\r
+                       \r
+                       return State.READ_UPDATE == target;\r
+\r
+               } else if (State.ERROR == current) {\r
+                       \r
+                       return State.IDLE == target;\r
+                       \r
+               } else {\r
+                       \r
+                       return State.IDLE == target;\r
+                       \r
+               }\r
+               \r
+       }\r
+       \r
+       private synchronized void setState(int thread, State state, SessionTask task) {\r
+\r
+               if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);\r
+               if(this.state == state) return;\r
+               \r
+               assert(legal(this.state, state));\r
+               \r
+               if(State.READ == state) {\r
+                       \r
+                       assert(State.IDLE == this.state);\r
+                       \r
+               }\r
+               \r
+               State oldState = this.state;\r
+               this.state = state;\r
+               \r
+               if(State.IDLE == oldState && State.IDLE != state) {\r
+                   session.incAsync();\r
+               }\r
+               \r
+               /*\r
+                * We are now executing in new state.\r
+                * The methods below SHALL NOT start any task\r
+                * immediately. All tasks shall be scheduled.\r
+                * \r
+                */\r
+               if(State.READ == state) startRead(thread, (SessionRead)task);\r
+               else if(State.WRITE == state) startWrite(thread, task);\r
+               else if(State.READ == oldState) startReadUpdate(thread);\r
+               else if(State.WRITE == oldState) startWriteUpdate(thread);\r
+\r
+               // Check the need for new transaction\r
+               if(state == State.IDLE) {\r
+\r
+                       closeRandomAccessValues();\r
+\r
+                       if(!writes.isEmpty()) {\r
+                               setState(thread, State.WRITE, writes.poll());\r
+                       } else if (!reads.isEmpty()) {\r
+                               setState(thread, State.READ, reads.poll());\r
+                       }\r
+\r
+            session.decAsync();\r
+\r
+               }\r
+               \r
+       }\r
+       \r
+       public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {\r
+               this.session = session;\r
+               this.transactionState = transactionState;\r
+       }\r
+       \r
+       public synchronized void startRead(int thread, final SessionRead task) {\r
+               \r
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {\r
+\r
+                       @Override\r
+            public void run(int thread) {\r
+                try {\r
+                    transactionState.startReadTransaction(thread);\r
+                    task.run(thread);\r
+                } catch (Throwable t) {\r
+                    Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));\r
+                    if(task.throwable != null)\r
+                        task.throwable.set(t);\r
+                    state = State.ERROR;\r
+                } finally {\r
+                    if(task.notify != null)\r
+                        task.notify.release();\r
+                }\r
+            }\r
+                       \r
+               });\r
+               \r
+       }\r
+       \r
+       public synchronized void startReadUpdate(int thread) {\r
+               \r
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
+\r
+                       @Override\r
+                       public void run(int thread) {\r
+\r
+                               session.fireFinishReadTransaction();\r
+\r
+                               try {\r
+                                       transactionState.stopReadTransaction();\r
+                               } catch (DatabaseException e) {\r
+                                       e.printStackTrace();\r
+                               }\r
+                               \r
+                       }\r
+                       \r
+               });\r
+               \r
+       }\r
+\r
+//     public synchronized void stopRead(int thread) {\r
+//             \r
+//             try {\r
+//                     transactionState.stopReadTransaction();\r
+//             } catch (DatabaseException e) {\r
+//                     e.printStackTrace();\r
+//             }\r
+//             \r
+//     }\r
+\r
+       public synchronized void startWrite(int thread, final SessionTask task) {\r
+               \r
+               session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {\r
+\r
+                       @Override\r
+                       public void run(int thread) {\r
+\r
+                       try {\r
+                               transactionState.startWriteTransaction(thread);\r
+                       } catch (Throwable t) {\r
+                           DatabaseException e = new DatabaseException("Write transaction could not be started", t);\r
+                           Logger.defaultLogError(e);\r
+                           return;\r
+                       }\r
+                               task.run(thread);\r
+                               \r
+                       }\r
+                       \r
+               });\r
+               \r
+       }\r
+\r
+       public synchronized void startWriteUpdate(int thread) {\r
+               \r
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
+\r
+                       @Override\r
+                       public void run(int thread) {\r
+\r
+                               // Support for DelayedWriteRequest cancels during the\r
+                               // read-only part of the request.\r
+                               WriteStateBase<?> delayedState = session.delayedWriteState;\r
+                               session.delayedWriteState = null;\r
+                               if (delayedState != null) {\r
+                                       if (session.writeState != null)\r
+                                               throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");\r
+\r
+                       if (delayedState.isExcepted()) {\r
+                        // There can never be any changes in the cluster stream.\r
+                        // In this case, don't bother rolling back.\r
+                           boolean empty = session.clusterStream.reallyFlush(); \r
+                        assert(empty);\r
+                           transactionState.stopWriteTransaction(session.clusterStream);\r
+                       } else {\r
+                           throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");\r
+                       }\r
+                       session.clientChanges = new ClientChangesImpl(session);\r
+                                       delayedState.finish();\r
+                                       return;\r
+                               }\r
+\r
+                               // The session could have been terminated\r
+                               if(!session.state.isAlive()) return;\r
+\r
+                               WriteState<?> writeState = session.writeState;\r
+                               WriteGraphImpl graph = writeState.getGraph();\r
+\r
+                               if(writeState.isExcepted()) {\r
+                                   \r
+                                   if(!(writeState.exception instanceof CancelTransactionException))\r
+                                       writeState.exception.printStackTrace();\r
+                                   \r
+                                   transactionState.cancelWriteTransaction(graph);\r
+\r
+                               } else {\r
+\r
+                                       session.handleUpdatesAndMetadata(graph);\r
+\r
+//                    long start = System.nanoTime();\r
+                                       transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);\r
+//                                     long duration = System.nanoTime() - start;\r
+//                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");\r
+\r
+                               }\r
+\r
+                               session.clientChanges = new ClientChangesImpl(session);\r
+\r
+                               WriteState<?> state = session.writeState;\r
+                               state.finish();\r
+                               session.writeState = null;\r
+\r
+                       }\r
+\r
+               });\r
+\r
+       }\r
+\r
+//     public synchronized void stopWrite(int thread) {\r
+//             \r
+//        session.clientChanges = new ClientChangesImpl(session);\r
+//\r
+//        WriteState<?> state = session.writeState;\r
+//\r
+//        System.err.println("D");\r
+//        state.finish();\r
+//        System.err.println("E");\r
+//        \r
+//        session.writeState = null;\r
+//        \r
+//     }\r
+\r
+       public synchronized void ceased(int thread) {\r
+\r
+               if(State.WRITE == state) {\r
+\r
+                       setState(thread, State.WRITE_UPDATE, null);\r
+                       \r
+               } else if (State.WRITE_UPDATE == state) {\r
+\r
+                       setState(thread, State.IDLE, null);\r
+\r
+               } else if (State.READ_UPDATE == state) {\r
+\r
+                       setState(thread, State.IDLE, null);\r
+                       \r
+               } else if (State.READ == state) {\r
+\r
+                       if (!reads.isEmpty()) {\r
+\r
+                               final SessionRead read = reads.poll();\r
+                               session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {\r
+\r
+                                       @Override\r
+                                       public void run(int thread) {\r
+                                               read.run(thread);\r
+                                               if(read.notify != null) read.notify.release();\r
+                                       }\r
+                                       \r
+                               });     \r
+                               \r
+                       } else {\r
+                               \r
+                               setState(thread, State.READ_UPDATE, null);\r
+                               \r
+                       }\r
+\r
+               } else if (State.INIT == state) {\r
+                       \r
+                       assert(reads.isEmpty());\r
+                       assert(writes.isEmpty());\r
+                       setState(thread, State.IDLE, null);\r
+                       \r
+               } else if (State.ERROR == state) {\r
+                       \r
+                       setState(thread, State.IDLE, null);\r
+                       \r
+               } else {\r
+\r
+                   throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); \r
+                       \r
+               }\r
+               \r
+       }\r
+       \r
+    public synchronized void scheduleRead(final SessionRead task) {\r
+\r
+               assert(State.INIT != state);\r
+               \r
+               if(State.READ == state) {\r
+                       session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {\r
+\r
+                               @Override\r
+                               public void run(int thread) {\r
+                                       try {\r
+                                               task.run(thread);\r
+                                       } finally {\r
+                                               if (task.notify != null) task.notify.release();\r
+                                       }\r
+                               }\r
+                               \r
+                       });\r
+               } else if (State.IDLE == state) {\r
+                       setState(Integer.MIN_VALUE, State.READ, task);\r
+               } else {\r
+                       reads.offer(task);\r
+               }\r
+               \r
+       }\r
+\r
+    public int lastUpdateIndex() {\r
+        return writes.size();\r
+    }\r
+    \r
+       public synchronized void scheduleWrite(SessionTask task) {\r
+\r
+               scheduleWrite(task, null);\r
+\r
+       }\r
+\r
+       /**\r
+        * @param task the task to schedule \r
+        * @param combine <code>true</code> or <code>false</code> to explicitly\r
+        *        specify whether to combine the write with the previous operation\r
+        *        or <code>null</code> to use the default logic, i.e. combine when\r
+        *        this schedule is performed from within update processing (state ==\r
+        *        State.WRITE_UPDATE).\r
+        */\r
+       public synchronized void scheduleWrite(SessionTask task, Boolean combine) {\r
+\r
+               boolean inUpdate = state == State.WRITE_UPDATE;\r
+\r
+               assert(State.INIT != state);\r
+               //task.combine = combine != null ? combine : inUpdate;\r
+               if(State.IDLE == state) {\r
+                       setState(Integer.MIN_VALUE, State.WRITE, task);\r
+               } else {\r
+                       if(inUpdate) {\r
+                               int index = lastUpdateIndex();\r
+                               if(index == writes.size()) writes.offer(task);\r
+                               else writes.add(index, task);\r
+                       } else {\r
+                               writes.offer(task);\r
+                       }\r
+               }\r
+\r
+       }\r
+\r
+       /**\r
+        * Unregisters and closes all currently registered ResourceData random\r
+        * access value file handles. Any I/O exceptions occurring during closing\r
+        * are logged into {@link Logger}.\r
+        */\r
+       private void closeRandomAccessValues() {\r
+               RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);\r
+               if (ravs == null)\r
+                       return;\r
+               Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();\r
+               for (Pair<Resource, ResourceData> value : values) {\r
+                       try {\r
+                               value.second.binaryFile.close();\r
+                       } catch (IOException e) {\r
+                               Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);\r
+                       }\r
+               }\r
+       }\r
+\r
+}\r