]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
isImmutable can NPE
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionRequestManager.java
index cb1e2e0cfc7e3505ab8c85f2fd1487507123e4f5..0ec6387ae941b0e9cb9c6358282c3b3849ff9d0d 100644 (file)
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.IOException;\r
-import java.util.Collection;\r
-import java.util.LinkedList;\r
-\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
-import org.simantics.db.impl.internal.ResourceData;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.utils.datastructures.Pair;\r
-\r
-public class SessionRequestManager {\r
-\r
-       private static boolean DEBUG = false;\r
-       \r
-       enum State {\r
-               \r
-               INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR\r
-               \r
-       }\r
-       \r
-       final SessionImplSocket session;\r
-       final fi.vtt.simantics.procore.internal.State transactionState;\r
-       final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();\r
-       final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();\r
-\r
-       State state = State.INIT;\r
-\r
-       private boolean legal(State current, State target) {\r
-               \r
-               if(State.IDLE == current) {\r
-                       \r
-                       return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;\r
-                       \r
-               } else if (State.WRITE == current) {\r
-                       \r
-                       return State.WRITE_UPDATE == target;\r
-\r
-               } else if (State.READ == current) {\r
-                       \r
-                       return State.READ_UPDATE == target;\r
-\r
-               } else if (State.ERROR == current) {\r
-                       \r
-                       return State.IDLE == target;\r
-                       \r
-               } else {\r
-                       \r
-                       return State.IDLE == target;\r
-                       \r
-               }\r
-               \r
-       }\r
-       \r
-       private synchronized void setState(int thread, State state, SessionTask task) {\r
-\r
-               if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);\r
-               if(this.state == state) return;\r
-               \r
-               assert(legal(this.state, state));\r
-               \r
-               if(State.READ == state) {\r
-                       \r
-                       assert(State.IDLE == this.state);\r
-                       \r
-               }\r
-               \r
-               State oldState = this.state;\r
-               this.state = state;\r
-               \r
-               if(State.IDLE == oldState && State.IDLE != state) {\r
-                   session.incAsync();\r
-               }\r
-               \r
-               /*\r
-                * We are now executing in new state.\r
-                * The methods below SHALL NOT start any task\r
-                * immediately. All tasks shall be scheduled.\r
-                * \r
-                */\r
-               if(State.READ == state) startRead(thread, (SessionRead)task);\r
-               else if(State.WRITE == state) startWrite(thread, task);\r
-               else if(State.READ == oldState) startReadUpdate(thread);\r
-               else if(State.WRITE == oldState) startWriteUpdate(thread);\r
-\r
-               // Check the need for new transaction\r
-               if(state == State.IDLE) {\r
-\r
-                       closeRandomAccessValues();\r
-\r
-                       if(!writes.isEmpty()) {\r
-                               setState(thread, State.WRITE, writes.poll());\r
-                       } else if (!reads.isEmpty()) {\r
-                               setState(thread, State.READ, reads.poll());\r
-                       }\r
-\r
-            session.decAsync();\r
-\r
-               }\r
-               \r
-       }\r
-       \r
-       public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {\r
-               this.session = session;\r
-               this.transactionState = transactionState;\r
-       }\r
-       \r
-       public synchronized void startRead(int thread, final SessionRead task) {\r
-               \r
-               session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {\r
-\r
-                       @Override\r
-            public void run(int thread) {\r
-                try {\r
-                    transactionState.startReadTransaction(thread);\r
-                    task.run(thread);\r
-                } catch (Throwable t) {\r
-                    Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));\r
-                    if(task.throwable != null)\r
-                        task.throwable.set(t);\r
-                    state = State.ERROR;\r
-                } finally {\r
-                    if(task.notify != null)\r
-                        task.notify.release();\r
-                }\r
-            }\r
-                       \r
-               });\r
-               \r
-       }\r
-       \r
-       public synchronized void startReadUpdate(int thread) {\r
-               \r
-               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
-\r
-                       @Override\r
-                       public void run(int thread) {\r
-\r
-                               session.fireFinishReadTransaction();\r
-\r
-                               try {\r
-                                       transactionState.stopReadTransaction();\r
-                               } catch (DatabaseException e) {\r
-                                       e.printStackTrace();\r
-                               }\r
-                               \r
-                       }\r
-                       \r
-               });\r
-               \r
-       }\r
-\r
-//     public synchronized void stopRead(int thread) {\r
-//             \r
-//             try {\r
-//                     transactionState.stopReadTransaction();\r
-//             } catch (DatabaseException e) {\r
-//                     e.printStackTrace();\r
-//             }\r
-//             \r
-//     }\r
-\r
-       public synchronized void startWrite(int thread, final SessionTask task) {\r
-               \r
-               session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {\r
-\r
-                       @Override\r
-                       public void run(int thread) {\r
-\r
-                       try {\r
-                               transactionState.startWriteTransaction(thread);\r
-                       } catch (Throwable t) {\r
-                           DatabaseException e = new DatabaseException("Write transaction could not be started", t);\r
-                           Logger.defaultLogError(e);\r
-                           return;\r
-                       }\r
-                               task.run(thread);\r
-                               \r
-                       }\r
-                       \r
-               });\r
-               \r
-       }\r
-\r
-       public synchronized void startWriteUpdate(int thread) {\r
-               \r
-               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
-\r
-                       @Override\r
-                       public void run(int thread) {\r
-\r
-                               // Support for DelayedWriteRequest cancels during the\r
-                               // read-only part of the request.\r
-                               WriteStateBase<?> delayedState = session.delayedWriteState;\r
-                               session.delayedWriteState = null;\r
-                               if (delayedState != null) {\r
-                                       if (session.writeState != null)\r
-                                               throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");\r
-\r
-                       if (delayedState.isExcepted()) {\r
-                        // There can never be any changes in the cluster stream.\r
-                        // In this case, don't bother rolling back.\r
-                           boolean empty = session.clusterStream.reallyFlush(); \r
-                        assert(empty);\r
-                           transactionState.stopWriteTransaction(session.clusterStream);\r
-                       } else {\r
-                           throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");\r
-                       }\r
-                       session.clientChanges = new ClientChangesImpl(session);\r
-                                       delayedState.finish();\r
-                                       return;\r
-                               }\r
-\r
-                               // The session could have been terminated\r
-                               if(!session.state.isAlive()) return;\r
-\r
-                               WriteState<?> writeState = session.writeState;\r
-                               WriteGraphImpl graph = writeState.getGraph();\r
-\r
-                               if(writeState.isExcepted()) {\r
-                                   \r
-                                   if(!(writeState.exception instanceof CancelTransactionException))\r
-                                       writeState.exception.printStackTrace();\r
-                                   \r
-                                   transactionState.cancelWriteTransaction(graph);\r
-\r
-                               } else {\r
-\r
-                                       session.handleUpdatesAndMetadata(graph);\r
-\r
-//                    long start = System.nanoTime();\r
-                                       transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);\r
-//                                     long duration = System.nanoTime() - start;\r
-//                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");\r
-\r
-                               }\r
-\r
-                               session.clientChanges = new ClientChangesImpl(session);\r
-\r
-                               WriteState<?> state = session.writeState;\r
-                               state.finish();\r
-                               session.writeState = null;\r
-\r
-                       }\r
-\r
-               });\r
-\r
-       }\r
-\r
-//     public synchronized void stopWrite(int thread) {\r
-//             \r
-//        session.clientChanges = new ClientChangesImpl(session);\r
-//\r
-//        WriteState<?> state = session.writeState;\r
-//\r
-//        System.err.println("D");\r
-//        state.finish();\r
-//        System.err.println("E");\r
-//        \r
-//        session.writeState = null;\r
-//        \r
-//     }\r
-\r
-       public synchronized void ceased(int thread) {\r
-\r
-               if(State.WRITE == state) {\r
-\r
-                       setState(thread, State.WRITE_UPDATE, null);\r
-                       \r
-               } else if (State.WRITE_UPDATE == state) {\r
-\r
-                       setState(thread, State.IDLE, null);\r
-\r
-               } else if (State.READ_UPDATE == state) {\r
-\r
-                       setState(thread, State.IDLE, null);\r
-                       \r
-               } else if (State.READ == state) {\r
-\r
-                       if (!reads.isEmpty()) {\r
-\r
-                               final SessionRead read = reads.poll();\r
-                               session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {\r
-\r
-                                       @Override\r
-                                       public void run(int thread) {\r
-                                               read.run(thread);\r
-                                               if(read.notify != null) read.notify.release();\r
-                                       }\r
-                                       \r
-                               });     \r
-                               \r
-                       } else {\r
-                               \r
-                               setState(thread, State.READ_UPDATE, null);\r
-                               \r
-                       }\r
-\r
-               } else if (State.INIT == state) {\r
-                       \r
-                       assert(reads.isEmpty());\r
-                       assert(writes.isEmpty());\r
-                       setState(thread, State.IDLE, null);\r
-                       \r
-               } else if (State.ERROR == state) {\r
-                       \r
-                       setState(thread, State.IDLE, null);\r
-                       \r
-               } else {\r
-\r
-                   throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); \r
-                       \r
-               }\r
-               \r
-       }\r
-       \r
-    public synchronized void scheduleRead(final SessionRead task) {\r
-\r
-               assert(State.INIT != state);\r
-               \r
-               if(State.READ == state) {\r
-                       session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {\r
-\r
-                               @Override\r
-                               public void run(int thread) {\r
-                                       try {\r
-                                               task.run(thread);\r
-                                       } finally {\r
-                                               if (task.notify != null) task.notify.release();\r
-                                       }\r
-                               }\r
-                               \r
-                       });\r
-               } else if (State.IDLE == state) {\r
-                       setState(Integer.MIN_VALUE, State.READ, task);\r
-               } else {\r
-                       reads.offer(task);\r
-               }\r
-               \r
-       }\r
-\r
-    public int lastUpdateIndex() {\r
-        return writes.size();\r
-    }\r
-    \r
-       public synchronized void scheduleWrite(SessionTask task) {\r
-\r
-               scheduleWrite(task, null);\r
-\r
-       }\r
-\r
-       /**\r
-        * @param task the task to schedule \r
-        * @param combine <code>true</code> or <code>false</code> to explicitly\r
-        *        specify whether to combine the write with the previous operation\r
-        *        or <code>null</code> to use the default logic, i.e. combine when\r
-        *        this schedule is performed from within update processing (state ==\r
-        *        State.WRITE_UPDATE).\r
-        */\r
-       public synchronized void scheduleWrite(SessionTask task, Boolean combine) {\r
-\r
-               boolean inUpdate = state == State.WRITE_UPDATE;\r
-\r
-               assert(State.INIT != state);\r
-               //task.combine = combine != null ? combine : inUpdate;\r
-               if(State.IDLE == state) {\r
-                       setState(Integer.MIN_VALUE, State.WRITE, task);\r
-               } else {\r
-                       if(inUpdate) {\r
-                               int index = lastUpdateIndex();\r
-                               if(index == writes.size()) writes.offer(task);\r
-                               else writes.add(index, task);\r
-                       } else {\r
-                               writes.offer(task);\r
-                       }\r
-               }\r
-\r
-       }\r
-\r
-       /**\r
-        * Unregisters and closes all currently registered ResourceData random\r
-        * access value file handles. Any I/O exceptions occurring during closing\r
-        * are logged into {@link Logger}.\r
-        */\r
-       private void closeRandomAccessValues() {\r
-               RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);\r
-               if (ravs == null)\r
-                       return;\r
-               Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();\r
-               for (Pair<Resource, ResourceData> value : values) {\r
-                       try {\r
-                               value.second.binaryFile.close();\r
-                       } catch (IOException e) {\r
-                               Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);\r
-                       }\r
-               }\r
-       }\r
-\r
-}\r
+package fi.vtt.simantics.procore.internal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.simantics.db.Resource;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.internal.RandomAccessValueSupport;
+import org.simantics.db.impl.internal.ResourceData;
+import org.simantics.db.impl.query.QueryProcessor.SessionRead;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.utils.datastructures.Pair;
+
+public class SessionRequestManager {
+
+       private static boolean DEBUG = false;
+       
+       enum State {
+               
+               INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
+               
+       }
+       
+       final SessionImplSocket session;
+       final fi.vtt.simantics.procore.internal.State transactionState;
+       final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
+       final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
+
+       State state = State.INIT;
+
+       private boolean legal(State current, State target) {
+               
+               if(State.IDLE == current) {
+                       
+                       return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
+                       
+               } else if (State.WRITE == current) {
+                       
+                       return State.WRITE_UPDATE == target;
+
+               } else if (State.READ == current) {
+                       
+                       return State.READ_UPDATE == target;
+
+               } else if (State.ERROR == current) {
+                       
+                       return State.IDLE == target;
+                       
+               } else {
+                       
+                       return State.IDLE == target;
+                       
+               }
+               
+       }
+       
+       private synchronized void setState(int thread, State state, SessionTask task) {
+
+               if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
+               if(this.state == state) return;
+               
+               assert(legal(this.state, state));
+               
+               if(State.READ == state) {
+                       
+                       assert(State.IDLE == this.state);
+                       
+               }
+               
+               State oldState = this.state;
+               this.state = state;
+               
+               if(State.IDLE == oldState && State.IDLE != state) {
+                   session.incAsync();
+               }
+               
+               /*
+                * We are now executing in new state.
+                * The methods below SHALL NOT start any task
+                * immediately. All tasks shall be scheduled.
+                * 
+                */
+               if(State.READ == state) startRead(thread, (SessionRead)task);
+               else if(State.WRITE == state) startWrite(thread, task);
+               else if(State.READ == oldState) startReadUpdate(thread);
+               else if(State.WRITE == oldState) startWriteUpdate(thread);
+
+               // Check the need for new transaction
+               if(state == State.IDLE) {
+
+                       closeRandomAccessValues();
+
+                       if(!writes.isEmpty()) {
+                               setState(thread, State.WRITE, writes.poll());
+                       } else if (!reads.isEmpty()) {
+                               setState(thread, State.READ, reads.poll());
+                       }
+
+            session.decAsync();
+
+               }
+               
+       }
+       
+       public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
+               this.session = session;
+               this.transactionState = transactionState;
+       }
+       
+       public synchronized void startRead(int thread, final SessionRead task) {
+               
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {
+
+                       @Override
+            public void run(int thread) {
+                try {
+                    transactionState.startReadTransaction(thread);
+                    task.run(thread);
+                } catch (Throwable t) {
+                    Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
+                    if(task.throwable != null)
+                        task.throwable.set(t);
+                    state = State.ERROR;
+                } finally {
+                    if(task.notify != null)
+                        task.notify.release();
+                }
+            }
+                       
+               });
+               
+       }
+       
+       public synchronized void startReadUpdate(int thread) {
+               
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
+
+                       @Override
+                       public void run(int thread) {
+
+                               session.fireFinishReadTransaction();
+
+                               try {
+                                       transactionState.stopReadTransaction();
+                               } catch (DatabaseException e) {
+                                       e.printStackTrace();
+                               }
+                               
+                       }
+                       
+               });
+               
+       }
+
+//     public synchronized void stopRead(int thread) {
+//             
+//             try {
+//                     transactionState.stopReadTransaction();
+//             } catch (DatabaseException e) {
+//                     e.printStackTrace();
+//             }
+//             
+//     }
+
+       public synchronized void startWrite(int thread, final SessionTask task) {
+               
+               session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {
+
+                       @Override
+                       public void run(int thread) {
+
+                       try {
+                               transactionState.startWriteTransaction(thread);
+                       } catch (Throwable t) {
+                           DatabaseException e = new DatabaseException("Write transaction could not be started", t);
+                           Logger.defaultLogError(e);
+                           return;
+                       }
+                               task.run(thread);
+                               
+                       }
+                       
+               });
+               
+       }
+
+       public synchronized void startWriteUpdate(int thread) {
+               
+               session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {
+
+                       @Override
+                       public void run(int thread) {
+
+                               // Support for DelayedWriteRequest cancels during the
+                               // read-only part of the request.
+                               WriteStateBase<?> delayedState = session.delayedWriteState;
+                               session.delayedWriteState = null;
+                               if (delayedState != null) {
+                                       if (session.writeState != null)
+                                               throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
+
+                       if (delayedState.isExcepted()) {
+                        // There can never be any changes in the cluster stream.
+                        // In this case, don't bother rolling back.
+                           boolean empty = session.clusterStream.reallyFlush(); 
+                        assert(empty);
+                           transactionState.stopWriteTransaction(session.clusterStream);
+                       } else {
+                           throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
+                       }
+                       session.clientChanges = new ClientChangesImpl(session);
+                                       delayedState.finish();
+                                       return;
+                               }
+
+                               // The session could have been terminated
+                               if(!session.state.isAlive()) return;
+
+                               WriteState<?> writeState = session.writeState;
+                               WriteGraphImpl graph = writeState.getGraph();
+
+                               if(writeState.isExcepted()) {
+                                   
+                                   if(!(writeState.exception instanceof CancelTransactionException))
+                                       writeState.exception.printStackTrace();
+                                   
+                                   transactionState.cancelWriteTransaction(graph);
+
+                               } else {
+
+                                       session.handleUpdatesAndMetadata(graph);
+
+//                    long start = System.nanoTime();
+                                       transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
+//                                     long duration = System.nanoTime() - start;
+//                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
+
+                               }
+
+                               session.clientChanges = new ClientChangesImpl(session);
+
+                               WriteState<?> state = session.writeState;
+                               state.finish();
+                               session.writeState = null;
+
+                       }
+
+               });
+
+       }
+
+//     public synchronized void stopWrite(int thread) {
+//             
+//        session.clientChanges = new ClientChangesImpl(session);
+//
+//        WriteState<?> state = session.writeState;
+//
+//        System.err.println("D");
+//        state.finish();
+//        System.err.println("E");
+//        
+//        session.writeState = null;
+//        
+//     }
+
+       public synchronized void ceased(int thread) {
+
+               if(State.WRITE == state) {
+
+                       setState(thread, State.WRITE_UPDATE, null);
+                       
+               } else if (State.WRITE_UPDATE == state) {
+
+                       setState(thread, State.IDLE, null);
+
+               } else if (State.READ_UPDATE == state) {
+
+                       setState(thread, State.IDLE, null);
+                       
+               } else if (State.READ == state) {
+
+                       if (!reads.isEmpty()) {
+
+                               final SessionRead read = reads.poll();
+                               session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {
+
+                                       @Override
+                                       public void run(int thread) {
+                                               read.run(thread);
+                                               if(read.notify != null) read.notify.release();
+                                       }
+                                       
+                               });     
+                               
+                       } else {
+                               
+                               setState(thread, State.READ_UPDATE, null);
+                               
+                       }
+
+               } else if (State.INIT == state) {
+                       
+                       assert(reads.isEmpty());
+                       assert(writes.isEmpty());
+                       setState(thread, State.IDLE, null);
+                       
+               } else if (State.ERROR == state) {
+                       
+                       setState(thread, State.IDLE, null);
+                       
+               } else {
+
+                   throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); 
+                       
+               }
+               
+       }
+       
+    public synchronized void scheduleRead(final SessionRead task) {
+
+               assert(State.INIT != state);
+               
+               if(State.READ == state) {
+                       session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {
+
+                               @Override
+                               public void run(int thread) {
+                                       try {
+                                               task.run(thread);
+                                       } finally {
+                                               if (task.notify != null) task.notify.release();
+                                       }
+                               }
+                               
+                       });
+               } else if (State.IDLE == state) {
+                       setState(Integer.MIN_VALUE, State.READ, task);
+               } else {
+                       reads.offer(task);
+               }
+               
+       }
+
+    public int lastUpdateIndex() {
+        return writes.size();
+    }
+    
+       public synchronized void scheduleWrite(SessionTask task) {
+
+               scheduleWrite(task, null);
+
+       }
+
+       /**
+        * @param task the task to schedule 
+        * @param combine <code>true</code> or <code>false</code> to explicitly
+        *        specify whether to combine the write with the previous operation
+        *        or <code>null</code> to use the default logic, i.e. combine when
+        *        this schedule is performed from within update processing (state ==
+        *        State.WRITE_UPDATE).
+        */
+       public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
+
+               boolean inUpdate = state == State.WRITE_UPDATE;
+
+               assert(State.INIT != state);
+               //task.combine = combine != null ? combine : inUpdate;
+               if(State.IDLE == state) {
+                       setState(Integer.MIN_VALUE, State.WRITE, task);
+               } else {
+                       if(inUpdate) {
+                               int index = lastUpdateIndex();
+                               if(index == writes.size()) writes.offer(task);
+                               else writes.add(index, task);
+                       } else {
+                               writes.offer(task);
+                       }
+               }
+
+       }
+
+       /**
+        * Unregisters and closes all currently registered ResourceData random
+        * access value file handles. Any I/O exceptions occurring during closing
+        * are logged into {@link Logger}.
+        */
+       private void closeRandomAccessValues() {
+               RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
+               if (ravs == null)
+                       return;
+               Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
+               for (Pair<Resource, ResourceData> value : values) {
+                       try {
+                               value.second.binaryFile.close();
+                       } catch (IOException e) {
+                               Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
+                       }
+               }
+       }
+
+}