X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionRequestManager.java;fp=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionRequestManager.java;h=cb1e2e0cfc7e3505ab8c85f2fd1487507123e4f5;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java new file mode 100644 index 000000000..cb1e2e0cf --- /dev/null +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -0,0 +1,406 @@ +package fi.vtt.simantics.procore.internal; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.simantics.db.Resource; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.impl.internal.RandomAccessValueSupport; +import org.simantics.db.impl.internal.ResourceData; +import org.simantics.db.impl.query.QueryProcessor.SessionRead; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; +import org.simantics.db.request.WriteTraits; +import org.simantics.utils.datastructures.Pair; + +public class SessionRequestManager { + + private static boolean DEBUG = false; + + enum State { + + INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR + + } + + final SessionImplSocket session; + final fi.vtt.simantics.procore.internal.State transactionState; + final LinkedList writes = new LinkedList(); + final LinkedList reads = new LinkedList(); + + State state = State.INIT; + + private boolean legal(State current, State target) { + + if(State.IDLE == current) { + + return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target; + + } else if (State.WRITE == current) { + + return State.WRITE_UPDATE == target; + + } else if (State.READ == current) { + + return State.READ_UPDATE == target; + + } else if (State.ERROR == current) { + + return State.IDLE == target; + + } else { + + return State.IDLE == target; + + } + + } + + private synchronized void setState(int thread, State state, SessionTask task) { + + if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task); + if(this.state == state) return; + + assert(legal(this.state, state)); + + if(State.READ == state) { + + assert(State.IDLE == this.state); + + } + + State oldState = this.state; + this.state = state; + + if(State.IDLE == oldState && State.IDLE != state) { + session.incAsync(); + } + + /* + * We are now executing in new state. + * The methods below SHALL NOT start any task + * immediately. All tasks shall be scheduled. + * + */ + if(State.READ == state) startRead(thread, (SessionRead)task); + else if(State.WRITE == state) startWrite(thread, task); + else if(State.READ == oldState) startReadUpdate(thread); + else if(State.WRITE == oldState) startWriteUpdate(thread); + + // Check the need for new transaction + if(state == State.IDLE) { + + closeRandomAccessValues(); + + if(!writes.isEmpty()) { + setState(thread, State.WRITE, writes.poll()); + } else if (!reads.isEmpty()) { + setState(thread, State.READ, reads.poll()); + } + + session.decAsync(); + + } + + } + + public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) { + this.session = session; + this.transactionState = transactionState; + } + + public synchronized void startRead(int thread, final SessionRead task) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) { + + @Override + public void run(int thread) { + try { + transactionState.startReadTransaction(thread); + task.run(thread); + } catch (Throwable t) { + Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t)); + if(task.throwable != null) + task.throwable.set(t); + state = State.ERROR; + } finally { + if(task.notify != null) + task.notify.release(); + } + } + + }); + + } + + public synchronized void startReadUpdate(int thread) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { + + @Override + public void run(int thread) { + + session.fireFinishReadTransaction(); + + try { + transactionState.stopReadTransaction(); + } catch (DatabaseException e) { + e.printStackTrace(); + } + + } + + }); + + } + +// public synchronized void stopRead(int thread) { +// +// try { +// transactionState.stopReadTransaction(); +// } catch (DatabaseException e) { +// e.printStackTrace(); +// } +// +// } + + public synchronized void startWrite(int thread, final SessionTask task) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) { + + @Override + public void run(int thread) { + + try { + transactionState.startWriteTransaction(thread); + } catch (Throwable t) { + DatabaseException e = new DatabaseException("Write transaction could not be started", t); + Logger.defaultLogError(e); + return; + } + task.run(thread); + + } + + }); + + } + + public synchronized void startWriteUpdate(int thread) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { + + @Override + public void run(int thread) { + + // Support for DelayedWriteRequest cancels during the + // read-only part of the request. + WriteStateBase delayedState = session.delayedWriteState; + session.delayedWriteState = null; + if (delayedState != null) { + if (session.writeState != null) + throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set."); + + if (delayedState.isExcepted()) { + // There can never be any changes in the cluster stream. + // In this case, don't bother rolling back. + boolean empty = session.clusterStream.reallyFlush(); + assert(empty); + transactionState.stopWriteTransaction(session.clusterStream); + } else { + throw new UnsupportedOperationException("delayedWriteState may only exist when request fails."); + } + session.clientChanges = new ClientChangesImpl(session); + delayedState.finish(); + return; + } + + // The session could have been terminated + if(!session.state.isAlive()) return; + + WriteState writeState = session.writeState; + WriteGraphImpl graph = writeState.getGraph(); + + if(writeState.isExcepted()) { + + if(!(writeState.exception instanceof CancelTransactionException)) + writeState.exception.printStackTrace(); + + transactionState.cancelWriteTransaction(graph); + + } else { + + session.handleUpdatesAndMetadata(graph); + +// long start = System.nanoTime(); + transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null); +// long duration = System.nanoTime() - start; +// System.out.println("commitWriteTransaction " + 1e-9*duration + "s. "); + + } + + session.clientChanges = new ClientChangesImpl(session); + + WriteState state = session.writeState; + state.finish(); + session.writeState = null; + + } + + }); + + } + +// public synchronized void stopWrite(int thread) { +// +// session.clientChanges = new ClientChangesImpl(session); +// +// WriteState state = session.writeState; +// +// System.err.println("D"); +// state.finish(); +// System.err.println("E"); +// +// session.writeState = null; +// +// } + + public synchronized void ceased(int thread) { + + if(State.WRITE == state) { + + setState(thread, State.WRITE_UPDATE, null); + + } else if (State.WRITE_UPDATE == state) { + + setState(thread, State.IDLE, null); + + } else if (State.READ_UPDATE == state) { + + setState(thread, State.IDLE, null); + + } else if (State.READ == state) { + + if (!reads.isEmpty()) { + + final SessionRead read = reads.poll(); + session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) { + + @Override + public void run(int thread) { + read.run(thread); + if(read.notify != null) read.notify.release(); + } + + }); + + } else { + + setState(thread, State.READ_UPDATE, null); + + } + + } else if (State.INIT == state) { + + assert(reads.isEmpty()); + assert(writes.isEmpty()); + setState(thread, State.IDLE, null); + + } else if (State.ERROR == state) { + + setState(thread, State.IDLE, null); + + } else { + + throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); + + } + + } + + public synchronized void scheduleRead(final SessionRead task) { + + assert(State.INIT != state); + + if(State.READ == state) { + session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) { + + @Override + public void run(int thread) { + try { + task.run(thread); + } finally { + if (task.notify != null) task.notify.release(); + } + } + + }); + } else if (State.IDLE == state) { + setState(Integer.MIN_VALUE, State.READ, task); + } else { + reads.offer(task); + } + + } + + public int lastUpdateIndex() { + return writes.size(); + } + + public synchronized void scheduleWrite(SessionTask task) { + + scheduleWrite(task, null); + + } + + /** + * @param task the task to schedule + * @param combine true or false to explicitly + * specify whether to combine the write with the previous operation + * or null to use the default logic, i.e. combine when + * this schedule is performed from within update processing (state == + * State.WRITE_UPDATE). + */ + public synchronized void scheduleWrite(SessionTask task, Boolean combine) { + + boolean inUpdate = state == State.WRITE_UPDATE; + + assert(State.INIT != state); + //task.combine = combine != null ? combine : inUpdate; + if(State.IDLE == state) { + setState(Integer.MIN_VALUE, State.WRITE, task); + } else { + if(inUpdate) { + int index = lastUpdateIndex(); + if(index == writes.size()) writes.offer(task); + else writes.add(index, task); + } else { + writes.offer(task); + } + } + + } + + /** + * Unregisters and closes all currently registered ResourceData random + * access value file handles. Any I/O exceptions occurring during closing + * are logged into {@link Logger}. + */ + private void closeRandomAccessValues() { + RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class); + if (ravs == null) + return; + Collection> values = ravs.removeAll(); + for (Pair value : values) { + try { + value.second.binaryFile.close(); + } catch (IOException e) { + Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e); + } + } + } + +}