X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionRequestManager.java;h=0ec6387ae941b0e9cb9c6358282c3b3849ff9d0d;hp=cb1e2e0cfc7e3505ab8c85f2fd1487507123e4f5;hb=97872f96afc18f4cd5b959d869fed80cd92f6aa1;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java index cb1e2e0cf..0ec6387ae 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -1,406 +1,406 @@ -package fi.vtt.simantics.procore.internal; - -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - -import org.simantics.db.Resource; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.impl.graph.WriteGraphImpl; -import org.simantics.db.impl.internal.RandomAccessValueSupport; -import org.simantics.db.impl.internal.ResourceData; -import org.simantics.db.impl.query.QueryProcessor.SessionRead; -import org.simantics.db.impl.query.QueryProcessor.SessionTask; -import org.simantics.db.request.WriteTraits; -import org.simantics.utils.datastructures.Pair; - -public class SessionRequestManager { - - private static boolean DEBUG = false; - - enum State { - - INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR - - } - - final SessionImplSocket session; - final fi.vtt.simantics.procore.internal.State transactionState; - final LinkedList writes = new LinkedList(); - final LinkedList reads = new LinkedList(); - - State state = State.INIT; - - private boolean legal(State current, State target) { - - if(State.IDLE == current) { - - return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target; - - } else if (State.WRITE == current) { - - return State.WRITE_UPDATE == target; - - } else if (State.READ == current) { - - return State.READ_UPDATE == target; - - } else if (State.ERROR == current) { - - return State.IDLE == target; - - } else { - - return State.IDLE == target; - - } - - } - - private synchronized void setState(int thread, State state, SessionTask task) { - - if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task); - if(this.state == state) return; - - assert(legal(this.state, state)); - - if(State.READ == state) { - - assert(State.IDLE == this.state); - - } - - State oldState = this.state; - this.state = state; - - if(State.IDLE == oldState && State.IDLE != state) { - session.incAsync(); - } - - /* - * We are now executing in new state. - * The methods below SHALL NOT start any task - * immediately. All tasks shall be scheduled. - * - */ - if(State.READ == state) startRead(thread, (SessionRead)task); - else if(State.WRITE == state) startWrite(thread, task); - else if(State.READ == oldState) startReadUpdate(thread); - else if(State.WRITE == oldState) startWriteUpdate(thread); - - // Check the need for new transaction - if(state == State.IDLE) { - - closeRandomAccessValues(); - - if(!writes.isEmpty()) { - setState(thread, State.WRITE, writes.poll()); - } else if (!reads.isEmpty()) { - setState(thread, State.READ, reads.poll()); - } - - session.decAsync(); - - } - - } - - public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) { - this.session = session; - this.transactionState = transactionState; - } - - public synchronized void startRead(int thread, final SessionRead task) { - - session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) { - - @Override - public void run(int thread) { - try { - transactionState.startReadTransaction(thread); - task.run(thread); - } catch (Throwable t) { - Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t)); - if(task.throwable != null) - task.throwable.set(t); - state = State.ERROR; - } finally { - if(task.notify != null) - task.notify.release(); - } - } - - }); - - } - - public synchronized void startReadUpdate(int thread) { - - session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { - - @Override - public void run(int thread) { - - session.fireFinishReadTransaction(); - - try { - transactionState.stopReadTransaction(); - } catch (DatabaseException e) { - e.printStackTrace(); - } - - } - - }); - - } - -// public synchronized void stopRead(int thread) { -// -// try { -// transactionState.stopReadTransaction(); -// } catch (DatabaseException e) { -// e.printStackTrace(); -// } -// -// } - - public synchronized void startWrite(int thread, final SessionTask task) { - - session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) { - - @Override - public void run(int thread) { - - try { - transactionState.startWriteTransaction(thread); - } catch (Throwable t) { - DatabaseException e = new DatabaseException("Write transaction could not be started", t); - Logger.defaultLogError(e); - return; - } - task.run(thread); - - } - - }); - - } - - public synchronized void startWriteUpdate(int thread) { - - session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { - - @Override - public void run(int thread) { - - // Support for DelayedWriteRequest cancels during the - // read-only part of the request. - WriteStateBase delayedState = session.delayedWriteState; - session.delayedWriteState = null; - if (delayedState != null) { - if (session.writeState != null) - throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set."); - - if (delayedState.isExcepted()) { - // There can never be any changes in the cluster stream. - // In this case, don't bother rolling back. - boolean empty = session.clusterStream.reallyFlush(); - assert(empty); - transactionState.stopWriteTransaction(session.clusterStream); - } else { - throw new UnsupportedOperationException("delayedWriteState may only exist when request fails."); - } - session.clientChanges = new ClientChangesImpl(session); - delayedState.finish(); - return; - } - - // The session could have been terminated - if(!session.state.isAlive()) return; - - WriteState writeState = session.writeState; - WriteGraphImpl graph = writeState.getGraph(); - - if(writeState.isExcepted()) { - - if(!(writeState.exception instanceof CancelTransactionException)) - writeState.exception.printStackTrace(); - - transactionState.cancelWriteTransaction(graph); - - } else { - - session.handleUpdatesAndMetadata(graph); - -// long start = System.nanoTime(); - transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null); -// long duration = System.nanoTime() - start; -// System.out.println("commitWriteTransaction " + 1e-9*duration + "s. "); - - } - - session.clientChanges = new ClientChangesImpl(session); - - WriteState state = session.writeState; - state.finish(); - session.writeState = null; - - } - - }); - - } - -// public synchronized void stopWrite(int thread) { -// -// session.clientChanges = new ClientChangesImpl(session); -// -// WriteState state = session.writeState; -// -// System.err.println("D"); -// state.finish(); -// System.err.println("E"); -// -// session.writeState = null; -// -// } - - public synchronized void ceased(int thread) { - - if(State.WRITE == state) { - - setState(thread, State.WRITE_UPDATE, null); - - } else if (State.WRITE_UPDATE == state) { - - setState(thread, State.IDLE, null); - - } else if (State.READ_UPDATE == state) { - - setState(thread, State.IDLE, null); - - } else if (State.READ == state) { - - if (!reads.isEmpty()) { - - final SessionRead read = reads.poll(); - session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) { - - @Override - public void run(int thread) { - read.run(thread); - if(read.notify != null) read.notify.release(); - } - - }); - - } else { - - setState(thread, State.READ_UPDATE, null); - - } - - } else if (State.INIT == state) { - - assert(reads.isEmpty()); - assert(writes.isEmpty()); - setState(thread, State.IDLE, null); - - } else if (State.ERROR == state) { - - setState(thread, State.IDLE, null); - - } else { - - throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); - - } - - } - - public synchronized void scheduleRead(final SessionRead task) { - - assert(State.INIT != state); - - if(State.READ == state) { - session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) { - - @Override - public void run(int thread) { - try { - task.run(thread); - } finally { - if (task.notify != null) task.notify.release(); - } - } - - }); - } else if (State.IDLE == state) { - setState(Integer.MIN_VALUE, State.READ, task); - } else { - reads.offer(task); - } - - } - - public int lastUpdateIndex() { - return writes.size(); - } - - public synchronized void scheduleWrite(SessionTask task) { - - scheduleWrite(task, null); - - } - - /** - * @param task the task to schedule - * @param combine true or false to explicitly - * specify whether to combine the write with the previous operation - * or null to use the default logic, i.e. combine when - * this schedule is performed from within update processing (state == - * State.WRITE_UPDATE). - */ - public synchronized void scheduleWrite(SessionTask task, Boolean combine) { - - boolean inUpdate = state == State.WRITE_UPDATE; - - assert(State.INIT != state); - //task.combine = combine != null ? combine : inUpdate; - if(State.IDLE == state) { - setState(Integer.MIN_VALUE, State.WRITE, task); - } else { - if(inUpdate) { - int index = lastUpdateIndex(); - if(index == writes.size()) writes.offer(task); - else writes.add(index, task); - } else { - writes.offer(task); - } - } - - } - - /** - * Unregisters and closes all currently registered ResourceData random - * access value file handles. Any I/O exceptions occurring during closing - * are logged into {@link Logger}. - */ - private void closeRandomAccessValues() { - RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class); - if (ravs == null) - return; - Collection> values = ravs.removeAll(); - for (Pair value : values) { - try { - value.second.binaryFile.close(); - } catch (IOException e) { - Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e); - } - } - } - -} +package fi.vtt.simantics.procore.internal; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +import org.simantics.db.Resource; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.impl.internal.RandomAccessValueSupport; +import org.simantics.db.impl.internal.ResourceData; +import org.simantics.db.impl.query.QueryProcessor.SessionRead; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; +import org.simantics.db.request.WriteTraits; +import org.simantics.utils.datastructures.Pair; + +public class SessionRequestManager { + + private static boolean DEBUG = false; + + enum State { + + INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR + + } + + final SessionImplSocket session; + final fi.vtt.simantics.procore.internal.State transactionState; + final LinkedList writes = new LinkedList(); + final LinkedList reads = new LinkedList(); + + State state = State.INIT; + + private boolean legal(State current, State target) { + + if(State.IDLE == current) { + + return State.INIT != target && State.WRITE_UPDATE != target && State.READ_UPDATE != target; + + } else if (State.WRITE == current) { + + return State.WRITE_UPDATE == target; + + } else if (State.READ == current) { + + return State.READ_UPDATE == target; + + } else if (State.ERROR == current) { + + return State.IDLE == target; + + } else { + + return State.IDLE == target; + + } + + } + + private synchronized void setState(int thread, State state, SessionTask task) { + + if(DEBUG) System.err.println("Session state: " + this.state + " => " + state + " - " + task); + if(this.state == state) return; + + assert(legal(this.state, state)); + + if(State.READ == state) { + + assert(State.IDLE == this.state); + + } + + State oldState = this.state; + this.state = state; + + if(State.IDLE == oldState && State.IDLE != state) { + session.incAsync(); + } + + /* + * We are now executing in new state. + * The methods below SHALL NOT start any task + * immediately. All tasks shall be scheduled. + * + */ + if(State.READ == state) startRead(thread, (SessionRead)task); + else if(State.WRITE == state) startWrite(thread, task); + else if(State.READ == oldState) startReadUpdate(thread); + else if(State.WRITE == oldState) startWriteUpdate(thread); + + // Check the need for new transaction + if(state == State.IDLE) { + + closeRandomAccessValues(); + + if(!writes.isEmpty()) { + setState(thread, State.WRITE, writes.poll()); + } else if (!reads.isEmpty()) { + setState(thread, State.READ, reads.poll()); + } + + session.decAsync(); + + } + + } + + public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) { + this.session = session; + this.transactionState = transactionState; + } + + public synchronized void startRead(int thread, final SessionRead task) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) { + + @Override + public void run(int thread) { + try { + transactionState.startReadTransaction(thread); + task.run(thread); + } catch (Throwable t) { + Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t)); + if(task.throwable != null) + task.throwable.set(t); + state = State.ERROR; + } finally { + if(task.notify != null) + task.notify.release(); + } + } + + }); + + } + + public synchronized void startReadUpdate(int thread) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { + + @Override + public void run(int thread) { + + session.fireFinishReadTransaction(); + + try { + transactionState.stopReadTransaction(); + } catch (DatabaseException e) { + e.printStackTrace(); + } + + } + + }); + + } + +// public synchronized void stopRead(int thread) { +// +// try { +// transactionState.stopReadTransaction(); +// } catch (DatabaseException e) { +// e.printStackTrace(); +// } +// +// } + + public synchronized void startWrite(int thread, final SessionTask task) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits)task.object, task.thread) { + + @Override + public void run(int thread) { + + try { + transactionState.startWriteTransaction(thread); + } catch (Throwable t) { + DatabaseException e = new DatabaseException("Write transaction could not be started", t); + Logger.defaultLogError(e); + return; + } + task.run(thread); + + } + + }); + + } + + public synchronized void startWriteUpdate(int thread) { + + session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) { + + @Override + public void run(int thread) { + + // Support for DelayedWriteRequest cancels during the + // read-only part of the request. + WriteStateBase delayedState = session.delayedWriteState; + session.delayedWriteState = null; + if (delayedState != null) { + if (session.writeState != null) + throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set."); + + if (delayedState.isExcepted()) { + // There can never be any changes in the cluster stream. + // In this case, don't bother rolling back. + boolean empty = session.clusterStream.reallyFlush(); + assert(empty); + transactionState.stopWriteTransaction(session.clusterStream); + } else { + throw new UnsupportedOperationException("delayedWriteState may only exist when request fails."); + } + session.clientChanges = new ClientChangesImpl(session); + delayedState.finish(); + return; + } + + // The session could have been terminated + if(!session.state.isAlive()) return; + + WriteState writeState = session.writeState; + WriteGraphImpl graph = writeState.getGraph(); + + if(writeState.isExcepted()) { + + if(!(writeState.exception instanceof CancelTransactionException)) + writeState.exception.printStackTrace(); + + transactionState.cancelWriteTransaction(graph); + + } else { + + session.handleUpdatesAndMetadata(graph); + +// long start = System.nanoTime(); + transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null); +// long duration = System.nanoTime() - start; +// System.out.println("commitWriteTransaction " + 1e-9*duration + "s. "); + + } + + session.clientChanges = new ClientChangesImpl(session); + + WriteState state = session.writeState; + state.finish(); + session.writeState = null; + + } + + }); + + } + +// public synchronized void stopWrite(int thread) { +// +// session.clientChanges = new ClientChangesImpl(session); +// +// WriteState state = session.writeState; +// +// System.err.println("D"); +// state.finish(); +// System.err.println("E"); +// +// session.writeState = null; +// +// } + + public synchronized void ceased(int thread) { + + if(State.WRITE == state) { + + setState(thread, State.WRITE_UPDATE, null); + + } else if (State.WRITE_UPDATE == state) { + + setState(thread, State.IDLE, null); + + } else if (State.READ_UPDATE == state) { + + setState(thread, State.IDLE, null); + + } else if (State.READ == state) { + + if (!reads.isEmpty()) { + + final SessionRead read = reads.poll(); + session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) { + + @Override + public void run(int thread) { + read.run(thread); + if(read.notify != null) read.notify.release(); + } + + }); + + } else { + + setState(thread, State.READ_UPDATE, null); + + } + + } else if (State.INIT == state) { + + assert(reads.isEmpty()); + assert(writes.isEmpty()); + setState(thread, State.IDLE, null); + + } else if (State.ERROR == state) { + + setState(thread, State.IDLE, null); + + } else { + + throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")"); + + } + + } + + public synchronized void scheduleRead(final SessionRead task) { + + assert(State.INIT != state); + + if(State.READ == state) { + session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) { + + @Override + public void run(int thread) { + try { + task.run(thread); + } finally { + if (task.notify != null) task.notify.release(); + } + } + + }); + } else if (State.IDLE == state) { + setState(Integer.MIN_VALUE, State.READ, task); + } else { + reads.offer(task); + } + + } + + public int lastUpdateIndex() { + return writes.size(); + } + + public synchronized void scheduleWrite(SessionTask task) { + + scheduleWrite(task, null); + + } + + /** + * @param task the task to schedule + * @param combine true or false to explicitly + * specify whether to combine the write with the previous operation + * or null to use the default logic, i.e. combine when + * this schedule is performed from within update processing (state == + * State.WRITE_UPDATE). + */ + public synchronized void scheduleWrite(SessionTask task, Boolean combine) { + + boolean inUpdate = state == State.WRITE_UPDATE; + + assert(State.INIT != state); + //task.combine = combine != null ? combine : inUpdate; + if(State.IDLE == state) { + setState(Integer.MIN_VALUE, State.WRITE, task); + } else { + if(inUpdate) { + int index = lastUpdateIndex(); + if(index == writes.size()) writes.offer(task); + else writes.add(index, task); + } else { + writes.offer(task); + } + } + + } + + /** + * Unregisters and closes all currently registered ResourceData random + * access value file handles. Any I/O exceptions occurring during closing + * are logged into {@link Logger}. + */ + private void closeRandomAccessValues() { + RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class); + if (ravs == null) + return; + Collection> values = ravs.removeAll(); + for (Pair value : values) { + try { + value.second.binaryFile.close(); + } catch (IOException e) { + Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first , e); + } + } + } + +}