/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package fi.vtt.simantics.procore.internal;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.simantics.db.ChangeSet;
import org.simantics.db.Disposable;
import org.simantics.db.Operation;
import org.simantics.db.VirtualGraph;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.InternalException;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.impl.query.QueryProcessor;
import org.simantics.db.request.WriteOnly;
import org.simantics.db.request.WriteTraits;
import org.simantics.db.service.LifecycleSupport.LifecycleState;
import org.simantics.db.service.TransactionPolicySupport;

import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;

/**
 * Tracks the transaction and lifecycle state of a session: the closed/closing
 * flags, the read/write transaction flags, and the read, write and async
 * counters.
 *
 * All mutating operations take {@link #lock}; waiters are parked on
 * {@link #condition}. The counters and the state word are volatile so the
 * plain getters can read them without taking the lock.
 */
class State {
    // Bit flags combined into the 'state' field.
    static final int Closed = 1;
    static final int Closing = 2;
    static final int WriteTransaction = 4;
    static final int ReadTransaction = 8;
    // Mask with every flag set; used as "All ^ Flag" to clear a single flag.
    static final int All = 15;

    private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
    private SessionImplSocket session = null;
    private GraphSession graphSession = null;
    private QueryProcessor queryProvider = null;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
    private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
    private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
    private Thread writeOwner = null;
    // NOTE(review): starts at 1, so waitClosing only sees zero after one extra decAsync() - confirm who owns that initial count.
    private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
    private TransactionToken transactionToken = null;

    /**
     * Forwards the combine flag to the current transaction token, if one exists.
     *
     * @param a value passed to {@code TransactionToken.setCombine}
     */
    void setCombine(boolean a) {
        if (null != transactionToken)
            transactionToken.setCombine(a);
    }

    /**
     * Stores the collaborating objects and (re)creates the transaction token.
     *
     * @param session       owning session
     * @param graphSession  graph session stopped by {@link #close()}
     * @param queryProvider query processor used to propagate changes and fire listeners
     * @param clusterTable  not used by this method
     */
    void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider,
            ClusterTable clusterTable) {
        this.session = session;
        this.graphSession = graphSession;
        this.queryProvider = queryProvider;
        resetTransactionPolicy();
    }

    /**
     * Replaces the transaction token with a new one built from the session's
     * {@link TransactionPolicySupport} service; the previous token is handed
     * to the new token's constructor.
     */
    void resetTransactionPolicy() {
        TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
        this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
    }

    int getAsyncCount() {
        return asyncCount;
    }

    int getReadCount() {
        return readCount;
    }

    int getWriteCount() {
        return writeCount;
    }

    boolean isWriteTransaction() {
        return 0 != (state & WriteTransaction);
    }

    /** @return true when the state is neither closed nor closing */
    boolean isAlive() {
        return !isClosed() && !isClosing();
    }

    boolean isClosed() {
        return 0 != (state & Closed);
    }

    boolean isClosing() {
        return 0 != (state & Closing);
    }

    boolean isWriting() {
        return 0 != (state & WriteTransaction);
    }

    /**
     * Closes the transaction token, stops the graph session, marks the state
     * as closed and notifies lifecycle listeners.
     *
     * The lock is acquired on a best-effort basis (one minute limit); the
     * close proceeds even when the lock could not be taken. If the calling
     * thread is interrupted while waiting for the lock, the interrupt flag is
     * restored once the close has finished so that the shutdown work itself
     * is not started with the flag set.
     */
    void close() {
        boolean acquired = false;
        boolean interrupted = false;
        try {
            acquired = lock.tryLock(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after the shutdown work below.
            interrupted = true;
        }
        try {
            if (null != transactionToken)
                transactionToken.close();
            graphSession.stop();
        } catch (DatabaseException e) {
            // Closing is best effort, but do not hide the failure.
            Logger.defaultLogError(e);
        } finally {
            try {
                state = Closed;
                session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
            } finally {
                // Release the lock even if a listener throws.
                if (acquired) {
                    condition.signalAll();
                    lock.unlock();
                }
                if (interrupted)
                    Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Increments the async counter.
     *
     * @throws RuntimeDatabaseException if the state is closed or closing
     */
    void incAsync() {
        lock.lock();
        try {
            boolean closed = isClosed();
            boolean closing = isClosing();
            if (closed || closing)
                throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
            ++asyncCount;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Decrements the async counter and signals a waiter when it reaches zero.
     *
     * @throws RuntimeDatabaseException if the counter is already below one
     */
    void decAsync() {
        lock.lock();
        try {
            if (asyncCount < 1)
                throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
            if (0 == --asyncCount) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    enum WaitStatus {
        IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock
    }

    /**
     * Tells whether outstanding work still prevents closing: pending async
     * operations, an open read transaction, or a write transaction owned by a
     * thread other than the caller. Must be called with {@link #lock} held.
     */
    private boolean isBlockingClose() {
        return asyncCount > 0 || readCount > 0
                || (null != writeOwner && Thread.currentThread() != writeOwner);
    }

    /**
     * Sets the closing flag and waits until outstanding work has drained.
     *
     * @param timeout negative: wait without limit, re-checking every 10 ms;
     *                zero: do not wait; positive: wait at most this many
     *                milliseconds in total
     * @return {@code IsClosed}/{@code IsClosing} if the state was already in
     *         that phase (nothing is changed in that case);
     *         {@code CanBeClosed} if all counters are zero afterwards;
     *         {@code Deadlock} if the calling thread itself owns the write
     *         transaction; {@code Timeout} otherwise
     */
    WaitStatus waitClosing(long timeout) {
        lock.lock();
        try {
            if (isClosed())
                return WaitStatus.IsClosed;
            else if (isClosing())
                return WaitStatus.IsClosing;
            state |= Closing;
            try {
                if (timeout < 0) { // Infinite timeout
                    // Poll: wake other waiters, then sleep 10 ms with the lock released.
                    while (isBlockingClose()) {
                        condition.signalAll();
                        condition.awaitNanos(10000000); // nanos
                    }
                } else if (timeout > 0) {
                    // Track the remaining time so that wakeups do not restart the timeout.
                    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
                    while (remainingNanos > 0 && isBlockingClose())
                        remainingNanos = condition.awaitNanos(remainingNanos);
                }
                transactionToken.closing();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                Logger.defaultLogError(e);
            } catch (Exception e) {
                Logger.defaultLogError(e);
            }
            // The closing flag is only held for the duration of the wait.
            state &= All ^ Closing;
            if (readCount == 0 && writeCount == 0 && asyncCount == 0)
                return WaitStatus.CanBeClosed;
            else if (null != writeOwner && Thread.currentThread() == writeOwner)
                return WaitStatus.Deadlock;
            return WaitStatus.Timeout;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts a read transaction on the transaction token, sets the read flag,
     * notifies the session and increments the read counter.
     *
     * @param thread passed through to the transaction token
     * @throws DatabaseException wrapping any failure raised while starting
     */
    void startReadTransaction(int thread) throws DatabaseException {
        lock.lock();
        try {
            assert (readCount == 0);
            transactionToken.startReadTransaction(thread);
            state |= ReadTransaction;
            session.fireStartReadTransaction();
            ++readCount;
        } catch (Throwable e) {
            throw new DatabaseException("Failed to start read transaction.", e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Ends the read transaction: runs the write support gc, stops the read
     * transaction on the token, resets the read counter and flag, and signals
     * a waiter.
     *
     * @throws DatabaseException if stopping the transaction fails
     */
    void stopReadTransaction() throws DatabaseException {
        lock.lock();
        try {
            assert (!queryProvider.listening.hasScheduledUpdates());
            assert (readCount == 1);
            session.writeSupport.gc();
            transactionToken.stopReadTransaction();
            readCount = 0;
            state &= All ^ ReadTransaction;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Starts a write transaction. When no write is active the token is asked
     * to start one, the write flag is set, the calling thread becomes the
     * write owner and the session is notified; the write counter is then
     * incremented.
     *
     * @param thread passed through to the transaction token
     * @throws DatabaseException if the state is closed, is closing with no
     *         async operations pending, has a negative write count, or if
     *         starting fails for any other reason (the cause is wrapped)
     */
    void startWriteTransaction(int thread) throws DatabaseException {
        lock.lock();
        try {
            boolean closed = isClosed();
            boolean closing = isClosing();
            int ac = asyncCount;
            int wc = writeCount;
            if (closed || (closing && ac < 1) || wc < 0)
                throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing
                        + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
            if (writeCount == 0) {
                transactionToken.startWriteTransaction(thread);
                state |= WriteTransaction;
                writeOwner = Thread.currentThread();
                session.fireStartWriteTransaction();
            }
            ++writeCount;
            assert (writeCount == 1);
        } catch (Throwable e) {
            throw new DatabaseException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Clears the session's default cluster set and notifies the session that
     * the write transaction has finished.
     */
    void writeTransactionEnded() {
        session.defaultClusterSet = null;
        session.fireFinishWriteTransaction();
    }

    /**
     * Ends a write transaction without an explicit commit or cancel. Returns
     * immediately when the state is closed or closing. For the last writer,
     * unflushed changes trigger an automatic cancel before the transaction is
     * stopped; the write counter is then decremented.
     *
     * @param clusterStream stream handed to the token's cancel operation
     * @throws IllegalStateException if no write transaction is active or any
     *         step fails; on failure this state is also closed
     */
    void stopWriteTransaction(ClusterStream clusterStream) {
        if (!isAlive())
            return;
        lock.lock();
        try {
            if (writeCount < 1)
                throw new IllegalStateException(session + ": writeCount=" + writeCount);
            else if (1 == writeCount) {
                boolean empty = session.clusterStream.reallyFlush();
                if (!empty) {
                    String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
                            + "This is probably a serious error. Automatic cancel done as default recovery.";
                    Logger.defaultLogInfo(msg);
                    ClientChangesImpl cs = new ClientChangesImpl(session);
                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
                    transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
                }
                transactionToken.stopWriteTransaction();
                state &= All ^ WriteTransaction;
                writeOwner = null;
                condition.signal();
                writeTransactionEnded();
            }
            --writeCount;
        } catch (Throwable e) {
            Logger.defaultLogError(e);
            this.close(); // Everything is lost anyway.
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Cancels the current write transaction. For a non-virtual graph with
     * unflushed changes, the cancel is sent to the server, the query cache is
     * updated and listeners fired, and the reverse change set is accepted.
     * Metadata is cleared, and when the write counter drops to zero the write
     * transaction is stopped.
     *
     * @param graph the write graph being cancelled
     * @throws IllegalStateException if no write transaction is active or any
     *         step fails; on failure this state is also closed
     */
    void cancelWriteTransaction(WriteGraphImpl graph) {
        lock.lock();
        try {
            if (writeCount < 1)
                throw new IllegalStateException(session + ": writeCount=" + writeCount);
            VirtualGraph vg = graph.getProvider();
            if (null == vg) { // Skip if virtual graph.
                boolean empty = session.clusterStream.reallyFlush();
                if (!empty) { // Something to cancel.
                    // Send all local changes to server so it can calculate correct reverse change set.
                    ClientChangesImpl cs = new ClientChangesImpl(session);
                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
                    // This will send cancel and fetch reverse change set from server.
                    transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
                    try {
                        final boolean undo = false;
                        if (!context.isOk(undo)) // this is a blocking operation
                            throw new InternalException("Cancel failed. This should never happen.");
                        queryProvider.propagateChangesInQueryCache(graph);
                        queryProvider.listening.fireListeners(graph);
                    } catch (DatabaseException e) {
                        Logger.defaultLogError(e);
                    }
                    // This will send and accept the reverse change set.
                    transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
                } else {
                    queryProvider.propagateChangesInQueryCache(graph);
                    queryProvider.listening.fireListeners(graph);
                }
            }
            session.writeSupport.clearMetadata();
            if (--writeCount == 0) {
                transactionToken.stopWriteTransaction();
                state &= All ^ WriteTransaction;
                writeOwner = null;
                condition.signal();
                writeTransactionEnded();
            }
        } catch (Throwable e) {
            Logger.defaultLogError(e);
            this.close(); // Everything is lost anyway.
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Commits the current write transaction. Only the last writer (write
     * count of exactly one) may commit. Changes are flushed, the query cache
     * is updated and listeners fired until no dirty primitives remain, commit
     * reactions are fired for requests that are not {@link WriteOnly}, and
     * the transaction is committed on the token (non-virtual graphs only) and
     * stopped. The change set is disposed in all cases.
     *
     * @param graph         the write graph being committed
     * @param clusterStream stream flushed and handed to the token
     * @param cs            change set passed to commit reactions; always disposed
     * @param request       the write request; must not be null
     * @param op            operation handed to the token's commit
     * @throws IllegalStateException if the write count is not one or any step
     *         fails; on failure this state is also closed
     */
    void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs,
            WriteTraits request, Operation op) {
        assert (request != null);
        graph.commitAccessorChanges(request);
        boolean writeOnly = request instanceof WriteOnly;
        lock.lock();
        try {
            VirtualGraph vg = graph.getProvider();
            // Only the last writer can commit.
            if (writeCount != 1)
                throw new IllegalStateException(session + ": writeCount=" + writeCount);

            if (vg != null && clusterStream.isDirty())
                Logger.defaultLogError(new Exception(
                        "Internal error: virtual transaction committed changes into core (" + request + ")"));
            // This is needed even when the write targets a virtual graph -
            // deny can always remove a persistent statement.
            clusterStream.reallyFlush();
            session.clientChanges = new ClientChangesImpl(session);

            queryProvider.propagateChangesInQueryCache(graph);
            queryProvider.listening.fireListeners(graph);
            // Process updates as long as pending primitives exist
            while (session.dirtyPrimitives) {
                session.dirtyPrimitives = false;
                queryProvider.propagateChangesInQueryCache(graph);
                queryProvider.listening.fireListeners(graph);
            }
            if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
                session.fireReactionsToCommit(graph, cs); // Does not throw exception!
            writeTransactionEnded();
            session.writeSupport.gc();

            if (null == vg)
                transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
            else {
                // There should not be meta data because this is transient graph.
                // This is an insurance that this is the case for now.
                int n = session.writeSupport.clearMetadata();
                if (n > 0 && SessionImplSocket.DEBUG)
                    System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
            }
            transactionToken.stopWriteTransaction();

            state &= All ^ WriteTransaction;
            writeCount = 0;
            writeOwner = null;
            condition.signal();
        } catch (Throwable e) {
            Logger.defaultLogError(e);
            this.close(); // Everything is lost anyway.
            throw new IllegalStateException(e);
        } finally {
            Disposable.safeDispose(cs);
            lock.unlock();
        }
    }

    /**
     * Flushes and commits the pending changes without ending the write
     * transaction. Does nothing for a virtual graph.
     *
     * @param graph         the write graph whose changes are committed
     * @param clusterStream stream flushed and handed to the token
     * @param request       the write request handed to the token
     * @throws IllegalStateException if any step fails; this state is then closed
     */
    void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
        VirtualGraph vg = graph.getProvider();
        if (null != vg)
            return;
        lock.lock();
        try {
            clusterStream.reallyFlush();
            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
            // See commitAndContinue2 for explanation, the same reasoning should apply here too.
            if (graph.writeSupport instanceof WriteOnlySupport)
                graph.writeSupport.flushCluster();
        } catch (Throwable e) {
            Logger.defaultLogError(e);
            this.close(); // Everything is lost anyway.
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Commits the pending changes without flushing the cluster stream first
     * and without ending the write transaction, then enables combining on the
     * token. Does nothing for a virtual graph.
     *
     * @param graph         the write graph whose changes are committed
     * @param clusterStream stream handed to the token
     * @param request       the write request handed to the token
     * @throws IllegalStateException if any step fails; this state is then closed
     */
    void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
        VirtualGraph vg = graph.getProvider();
        if (null != vg)
            return;
        lock.lock();
        try {
            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
            transactionToken.setCombine(true);
            // This was added because test Issue3199Test2 failed because
            // write only cluster does not need cluster when allocating resource index
            // and the first claim after commitAndContinue2 caused cluster to be loaded
            // from server and of course it's resource index was off by one.
            if (graph.writeSupport instanceof WriteOnlySupport)
                graph.writeSupport.flushCluster();
        } catch (Throwable e) {
            Logger.defaultLogError(e);
            this.close(); // Everything is lost anyway.
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    /** @return the last operation reported by the transaction token */
    Operation getLastOperation() {
        return transactionToken.getLastOperation();
    }

    /** @return the head revision id reported by the transaction token */
    long getHeadRevisionId() {
        return transactionToken.getHeadRevisionId();
    }
}