X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FState.java;fp=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FState.java;h=8345c7877b817ab54c90b500e500385bbece519c;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08

diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
new file mode 100644
index 000000000..8345c7877
--- /dev/null
+++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
@@ -0,0 +1,445 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Operation;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.db.service.LifecycleSupport.LifecycleState;
+import org.simantics.db.service.TransactionPolicySupport;
+
+import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
+
+class State {
+    static final int Closed = 1;
+    static final int Closing = 2;
+    static final int WriteTransaction = 4;
+    static final int ReadTransaction = 8;
+    static final int All = 15;
+    private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
+    private SessionImplSocket session = null;
+    private GraphSession graphSession = null;
+    private QueryProcessor queryProvider = null;
+    private Lock lock = new ReentrantLock();
+    private Condition condition = lock.newCondition(); // Tried this with three conditions but it wasn't worth the effort.
+    private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
+    private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
+    private Thread writeOwner = null;
+    private int asyncCount = 1;
+    private TransactionToken transactionToken = null;
+    void setCombine(boolean a) {
+        if (null != transactionToken)
+            transactionToken.setCombine(a);
+    }
+    void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
+        this.session = session;
+        this.graphSession = graphSession;
+        this.queryProvider = queryProvider;
+        resetTransactionPolicy();
+    }
+    void resetTransactionPolicy() {
+        TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
+        this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
+    }
+    int getAsyncCount() {
+        return asyncCount;
+    }
+    int getReadCount() {
+        return readCount;
+    }
+    int getWriteCount() {
+        return writeCount;
+    }
+    boolean isWriteTransaction() {
+        return 0 != (state & WriteTransaction);
+    }
+    boolean isAlive() {
+        return !isClosed() && !isClosing();
+    }
+    boolean isClosed() {
+        return 0 != (state & Closed);
+    }
+    boolean isClosing() {
+        return 0 != (state & Closing);
+    }
+    boolean isWriting() {
+        return 0 != (state & WriteTransaction);
+    }
+    void close() {
+        boolean acquired = false;
+        try {
+            acquired = lock.tryLock(1, TimeUnit.MINUTES);
+        } catch (InterruptedException e1) {
+            // Proceed with the close even if the lock could not be acquired.
+        }
+        try {
+            if (null != transactionToken)
+                transactionToken.close();
+            graphSession.stop();
+        } catch (DatabaseException e) {
+            // Ignored; we are shutting down anyway.
+        } finally {
+            state = Closed;
+            session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
+            if (acquired) {
+                condition.signalAll();
+                lock.unlock();
+            }
+        }
+    }
+    void incAsync() {
+        lock.lock();
+        try {
+            boolean closed = isClosed();
+            boolean closing = isClosing();
+            if (closed || closing)
+                throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
+            ++asyncCount;
+        } finally {
+            lock.unlock();
+        }
+    }
+    void decAsync() {
+        lock.lock();
+        try {
+            if (asyncCount < 1)
+                throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
+            if (0 == --asyncCount) {
+                condition.signal();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+    enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
+    WaitStatus waitClosing(long timeout) {
+        lock.lock();
+        try {
+            if (isClosed())
+                return WaitStatus.IsClosed;
+            else if (isClosing())
+                return WaitStatus.IsClosing;
+            state |= Closing;
+            try {
+                if (timeout < 0) { // Infinite timeout: first poll in 10 ms slices, then block indefinitely.
+                    for (int i = 0; i < 100; i++)
+                        while (asyncCount > 0 || readCount > 0 ||
+                               (null != writeOwner && Thread.currentThread() != writeOwner)) {
+                            condition.signalAll();
+                            condition.awaitNanos(10000000); // 10 ms
+                        }
+                    while (asyncCount > 0 || readCount > 0 ||
+                           (null != writeOwner && Thread.currentThread() != writeOwner)) {
+                        condition.signalAll();
+                        condition.await();
+                    }
+                } else if (timeout > 0) {
+                    boolean interrupted = false;
+                    while (!interrupted &&
+                           (asyncCount > 0 || readCount > 0 ||
+                            (null != writeOwner && Thread.currentThread() != writeOwner)))
+                        interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
+                }
+                transactionToken.closing();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            state &= All ^ Closing;
+            if (readCount == 0 && writeCount == 0 && asyncCount == 0)
+                return WaitStatus.CanBeClosed;
+            else if (null != writeOwner && Thread.currentThread() == writeOwner)
+                return WaitStatus.Deadlock;
+            return WaitStatus.Timeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+    void startReadTransaction(int thread) throws DatabaseException {
+        lock.lock();
+        try {
+            assert (readCount == 0);
+            transactionToken.startReadTransaction(thread);
+            state |= ReadTransaction;
+            session.fireStartReadTransaction();
+            ++readCount;
+        } catch (Throwable e) {
+            throw new DatabaseException("Failed to start read transaction.", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    void stopReadTransaction() throws DatabaseException {
+        lock.lock();
+        try {
+            assert (!queryProvider.hasScheduledUpdates());
+            assert (readCount == 1);
+            session.writeSupport.gc();
+            transactionToken.stopReadTransaction();
+            readCount = 0;
+            state &= All ^ ReadTransaction;
+            condition.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void startWriteTransaction(int thread) throws DatabaseException {
+        lock.lock();
+        try {
+            boolean closed = isClosed();
+            boolean closing = isClosing();
+            int ac = asyncCount;
+            int wc = writeCount;
+            if (closed || (closing && ac < 1) || wc < 0)
+                throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
+            if (writeCount == 0) {
+                transactionToken.startWriteTransaction(thread);
+                state |= WriteTransaction;
+                writeOwner = Thread.currentThread();
+                session.fireStartWriteTransaction();
+            }
+            ++writeCount;
+            assert (writeCount == 1);
+        } catch (Throwable e) {
+            throw new DatabaseException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void writeTransactionEnded() {
+        session.defaultClusterSet = null;
+        session.fireFinishWriteTransaction();
+    }
+
+    void stopWriteTransaction(ClusterStream clusterStream) {
+        if (!isAlive())
+            return;
+        lock.lock();
+        try {
+            if (writeCount < 1)
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+            else if (1 == writeCount) {
+                boolean empty = session.clusterStream.reallyFlush();
+                if (!empty) {
+                    String msg = "The graph has been modified (on the server) without an accept/cancel acknowledgment.\n"
+                        + "This is probably a serious error. An automatic cancel was done as the default recovery.";
+                    Logger.defaultLogInfo(msg);
+                    ClientChangesImpl cs = new ClientChangesImpl(session);
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+                    transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
+                }
+                transactionToken.stopWriteTransaction();
+                state &= All ^ WriteTransaction;
+                writeOwner = null;
+                condition.signal();
+                writeTransactionEnded();
+            }
+            --writeCount;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void cancelWriteTransaction(WriteGraphImpl graph) {
+        lock.lock();
+        try {
+            if (writeCount < 1)
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+            VirtualGraph vg = graph.getProvider();
+            if (null == vg) { // Skip if virtual graph.
+                boolean empty = session.clusterStream.reallyFlush();
+                if (!empty) { // Something to cancel.
+                    // Send all local changes to the server so it can calculate the correct reverse change set.
+                    ClientChangesImpl cs = new ClientChangesImpl(session);
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+                    // This will send the cancel and fetch the reverse change set from the server.
+                    transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
+                    try {
+                        final boolean undo = false;
+                        if (!context.isOk(undo)) // This is a blocking operation.
+                            throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+                        queryProvider.performDirtyUpdates(graph);
+                        queryProvider.performScheduledUpdates(graph);
+                    } catch (DatabaseException e) {
+                        Logger.defaultLogError(e);
+                    }
+                    // This will send and accept the reverse change set.
+                    transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
+                }
+            }
+            session.writeSupport.clearMetadata();
+            if (--writeCount == 0) {
+                transactionToken.stopWriteTransaction();
+                state &= All ^ WriteTransaction;
+                writeOwner = null;
+                condition.signal();
+                writeTransactionEnded();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
+        assert (request != null);
+        graph.commitAccessorChanges(request);
+        boolean writeOnly = request instanceof WriteOnly;
+        lock.lock();
+        try {
+            VirtualGraph vg = graph.getProvider();
+            if (writeCount == 1) {
+                if (vg != null && clusterStream.isDirty())
+                    new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
+                if (null == vg) {
+                    clusterStream.reallyFlush();
+                    // Metadata listeners were already fired in handleUpdatesAndMetadata.
+                } else
+                    clusterStream.clear();
+
+                session.clientChanges = new ClientChangesImpl(session);
+
+                queryProvider.performScheduledUpdates(graph);
+
+                // Process updates as long as pending primitives exist.
+                while (session.dirtyPrimitives) {
+                    session.dirtyPrimitives = false;
+                    queryProvider.performDirtyUpdates(graph);
+                    queryProvider.performScheduledUpdates(graph);
+                }
+
+                if (!writeOnly) // TODO: fix me!
+                    session.fireReactionsToCommit(graph, cs); // Does not throw exceptions!
+
+                writeTransactionEnded();
+                session.writeSupport.gc();
+            } else // Only the last writer can commit.
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+
+            if (null == vg)
+                transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
+            else {
+                // There should not be metadata because this is a transient graph.
+                // This is insurance that this is the case for now.
+                int n = session.writeSupport.clearMetadata();
+                if (n > 0)
+                    if (SessionImplSocket.DEBUG)
+                        System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
+            }
+            transactionToken.stopWriteTransaction();
+
+            state &= All ^ WriteTransaction;
+            writeCount = 0;
+            writeOwner = null;
+            condition.signal();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+        VirtualGraph vg = graph.getProvider();
+        if (null != vg)
+            return;
+        lock.lock();
+        try {
+            clusterStream.reallyFlush();
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+            // See commitAndContinue2 below for the explanation; the same reasoning should apply here too.
+            if (graph.writeSupport instanceof WriteOnlySupport)
+                graph.writeSupport.flushCluster();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+        VirtualGraph vg = graph.getProvider();
+        if (null != vg)
+            return;
+        lock.lock();
+        try {
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+            transactionToken.setCombine(true);
+            // This was added because test Issue3199Test2 failed: a write-only cluster
+            // does not need the cluster when allocating a resource index, so the first
+            // claim after commitAndContinue2 caused the cluster to be loaded from the
+            // server with its resource index off by one.
+            if (graph.writeSupport instanceof WriteOnlySupport)
+                graph.writeSupport.flushCluster();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    Operation getLastOperation() {
+        return transactionToken.getLastOperation();
+    }
+    long getHeadRevisionId() {
+        return transactionToken.getHeadRevisionId();
+    }
+}
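
For context, here is a minimal, self-contained sketch of the concurrency pattern the class above is built on: a single ReentrantLock/Condition pair guarding a bit-flag lifecycle state plus a counted transaction. All names (TransactionStateSketch, startRead, stopRead, waitClosing) are hypothetical and not part of this commit or of the Simantics API; it is an illustration under those assumptions, not the implementation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not part of the commit above.
class TransactionStateSketch {
    static final int Closed = 1, Closing = 2, ReadTransaction = 4;
    private volatile int state = 0;   // volatile so unsynchronized readers may poll it
    private int readCount = 0;        // guarded by lock
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    boolean isAlive() { return (state & (Closed | Closing)) == 0; }

    void startRead() {
        lock.lock();
        try {
            if (!isAlive())
                throw new IllegalStateException("session is closing or closed");
            state |= ReadTransaction;
            ++readCount;
        } finally {
            lock.unlock();
        }
    }

    void stopRead() {
        lock.lock();
        try {
            if (--readCount == 0) {
                state &= ~ReadTransaction;
                condition.signal(); // wake a thread blocked in waitClosing
            }
        } finally {
            lock.unlock();
        }
    }

    // Returns true if all readers drained within the timeout and the state
    // moved to Closed; false if the caller should retry or give up.
    boolean waitClosing(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            state |= Closing; // refuse new transactions from this point on
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (readCount > 0) {
                if (nanos <= 0L) {
                    state &= ~Closing; // timed out; allow a later retry
                    return false;
                }
                nanos = condition.awaitNanos(nanos);
            }
            state = Closed;
            return true;
        } finally {
            lock.unlock();
        }
    }
}

Sharing one Condition among all waiters matches the field comment above ("Tried this with three conditions but it wasn't worth the effort"): the cost is the occasional wakeup of a thread that is not interested in the event, which the while-loop re-check absorbs.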