package fi.vtt.simantics.procore.internal;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;

import org.simantics.db.Resource;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.CancelTransactionException;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.impl.internal.RandomAccessValueSupport;
import org.simantics.db.impl.internal.ResourceData;
import org.simantics.db.impl.query.QueryProcessor.SessionRead;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.request.WriteTraits;
import org.simantics.utils.datastructures.Pair;

/**
 * Drives the read/write transaction life cycle of a session through a small
 * state machine ({@link State}) and two pending-request queues.
 *
 * <p>
 * Requests arriving while the manager is idle start a transaction
 * immediately; requests arriving in any other state are queued (reads may
 * also join an already running read transaction). When the manager returns to
 * {@link State#IDLE}, queued writes are served before queued reads.
 *
 * <p>
 * State transitions and queue manipulation are performed in
 * {@code synchronized} methods of this object.
 */
public class SessionRequestManager {

    private static final boolean DEBUG = false;

    /**
     * Life-cycle states of the manager. See {@link #legal(State, State)} for
     * the transitions that are asserted to be valid.
     */
    enum State {
        INIT, IDLE, READ, WRITE, WRITE_UPDATE, READ_UPDATE, ERROR
    }

    final SessionImplSocket session;
    final fi.vtt.simantics.procore.internal.State transactionState;

    /** Write requests waiting for the manager to become idle. */
    final LinkedList<SessionTask> writes = new LinkedList<SessionTask>();
    /** Read requests waiting for the manager to become idle. */
    final LinkedList<SessionRead> reads = new LinkedList<SessionRead>();

    State state = State.INIT;

    /**
     * Tells whether moving from {@code current} to {@code target} is an
     * allowed transition.
     *
     * <ul>
     * <li>IDLE may move to anything except INIT, WRITE_UPDATE and READ_UPDATE</li>
     * <li>WRITE may only move to WRITE_UPDATE</li>
     * <li>READ may only move to READ_UPDATE</li>
     * <li>every other state (ERROR, INIT, WRITE_UPDATE, READ_UPDATE) may only move to IDLE</li>
     * </ul>
     *
     * @param current the state being left
     * @param target the state being entered
     * @return {@code true} if the transition is allowed
     */
    private boolean legal(State current, State target) {
        if (State.IDLE == current) {
            return State.INIT != target
                    && State.WRITE_UPDATE != target
                    && State.READ_UPDATE != target;
        } else if (State.WRITE == current) {
            return State.WRITE_UPDATE == target;
        } else if (State.READ == current) {
            return State.READ_UPDATE == target;
        } else {
            // ERROR and all remaining states can only return to IDLE.
            return State.IDLE == target;
        }
    }

    /**
     * Moves the manager into {@code state} and schedules the work that
     * belongs to the transition. A request to enter the current state is a
     * no-op.
     *
     * <p>
     * On entering {@link State#IDLE} the random access value handles are
     * closed and the next queued request (writes first, then reads) is
     * started by a nested call to this method.
     *
     * @param thread the thread index passed on to the scheduled tasks
     * @param state the state to enter
     * @param task the request to start when entering READ or WRITE; not used
     *        for other target states
     */
    private synchronized void setState(int thread, State state, SessionTask task) {

        if (DEBUG)
            System.err.println("Session state: " + this.state + " => " + state + " - " + task);

        if (this.state == state)
            return;

        assert (legal(this.state, state));

        if (State.READ == state) {
            assert (State.IDLE == this.state);
        }

        State oldState = this.state;
        this.state = state;

        // Leaving IDLE: balanced by the decAsync() performed when IDLE is entered below.
        if (State.IDLE == oldState && State.IDLE != state) {
            session.incAsync();
        }

        /*
         * We are now executing in new state.
         * The methods below SHALL NOT start any task
         * immediately. All tasks shall be scheduled.
         */
        if (State.READ == state)
            startRead(thread, (SessionRead) task);
        else if (State.WRITE == state)
            startWrite(thread, task);
        else if (State.READ == oldState)
            startReadUpdate(thread);
        else if (State.WRITE == oldState)
            startWriteUpdate(thread);

        // Check the need for new transaction
        if (state == State.IDLE) {

            closeRandomAccessValues();

            if (!writes.isEmpty()) {
                setState(thread, State.WRITE, writes.poll());
            } else if (!reads.isEmpty()) {
                setState(thread, State.READ, reads.poll());
            }

            session.decAsync();

        }

    }

    /**
     * Creates a manager in the {@link State#INIT} state.
     *
     * @param session the session whose requests are managed
     * @param transactionState the object used to start, stop, commit and
     *        cancel transactions
     */
    public SessionRequestManager(SessionImplSocket session, fi.vtt.simantics.procore.internal.State transactionState) {
        this.session = session;
        this.transactionState = transactionState;
    }

    /**
     * Schedules a task that starts a read transaction and then runs
     * {@code task}. Any failure is logged, stored into {@code task.throwable}
     * when that is set, and moves the manager into {@link State#ERROR}. The
     * {@code task.notify} handle, when set, is always released.
     *
     * @param thread the thread index used for scheduling
     * @param task the read request to run inside the transaction
     */
    public synchronized void startRead(int thread, final SessionRead task) {

        session.queryProvider2.scheduleAlways(thread, new SessionTask(task.object, task.thread, task.syncCaller) {

            @Override
            public void run(int thread) {
                try {
                    transactionState.startReadTransaction(thread);
                    task.run(thread);
                } catch (Throwable t) {
                    Logger.defaultLogError(new DatabaseException("Read transaction could not be started", t));
                    if (task.throwable != null)
                        task.throwable.set(t);
                    // NOTE(review): written from the task's run method without holding this
                    // manager's monitor - confirm this is intended.
                    state = State.ERROR;
                } finally {
                    if (task.notify != null)
                        task.notify.release();
                }
            }

        });

    }

    /**
     * Schedules a task that fires the session's finish-read-transaction
     * notification and stops the read transaction. A failure to stop the
     * transaction is logged.
     *
     * @param thread the thread index used for scheduling
     */
    public synchronized void startReadUpdate(int thread) {

        session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {

            @Override
            public void run(int thread) {

                session.fireFinishReadTransaction();

                try {
                    transactionState.stopReadTransaction();
                } catch (DatabaseException e) {
                    // Report through the logger like the rest of this class.
                    Logger.defaultLogError(e);
                }

            }

        });

    }

//    public synchronized void stopRead(int thread) {
//
//        try {
//            transactionState.stopReadTransaction();
//        } catch (DatabaseException e) {
//            e.printStackTrace();
//        }
//
//    }

    /**
     * Schedules a task that starts a write transaction and then runs
     * {@code task}. If the transaction cannot be started the failure is
     * logged and {@code task} is not run.
     *
     * @param thread the thread index used for scheduling
     * @param task the write request; its {@code object} is cast to
     *        {@link WriteTraits}
     */
    public synchronized void startWrite(int thread, final SessionTask task) {

        session.queryProvider2.scheduleAlways(thread, new SessionTask((WriteTraits) task.object, task.thread) {

            @Override
            public void run(int thread) {

                try {
                    transactionState.startWriteTransaction(thread);
                } catch (Throwable t) {
                    DatabaseException e = new DatabaseException("Write transaction could not be started", t);
                    Logger.defaultLogError(e);
                    return;
                }
                task.run(thread);

            }

        });

    }

    /**
     * Schedules the task that concludes the current write transaction: it
     * either finishes a failed delayed write, cancels the transaction when
     * the write state carries an exception, or commits it. Afterwards the
     * session's client changes are reset and the write state is finished and
     * cleared.
     *
     * @param thread the thread index used for scheduling
     */
    public synchronized void startWriteUpdate(int thread) {

        session.queryProvider2.scheduleAlways(thread, new SessionTask(null, thread) {

            @Override
            public void run(int thread) {

                // Support for DelayedWriteRequest cancels during the
                // read-only part of the request.
                WriteStateBase delayedState = session.delayedWriteState;
                session.delayedWriteState = null;
                if (delayedState != null) {
                    if (session.writeState != null)
                        throw new AssertionError("Both session delayedWriteState and writeState are set, only one can be set.");

                    if (delayedState.isExcepted()) {
                        // There can never be any changes in the cluster stream.
                        // In this case, don't bother rolling back.
                        boolean empty = session.clusterStream.reallyFlush();
                        assert (empty);
                        transactionState.stopWriteTransaction(session.clusterStream);
                    } else {
                        throw new UnsupportedOperationException("delayedWriteState may only exist when request fails.");
                    }
                    session.clientChanges = new ClientChangesImpl(session);
                    delayedState.finish();
                    return;
                }

                // The session could have been terminated
                if (!session.state.isAlive())
                    return;

                WriteState writeState = session.writeState;
                WriteGraphImpl graph = writeState.getGraph();

                if (writeState.isExcepted()) {

                    // Cancellations are expected; only other failures are printed.
                    if (!(writeState.exception instanceof CancelTransactionException))
                        writeState.exception.printStackTrace();

                    transactionState.cancelWriteTransaction(graph);

                } else {

                    session.handleUpdatesAndMetadata(graph);

//                    long start = System.nanoTime();
                    transactionState.commitWriteTransaction(writeState.getGraph(), session.clusterStream, session.clientChanges, writeState.getRequest(), null);
//                    long duration = System.nanoTime() - start;
//                    System.out.println("commitWriteTransaction " + 1e-9*duration + "s. ");

                }

                session.clientChanges = new ClientChangesImpl(session);

                // Re-read from the session, exactly as before, rather than reusing the local above.
                WriteState finished = session.writeState;
                finished.finish();

                session.writeState = null;

            }

        });

    }

//    public synchronized void stopWrite(int thread) {
//
//        session.clientChanges = new ClientChangesImpl(session);
//
//        WriteState state = session.writeState;
//
//        System.err.println("D");
//        state.finish();
//        System.err.println("E");
//
//        session.writeState = null;
//
//    }

    /**
     * Advances the state machine after the work of the current state has
     * ceased.
     *
     * <ul>
     * <li>WRITE moves to WRITE_UPDATE</li>
     * <li>WRITE_UPDATE, READ_UPDATE, INIT and ERROR move to IDLE</li>
     * <li>READ either schedules the next queued read into the running read
     * transaction or, when no reads are queued, moves to READ_UPDATE</li>
     * </ul>
     *
     * @param thread the thread index used for scheduling
     * @throws IllegalStateException if the manager is in a state not listed
     *         above (i.e. IDLE)
     */
    public synchronized void ceased(int thread) {

        if (State.WRITE == state) {

            setState(thread, State.WRITE_UPDATE, null);

        } else if (State.WRITE_UPDATE == state) {

            setState(thread, State.IDLE, null);

        } else if (State.READ_UPDATE == state) {

            setState(thread, State.IDLE, null);

        } else if (State.READ == state) {

            if (!reads.isEmpty()) {

                final SessionRead read = reads.poll();
                session.queryProvider2.scheduleAlways(thread, new SessionTask(read.object, read.thread, read.syncCaller) {

                    @Override
                    public void run(int thread) {
                        // Release in finally, as scheduleRead and startRead do, so that a
                        // failing read cannot skip the release of its notify handle.
                        try {
                            read.run(thread);
                        } finally {
                            if (read.notify != null)
                                read.notify.release();
                        }
                    }

                });

            } else {

                setState(thread, State.READ_UPDATE, null);

            }

        } else if (State.INIT == state) {

            assert (reads.isEmpty());
            assert (writes.isEmpty());
            setState(thread, State.IDLE, null);

        } else if (State.ERROR == state) {

            setState(thread, State.IDLE, null);

        } else {

            throw new IllegalStateException("State in ceased should be WRITE or READ or INIT (was " + state + ")");

        }

    }

    /**
     * Accepts a read request. While a read transaction is running the request
     * is scheduled directly into it; when idle a new read transaction is
     * started for it; otherwise it is queued.
     *
     * @param task the read request
     */
    public synchronized void scheduleRead(final SessionRead task) {

        assert (State.INIT != state);

        if (State.READ == state) {
            session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(task.object, task.thread, task.syncCaller) {

                @Override
                public void run(int thread) {
                    try {
                        task.run(thread);
                    } finally {
                        if (task.notify != null)
                            task.notify.release();
                    }
                }

            });
        } else if (State.IDLE == state) {
            setState(Integer.MIN_VALUE, State.READ, task);
        } else {
            reads.offer(task);
        }

    }

    /**
     * @return the position at which a write scheduled during update
     *         processing is inserted; currently the size of the write queue
     */
    public int lastUpdateIndex() {
        return writes.size();
    }

    /**
     * Accepts a write request using the default combine logic. Equivalent to
     * {@code scheduleWrite(task, null)}.
     *
     * @param task the task to schedule
     */
    public synchronized void scheduleWrite(SessionTask task) {
        scheduleWrite(task, null);
    }

    /**
     * Accepts a write request. When idle a write transaction is started for
     * it; otherwise it is queued.
     *
     * @param task the task to schedule
     * @param combine {@code true} or {@code false} to explicitly specify
     *        whether to combine the write with the previous operation or
     *        {@code null} to use the default logic, i.e. combine when this
     *        schedule is performed from within update processing (state ==
     *        State.WRITE_UPDATE). Currently not read by this method: the
     *        assignment that used it is commented out.
     */
    public synchronized void scheduleWrite(SessionTask task, Boolean combine) {

        boolean inUpdate = state == State.WRITE_UPDATE;

        assert (State.INIT != state);

        //task.combine = combine != null ? combine : inUpdate;
        if (State.IDLE == state) {
            setState(Integer.MIN_VALUE, State.WRITE, task);
        } else {
            if (inUpdate) {
                int index = lastUpdateIndex();
                if (index == writes.size())
                    writes.offer(task);
                else
                    writes.add(index, task);
            } else {
                writes.offer(task);
            }
        }

    }

    /**
     * Unregisters and closes all currently registered ResourceData random
     * access value file handles. Any I/O exceptions occurring during closing
     * are logged into {@link Logger}.
     */
    private void closeRandomAccessValues() {
        RandomAccessValueSupport ravs = session.peekService(RandomAccessValueSupport.class);
        if (ravs == null)
            return;
        Collection<Pair<Resource, ResourceData>> values = ravs.removeAll();
        for (Pair<Resource, ResourceData> value : values) {
            try {
                value.second.binaryFile.close();
            } catch (IOException e) {
                Logger.defaultLogError("I/O exception while closing random access value file " + value.second.binaryFile.file() + " for resource " + value.first, e);
            }
        }
    }

}