-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.IOException;\r
-import java.util.Collection;\r
-import java.util.LinkedList;\r
-\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
-import org.simantics.db.impl.internal.ResourceData;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.utils.datastructures.Pair;\r
-\r
-public class SessionRequestManager {\r
-\r
- private static boolean DEBUG = false;\r
- \r
- enum State {\r
- \r
- INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR\r
- \r
- }\r
- \r
- final SessionImplSocket session;\r
- final fi.vtt.simantics.procore.internal.State transactionState;\r
- final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();\r
- final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();\r
-\r
- State state = State.INIT;\r
-\r
- private boolean legal(State current, State target) {\r
- \r
- if(State.IDLE == current) {\r
- \r
- return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;\r
- \r
- } else if (State.WRITE == current) {\r
- \r
- return State.WRITE_UPDATE == target;\r
-\r
- } else if (State.READ == current) {\r
- \r
- return State.READ_UPDATE == target;\r
-\r
- } else if (State.ERROR == current) {\r
- \r
- return State.IDLE == target;\r
- \r
- } else {\r
- \r
- return State.IDLE == target;\r
- \r
- }\r
- \r
- }\r
- \r
- private synchronized void setState(int thread, State state, SessionTask task) {\r
-\r
- if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);\r
- if(this.state == state) return;\r
- \r
- assert(legal(this.state, state));\r
- \r
- if(State.READ == state) {\r
- \r
- assert(State.IDLE == this.state);\r
- \r
- }\r
- \r
- State oldState = this.state;\r
- this.state = state;\r
- \r
- if(State.IDLE == oldState && State.IDLE != state) {\r
- session.incAsync();\r
- }\r
- \r
- /*\r
- * We are now executing in new state.\r
- * The methods below SHALL NOT start any task\r
- * immediately. All tasks shall be scheduled.\r
- * \r
- */\r
- if(State.READ == state) startRead(thread, (SessionRead)task);\r
- else if(State.WRITE == state) startWrite(thread, task);\r
- else if(State.READ == oldState) startReadUpdate(thread);\r
- else if(State.WRITE == oldState) startWriteUpdate(thread);\r
-\r
- // Check the need for new transaction\r
- if(state == State.IDLE) {\r
-\r
- closeRandomAccessValues();\r
-\r
- if(!writes.isEmpty()) {\r
- setState(thread, State.WRITE, writes.poll());\r
- } else if (!reads.isEmpty()) {\r
- setState(thread, State.READ, reads.poll());\r
- }\r
-\r
- session.decAsync();\r
-\r
- }\r
- \r
- }\r
- \r
- public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {\r
- this.session = session;\r
- this.transactionState = transactionState;\r
- }\r
- \r
- public synchronized void startRead(int thread, final SessionRead task) {\r
- \r
- session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {\r
-\r
- @Override\r
- public void run(int thread) {\r
- try {\r
- transactionState.startReadTransaction(thread);\r
- task.run(thread);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));\r
- if(task.throwable != null)\r
- task.throwable.set(t);\r
- state = State.ERROR;\r
- } finally {\r
- if(task.notify != null)\r
- task.notify.release();\r
- }\r
- }\r
- \r
- });\r
- \r
- }\r
- \r
- public synchronized void startReadUpdate(int thread) {\r
- \r
- session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- session.fireFinishReadTransaction();\r
-\r
- try {\r
- transactionState.stopReadTransaction();\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- \r
- }\r
- \r
- });\r
- \r
- }\r
-\r
-// public synchronized void stopRead(int thread) {\r
-// \r
-// try {\r
-// transactionState.stopReadTransaction();\r
-// } catch (DatabaseException e) {\r
-// e.printStackTrace();\r
-// }\r
-// \r
-// }\r
-\r
- public synchronized void startWrite(int thread, final SessionTask task) {\r
- \r
- session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- try {\r
- transactionState.startWriteTransaction(thread);\r
- } catch (Throwable t) {\r
- DatabaseException e = new DatabaseException("Write transaction could not be started", t);\r
- Logger.defaultLogError(e);\r
- return;\r
- }\r
- task.run(thread);\r
- \r
- }\r
- \r
- });\r
- \r
- }\r
-\r
- public synchronized void startWriteUpdate(int thread) {\r
- \r
- session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- // Support for DelayedWriteRequest cancels during the\r
- // read-only part of the request.\r
- WriteStateBase<?> delayedState = session.delayedWriteState;\r
- session.delayedWriteState = null;\r
- if (delayedState != null) {\r
- if (session.writeState != null)\r
- throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");\r
-\r
- if (delayedState.isExcepted()) {\r
- // There can never be any changes in the cluster stream.\r
- // In this case, don't bother rolling back.\r
- boolean empty = session.clusterStream.reallyFlush(); \r
- assert(empty);\r
- transactionState.stopWriteTransaction(session.clusterStream);\r
- } else {\r
- throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");\r
- }\r
- session.clientChanges = new ClientChangesImpl(session);\r
- delayedState.finish();\r
- return;\r
- }\r
-\r
- // The session could have been terminated\r
- if(!session.state.isAlive()) return;\r
-\r
- WriteState<?> writeState = session.writeState;\r
- WriteGraphImpl graph = writeState.getGraph();\r
-\r
- if(writeState.isExcepted()) {\r
- \r
- if(!(writeState.exception instanceof CancelTransactionException))\r
- writeState.exception.printStackTrace();\r
- \r
- transactionState.cancelWriteTransaction(graph);\r
-\r
- } else {\r
-\r
- session.handleUpdatesAndMetadata(graph);\r
-\r
-// long start = System.nanoTime();\r
- transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);\r
-// long duration = System.nanoTime() - start;\r
-// System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");\r
-\r
- }\r
-\r
- session.clientChanges = new ClientChangesImpl(session);\r
-\r
- WriteState<?> state = session.writeState;\r
- state.finish();\r
- session.writeState = null;\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
-// public synchronized void stopWrite(int thread) {\r
-// \r
-// session.clientChanges = new ClientChangesImpl(session);\r
-//\r
-// WriteState<?> state = session.writeState;\r
-//\r
-// System.err.println("D");\r
-// state.finish();\r
-// System.err.println("E");\r
-// \r
-// session.writeState = null;\r
-// \r
-// }\r
-\r
- public synchronized void ceased(int thread) {\r
-\r
- if(State.WRITE == state) {\r
-\r
- setState(thread, State.WRITE_UPDATE, null);\r
- \r
- } else if (State.WRITE_UPDATE == state) {\r
-\r
- setState(thread, State.IDLE, null);\r
-\r
- } else if (State.READ_UPDATE == state) {\r
-\r
- setState(thread, State.IDLE, null);\r
- \r
- } else if (State.READ == state) {\r
-\r
- if (!reads.isEmpty()) {\r
-\r
- final SessionRead read = reads.poll();\r
- session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {\r
-\r
- @Override\r
- public void run(int thread) {\r
- read.run(thread);\r
- if(read.notify != null) read.notify.release();\r
- }\r
- \r
- }); \r
- \r
- } else {\r
- \r
- setState(thread, State.READ_UPDATE, null);\r
- \r
- }\r
-\r
- } else if (State.INIT == state) {\r
- \r
- assert(reads.isEmpty());\r
- assert(writes.isEmpty());\r
- setState(thread, State.IDLE, null);\r
- \r
- } else if (State.ERROR == state) {\r
- \r
- setState(thread, State.IDLE, null);\r
- \r
- } else {\r
-\r
- throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); \r
- \r
- }\r
- \r
- }\r
- \r
- public synchronized void scheduleRead(final SessionRead task) {\r
-\r
- assert(State.INIT != state);\r
- \r
- if(State.READ == state) {\r
- session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {\r
-\r
- @Override\r
- public void run(int thread) {\r
- try {\r
- task.run(thread);\r
- } finally {\r
- if (task.notify != null) task.notify.release();\r
- }\r
- }\r
- \r
- });\r
- } else if (State.IDLE == state) {\r
- setState(Integer.MIN_VALUE, State.READ, task);\r
- } else {\r
- reads.offer(task);\r
- }\r
- \r
- }\r
-\r
- public int lastUpdateIndex() {\r
- return writes.size();\r
- }\r
- \r
- public synchronized void scheduleWrite(SessionTask task) {\r
-\r
- scheduleWrite(task, null);\r
-\r
- }\r
-\r
- /**\r
- * @param task the task to schedule \r
- * @param combine <code>true</code> or <code>false</code> to explicitly\r
- * specify whether to combine the write with the previous operation\r
- * or <code>null</code> to use the default logic, i.e. combine when\r
- * this schedule is performed from within update processing (state ==\r
- * State.WRITE_UPDATE).\r
- */\r
- public synchronized void scheduleWrite(SessionTask task, Boolean combine) {\r
-\r
- boolean inUpdate = state == State.WRITE_UPDATE;\r
-\r
- assert(State.INIT != state);\r
- //task.combine = combine != null ? combine : inUpdate;\r
- if(State.IDLE == state) {\r
- setState(Integer.MIN_VALUE, State.WRITE, task);\r
- } else {\r
- if(inUpdate) {\r
- int index = lastUpdateIndex();\r
- if(index == writes.size()) writes.offer(task);\r
- else writes.add(index, task);\r
- } else {\r
- writes.offer(task);\r
- }\r
- }\r
-\r
- }\r
-\r
- /**\r
- * Unregisters and closes all currently registered ResourceData random\r
- * access value file handles. Any I/O exceptions occurring during closing\r
- * are logged into {@link Logger}.\r
- */\r
- private void closeRandomAccessValues() {\r
- RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);\r
- if (ravs == null)\r
- return;\r
- Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();\r
- for (Pair<Resource, ResourceData> value : values) {\r
- try {\r
- value.second.binaryFile.close();\r
- } catch (IOException e) {\r
- Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);\r
- }\r
- }\r
- }\r
-\r
-}\r
+package fi.vtt.simantics.procore.internal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.simantics.db.Disposable;
+import org.simantics.db.Resource;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.internal.RandomAccessValueSupport;
+import org.simantics.db.impl.internal.ResourceData;
+import org.simantics.db.impl.query.QueryProcessor.SessionRead;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.utils.datastructures.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionRequestManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionRequestManager.class);
+
+ private static boolean DEBUG = false;
+
+ enum State {
+
+ INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
+
+ }
+
+ final SessionImplSocket session;
+ final fi.vtt.simantics.procore.internal.State transactionState;
+ final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
+ final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();
+
+ State state = State.INIT;
+
+ private boolean legal(State current, State target) {
+
+ if(State.IDLE == current) {
+
+ return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target;
+
+ } else if (State.WRITE == current) {
+
+ return State.WRITE_UPDATE == target;
+
+ } else if (State.READ == current) {
+
+ return State.READ_UPDATE == target;
+
+ } else if (State.ERROR == current) {
+
+ return State.IDLE == target;
+
+ } else {
+
+ return State.IDLE == target;
+
+ }
+
+ }
+
+ private synchronized void setState(int thread, State state, SessionTask task) {
+
+ if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task);
+ if(this.state == state) return;
+
+ assert(legal(this.state, state));
+
+ if(State.READ == state) {
+
+ assert(State.IDLE == this.state);
+
+ }
+
+ State oldState = this.state;
+ this.state = state;
+
+ if(State.IDLE == oldState && State.IDLE != state) {
+ session.incAsync();
+ }
+
+ /*
+ * We are now executing in new state.
+ * The methods below SHALL NOT start any task
+ * immediately. All tasks shall be scheduled.
+ *
+ */
+ if(State.READ == state) startRead(thread, (SessionRead)task);
+ else if(State.WRITE == state) startWrite(thread, task);
+ else if(State.READ == oldState) startReadUpdate(thread);
+ else if(State.WRITE == oldState) startWriteUpdate(thread);
+
+ // Check the need for new transaction
+ if(state == State.IDLE) {
+
+ closeRandomAccessValues();
+
+ if(!writes.isEmpty()) {
+ setState(thread, State.WRITE, writes.poll());
+ } else if (!reads.isEmpty()) {
+ setState(thread, State.READ, reads.poll());
+ }
+
+ session.decAsync();
+
+ }
+
+ }
+
+ public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
+ this.session = session;
+ this.transactionState = transactionState;
+ }
+
+ public synchronized void startRead(int thread, final SessionRead task) {
+
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+ try {
+ transactionState.startReadTransaction(thread);
+ task.run(thread);
+ } catch (Throwable t) {
+ LOGGER.error("Read transaction could not be started", t);
+ if(task.throwable != null)
+ task.throwable.set(t);
+ state = State.ERROR;
+ } finally {
+ if(task.notify != null)
+ task.notify.release();
+ }
+ }
+
+ });
+
+ }
+
+ public synchronized void startReadUpdate(int thread) {
+
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ session.fireFinishReadTransaction();
+
+ try {
+ transactionState.stopReadTransaction();
+ } catch (DatabaseException e) {
+ LOGGER.error("Read transaction could not be stopped", e);
+ }
+
+ }
+
+ });
+
+ }
+
+ public synchronized void startWrite(int thread, final SessionTask task) {
+
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ try {
+ transactionState.startWriteTransaction(thread);
+ } catch (Throwable t) {
+ LOGGER.error("Write transaction could not be started", t);
+ return;
+ }
+ task.run(thread);
+
+ }
+
+ });
+
+ }
+
+ public synchronized void startWriteUpdate(int thread) {
+
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ // Support for DelayedWriteRequest cancels during the
+ // read-only part of the request.
+ WriteStateBase<?> delayedState = session.delayedWriteState;
+ session.delayedWriteState = null;
+ if (delayedState != null) {
+ if (session.writeState != null)
+ throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");
+
+ if (delayedState.isExcepted()) {
+ // There can never be any changes in the cluster stream.
+ // In this case, don't bother rolling back.
+ boolean empty = session.clusterStream.reallyFlush();
+ assert(empty);
+ transactionState.stopWriteTransaction(session.clusterStream);
+ } else {
+ throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
+ }
+ Disposable.safeDispose(session.clientChanges);
+ session.clientChanges = new ClientChangesImpl(session);
+ delayedState.finish();
+ return;
+ }
+
+ // The session could have been terminated
+ if(!session.state.isAlive()) return;
+
+ WriteState<?> writeState = session.writeState;
+
+ assert(writeState != null);
+
+ WriteGraphImpl graph = writeState.getGraph();
+
+ if(writeState.isExcepted()) {
+
+ if(!(writeState.exception instanceof CancelTransactionException))
+ LOGGER.error("Write request failed", writeState.exception);
+
+ transactionState.cancelWriteTransaction(graph);
+
+ } else {
+
+ session.handleUpdatesAndMetadata(graph);
+
+// long start = System.nanoTime();
+ transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
+// long duration = System.nanoTime() - start;
+// System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");
+
+ }
+
+ Disposable.safeDispose(session.clientChanges);
+ session.clientChanges = new ClientChangesImpl(session);
+
+ WriteState<?> state = session.writeState;
+ state.finish();
+ session.writeState = null;
+
+ }
+
+ });
+
+ }
+
+ public synchronized void ceased(int thread) {
+
+ if(State.WRITE == state) {
+
+ setState(thread, State.WRITE_UPDATE, null);
+
+ } else if (State.WRITE_UPDATE == state) {
+
+ setState(thread, State.IDLE, null);
+
+ } else if (State.READ_UPDATE == state) {
+
+ setState(thread, State.IDLE, null);
+
+ } else if (State.READ == state) {
+
+ if (!reads.isEmpty()) {
+
+ final SessionRead read = reads.poll();
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+ read.run(thread);
+ if(read.notify != null) read.notify.release();
+ }
+
+ });
+
+ } else {
+
+ setState(thread, State.READ_UPDATE, null);
+
+ }
+
+ } else if (State.INIT == state) {
+
+ assert(reads.isEmpty());
+ assert(writes.isEmpty());
+ setState(thread, State.IDLE, null);
+
+ } else if (State.ERROR == state) {
+
+ setState(thread, State.IDLE, null);
+
+ } else {
+
+ // Spurious wakeup
+
+ //throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")");
+
+ }
+
+ }
+
+ public synchronized void scheduleRead(final SessionRead task) {
+
+ assert(State.INIT != state);
+
+ if(State.READ == state) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+ try {
+ task.run(thread);
+ } finally {
+ if (task.notify != null) task.notify.release();
+ }
+ }
+
+ });
+ } else if (State.IDLE == state) {
+ setState(Integer.MIN_VALUE, State.READ, task);
+ } else {
+ reads.offer(task);
+ }
+
+ }
+
+ public int lastUpdateIndex() {
+ return writes.size();
+ }
+
+ public synchronized void scheduleWrite(SessionTask task) {
+
+ scheduleWrite(task, null);
+
+ }
+
+ /**
+ * @param task the task to schedule
+ * @param combine <code>true</code> or <code>false</code> to explicitly
+ * specify whether to combine the write with the previous operation
+ * or <code>null</code> to use the default logic, i.e. combine when
+ * this schedule is performed from within update processing (state ==
+ * State.WRITE_UPDATE).
+ */
+ public synchronized void scheduleWrite(SessionTask task, Boolean combine) {
+
+ boolean inUpdate = state == State.WRITE_UPDATE;
+
+ //System.err.println("schedule write " + task);
+
+ assert(State.INIT != state);
+ //task.combine = combine != null ? combine : inUpdate;
+ if(State.IDLE == state) {
+ setState(Integer.MIN_VALUE, State.WRITE, task);
+ } else {
+ if(inUpdate) {
+ int index = lastUpdateIndex();
+ if(index == writes.size()) writes.offer(task);
+ else writes.add(index, task);
+ } else {
+ writes.offer(task);
+ }
+ }
+
+ }
+
+ /**
+ * Unregisters and closes all currently registered ResourceData random
+ * access value file handles. Any I/O exceptions occurring during closing
+ * are logged into {@link Logger}.
+ */
+ private void closeRandomAccessValues() {
+ RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
+ if (ravs == null)
+ return;
+ Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
+ for (Pair<Resource, ResourceData> value : values) {
+ try {
+ value.second.binaryFile.close();
+ } catch (IOException e) {
+ LOGGER.error("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e);
+ }
+ }
+ }
+
+}