X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FState.java;fp=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FState.java;h=5adc593001cd19ae305c5fb3e00dfe181b0003e0;hp=8345c7877b817ab54c90b500e500385bbece519c;hb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;hpb=24e2b34260f219f0d1644ca7a138894980e25b14 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java index 8345c7877..5adc59300 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java @@ -1,445 +1,445 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package fi.vtt.simantics.procore.internal; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.simantics.db.ChangeSet; -import org.simantics.db.Operation; -import org.simantics.db.VirtualGraph; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.InternalException; -import org.simantics.db.exception.RuntimeDatabaseException; -import org.simantics.db.impl.graph.WriteGraphImpl; -import org.simantics.db.impl.query.QueryProcessor; -import org.simantics.db.request.WriteOnly; -import org.simantics.db.request.WriteTraits; -import org.simantics.db.service.LifecycleSupport.LifecycleState; -import org.simantics.db.service.TransactionPolicySupport; - -import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport; - -class State { - static final int Closed = 1; - static final int Closing = 2; - static final int WriteTransaction = 4; - static final int ReadTransaction = 8; - static final int All = 15; - private volatile int state = 0; // Must be volatile so we don't have to synchronize getState. - private SessionImplSocket session = null; - private GraphSession graphSession = null; - private QueryProcessor queryProvider = null; - private Lock lock = new ReentrantLock(); - private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort. - private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount. - private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount. - private Thread writeOwner = null; - private int asyncCount = 1; - private TransactionToken transactionToken = null; - void setCombine(boolean a) { - if (null != transactionToken) - transactionToken.setCombine(a); - } - void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) { - this.session = session; - this.graphSession = graphSession; - this.queryProvider = queryProvider; - resetTransactionPolicy(); - } - void resetTransactionPolicy() { - TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class); - this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken); - } - int getAsyncCount() { - return asyncCount; - } - int getReadCount() { - return readCount; - } - int getWriteCount() { - return writeCount; - } - boolean isWriteTransaction() { - return 0 != (state & WriteTransaction); - } - boolean isAlive() { - return !isClosed() && !isClosing(); - } - boolean isClosed() { - return 0 != (state & Closed); - } - boolean isClosing() { - return 0 != (state & Closing); - } - boolean isWriting() { - return 0 != (state & WriteTransaction); - } - void close() { - boolean acquired = false; - try { - acquired = lock.tryLock(1, TimeUnit.MINUTES); - } catch (InterruptedException e1) { - } - try { - if (null != transactionToken) - transactionToken.close(); - graphSession.stop(); - } catch (DatabaseException e) { - } finally { - state = Closed; - session.lifecycleSupport.fireListeners(LifecycleState.CLOSED); - if (acquired) { - condition.signalAll(); - lock.unlock(); - } - } - } - void incAsync() - { - lock.lock(); - try { - boolean closed = isClosed(); - boolean closing = isClosing(); - if (closed || closing) - throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing); - ++asyncCount; - } finally { - lock.unlock(); - } - } - void decAsync() - { - lock.lock(); - try { - if (asyncCount < 1) - throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount); - if (0 == --asyncCount) { - condition.signal(); - } - } finally { - lock.unlock(); - } - } - enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock } - WaitStatus waitClosing(long timeout) { - lock.lock(); - try { - if (isClosed()) - return WaitStatus.IsClosed; - else if (isClosing()) - return WaitStatus.IsClosing; - state |= Closing; - try { - if (timeout < 0) { // Infinite timeout - for (int i=0; i<100; i++) - while (asyncCount > 0 || readCount > 0 || - (null != writeOwner && Thread.currentThread() != writeOwner)) { - condition.signalAll(); - condition.awaitNanos(10000000); // nanos - } - while (asyncCount > 0 || readCount > 0 || - (null != writeOwner && Thread.currentThread() != writeOwner)) { - condition.signalAll(); - condition.await(); - } - } else if (timeout > 0) { - boolean interrupted = false; - while (!interrupted && - (asyncCount > 0 || readCount > 0 || - (null != writeOwner && Thread.currentThread() != writeOwner))) - interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS); - } - transactionToken.closing(); - } catch (Exception e) { - e.printStackTrace(); - } - state &= All ^ Closing; - if (readCount == 0 && writeCount == 0 && asyncCount == 0) - return WaitStatus.CanBeClosed; - else if (null != writeOwner && Thread.currentThread() == writeOwner) - return WaitStatus.Deadlock; - return WaitStatus.Timeout; - } finally { - lock.unlock(); - } - } - void startReadTransaction(int thread) throws DatabaseException { - lock.lock(); - try { - assert(readCount == 0); - transactionToken.startReadTransaction(thread); - state |= ReadTransaction; - session.fireStartReadTransaction(); - ++readCount; - } catch (Throwable e) { - throw new DatabaseException("Failed to start read transaction.", e); - } finally { - lock.unlock(); - } - } - void stopReadTransaction() throws DatabaseException { - lock.lock(); - try { - assert (!queryProvider.hasScheduledUpdates()); - assert (readCount == 1); - session.writeSupport.gc(); - transactionToken.stopReadTransaction(); - readCount = 0; - state &= All ^ ReadTransaction; - condition.signal(); - } finally { - lock.unlock(); - } - } - - void startWriteTransaction(int thread) - throws DatabaseException { - lock.lock(); - try { -// System.out.println("stawt"); - boolean closed = isClosed(); - boolean closing = isClosing(); - int ac = asyncCount; - int wc = writeCount; - if (closed || (closing && ac < 1) || wc < 0) - throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount); - if (writeCount == 0) { - transactionToken.startWriteTransaction(thread); - state |= WriteTransaction; - writeOwner = Thread.currentThread(); - session.fireStartWriteTransaction(); - } - ++writeCount; - assert(writeCount == 1); - } catch (Throwable e) { - throw new DatabaseException(e); - } finally { - lock.unlock(); - } - } - - void writeTransactionEnded() { - session.defaultClusterSet = null; - session.fireFinishWriteTransaction(); - } - - void stopWriteTransaction(ClusterStream clusterStream) { - if(!isAlive()) return; - lock.lock(); - try { -// System.out.println("stowt"); - if (writeCount < 1) - throw new IllegalStateException(session + ": writeCount=" + writeCount); - else if (1 == writeCount) { - boolean empty = session.clusterStream.reallyFlush(); - if (!empty) { - String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n" - + "This is probably a serious error. Automatic cancel done as default recovery."; - Logger.defaultLogInfo(msg); - ClientChangesImpl cs = new ClientChangesImpl(session); - SynchronizeContext context = new SynchronizeContext(session, cs, 1); - transactionToken.cancelBegin(session.writeSupport, context, clusterStream); - } - transactionToken.stopWriteTransaction(); - state &= All ^ WriteTransaction; - writeOwner = null; - condition.signal(); - writeTransactionEnded(); - } - --writeCount; - } catch (Throwable e) { - e.printStackTrace(); - this.close(); // Everything is lost anyway. - throw new IllegalStateException(e); - } finally { - lock.unlock(); - } - } - - void cancelWriteTransaction(WriteGraphImpl graph) { - lock.lock(); - try { - if (writeCount < 1) - throw new IllegalStateException(session + ": writeCount=" + writeCount); - VirtualGraph vg = graph.getProvider(); - if (null == vg) { // Skip if virtual graph. - boolean empty = session.clusterStream.reallyFlush(); - if (!empty) { // Something to cancel. - // Send all local changes to server so it can calculate correct reverse change set. - ClientChangesImpl cs = new ClientChangesImpl(session); - SynchronizeContext context = new SynchronizeContext(session, cs, 1); - // This will send cancel and fetch reverse change set from server. - transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream); - try { - final boolean undo = false; - if (!context.isOk(undo)) // this is a blocking operation - throw new InternalException("Cancel failed. This should never happen. Contact application support."); -// System.out.println("session cs: " + session.clientChanges); -// System.out.println("reverse cs: " + cs); - queryProvider.performDirtyUpdates(graph); - queryProvider.performScheduledUpdates(graph); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - // This will send and accept the reverse change set. - transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream); - } - } - session.writeSupport.clearMetadata(); - if (--writeCount == 0) { - transactionToken.stopWriteTransaction(); - state &= All ^ WriteTransaction; - writeOwner = null; - condition.signal(); - writeTransactionEnded(); - } - } catch (Throwable e) { - e.printStackTrace(); - this.close(); // Everything is lost anyway. - throw new IllegalStateException(e); - } finally { - lock.unlock(); - } - } - - void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) { - assert(request != null); - graph.commitAccessorChanges(request); - boolean writeOnly = request instanceof WriteOnly; - lock.lock(); - try { - VirtualGraph vg = graph.getProvider(); - if (writeCount == 1) { - - if (vg != null && clusterStream.isDirty()) - new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace(); - -// long start = System.nanoTime(); - if (null == vg) { - clusterStream.reallyFlush(); - // This was fired just before in handleUpdatesAndMetadata -// if (!writeOnly) -// session.fireMetadataListeners(graph, cs); - } else - clusterStream.clear(); - -// long duration = System.nanoTime() - start; -// System.out.println("reallyFlush " + 1e-9*duration + "s. "); - - session.clientChanges = new ClientChangesImpl(session); - - -// start = System.nanoTime(); - queryProvider.performScheduledUpdates(graph); -// duration = System.nanoTime() - start; -// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. "); - - // Process updates as long as pending primitives exist - while (session.dirtyPrimitives) { - session.dirtyPrimitives = false; - queryProvider.performDirtyUpdates(graph); - queryProvider.performScheduledUpdates(graph); - } - - if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please! - session.fireReactionsToCommit(graph, cs); // Does not throw exception! - - writeTransactionEnded(); - session.writeSupport.gc(); - } else // Only the last writer can commit. - throw new IllegalStateException(session + ": writeCount=" + writeCount); -// long start = System.nanoTime(); -// System.err.println("commit"); - -// start = System.nanoTime(); - if (null == vg) - transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op); - else { - // There should not be meta data because this is transient graph. - // This is an insurance that this is the case for now. - int n = session.writeSupport.clearMetadata(); - if (n > 0) - if (SessionImplSocket.DEBUG) - System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n); - } - transactionToken.stopWriteTransaction(); -// long duration = System.nanoTime() - start; -// System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. "); - - -// long duration = System.nanoTime() - start; -// System.err.println("commit2 " + 1e-9*duration); - state &= All ^ WriteTransaction; - writeCount = 0; - writeOwner = null; - condition.signal(); - } catch (Throwable e) { - Logger.defaultLogError(e); - this.close(); // Everything is lost anyway. - throw new IllegalStateException(e); - } finally { - lock.unlock(); - } - } - - void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) { - VirtualGraph vg = graph.getProvider(); - if (null != vg) - return; - lock.lock(); - try { - clusterStream.reallyFlush(); - transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null); - // See below for explanation, the same reasoning should apply here too. - if (graph.writeSupport instanceof WriteOnlySupport) - graph.writeSupport.flushCluster(); - } catch (Throwable e) { - Logger.defaultLogError(e); - this.close(); // Everything is lost anyway. - throw new IllegalStateException(e); - } finally { - lock.unlock(); - } - } - void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) { - VirtualGraph vg = graph.getProvider(); - if (null != vg) - return; - lock.lock(); - try { - transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null); - transactionToken.setCombine(true); - // This was added because test Issue3199Test2 failed because - // write only cluster does not need cluster when allocating resource index - // and the first claim after commitAndContinue2 caused cluster to be loaded - // from server and of course it's resource index was off by one. - if (graph.writeSupport instanceof WriteOnlySupport) - graph.writeSupport.flushCluster(); - } catch (Throwable e) { - Logger.defaultLogError(e); - this.close(); // Everything is lost anyway. - throw new IllegalStateException(e); - } finally { - lock.unlock(); - } - } - Operation getLastOperation() { - return transactionToken.getLastOperation(); - } - long getHeadRevisionId() { - return transactionToken.getHeadRevisionId(); - } -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package fi.vtt.simantics.procore.internal; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.simantics.db.ChangeSet; +import org.simantics.db.Operation; +import org.simantics.db.VirtualGraph; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.InternalException; +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.impl.query.QueryProcessor; +import org.simantics.db.request.WriteOnly; +import org.simantics.db.request.WriteTraits; +import org.simantics.db.service.LifecycleSupport.LifecycleState; +import org.simantics.db.service.TransactionPolicySupport; + +import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport; + +class State { + static final int Closed = 1; + static final int Closing = 2; + static final int WriteTransaction = 4; + static final int ReadTransaction = 8; + static final int All = 15; + private volatile int state = 0; // Must be volatile so we don't have to synchronize getState. + private SessionImplSocket session = null; + private GraphSession graphSession = null; + private QueryProcessor queryProvider = null; + private Lock lock = new ReentrantLock(); + private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort. + private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount. + private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount. + private Thread writeOwner = null; + private int asyncCount = 1; + private TransactionToken transactionToken = null; + void setCombine(boolean a) { + if (null != transactionToken) + transactionToken.setCombine(a); + } + void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) { + this.session = session; + this.graphSession = graphSession; + this.queryProvider = queryProvider; + resetTransactionPolicy(); + } + void resetTransactionPolicy() { + TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class); + this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken); + } + int getAsyncCount() { + return asyncCount; + } + int getReadCount() { + return readCount; + } + int getWriteCount() { + return writeCount; + } + boolean isWriteTransaction() { + return 0 != (state & WriteTransaction); + } + boolean isAlive() { + return !isClosed() && !isClosing(); + } + boolean isClosed() { + return 0 != (state & Closed); + } + boolean isClosing() { + return 0 != (state & Closing); + } + boolean isWriting() { + return 0 != (state & WriteTransaction); + } + void close() { + boolean acquired = false; + try { + acquired = lock.tryLock(1, TimeUnit.MINUTES); + } catch (InterruptedException e1) { + } + try { + if (null != transactionToken) + transactionToken.close(); + graphSession.stop(); + } catch (DatabaseException e) { + } finally { + state = Closed; + session.lifecycleSupport.fireListeners(LifecycleState.CLOSED); + if (acquired) { + condition.signalAll(); + lock.unlock(); + } + } + } + void incAsync() + { + lock.lock(); + try { + boolean closed = isClosed(); + boolean closing = isClosing(); + if (closed || closing) + throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing); + ++asyncCount; + } finally { + lock.unlock(); + } + } + void decAsync() + { + lock.lock(); + try { + if (asyncCount < 1) + throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount); + if (0 == --asyncCount) { + condition.signal(); + } + } finally { + lock.unlock(); + } + } + enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock } + WaitStatus waitClosing(long timeout) { + lock.lock(); + try { + if (isClosed()) + return WaitStatus.IsClosed; + else if (isClosing()) + return WaitStatus.IsClosing; + state |= Closing; + try { + if (timeout < 0) { // Infinite timeout + for (int i=0; i<100; i++) + while (asyncCount > 0 || readCount > 0 || + (null != writeOwner && Thread.currentThread() != writeOwner)) { + condition.signalAll(); + condition.awaitNanos(10000000); // nanos + } + while (asyncCount > 0 || readCount > 0 || + (null != writeOwner && Thread.currentThread() != writeOwner)) { + condition.signalAll(); + condition.await(); + } + } else if (timeout > 0) { + boolean interrupted = false; + while (!interrupted && + (asyncCount > 0 || readCount > 0 || + (null != writeOwner && Thread.currentThread() != writeOwner))) + interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS); + } + transactionToken.closing(); + } catch (Exception e) { + e.printStackTrace(); + } + state &= All ^ Closing; + if (readCount == 0 && writeCount == 0 && asyncCount == 0) + return WaitStatus.CanBeClosed; + else if (null != writeOwner && Thread.currentThread() == writeOwner) + return WaitStatus.Deadlock; + return WaitStatus.Timeout; + } finally { + lock.unlock(); + } + } + void startReadTransaction(int thread) throws DatabaseException { + lock.lock(); + try { + assert(readCount == 0); + transactionToken.startReadTransaction(thread); + state |= ReadTransaction; + session.fireStartReadTransaction(); + ++readCount; + } catch (Throwable e) { + throw new DatabaseException("Failed to start read transaction.", e); + } finally { + lock.unlock(); + } + } + void stopReadTransaction() throws DatabaseException { + lock.lock(); + try { + assert (!queryProvider.hasScheduledUpdates()); + assert (readCount == 1); + session.writeSupport.gc(); + transactionToken.stopReadTransaction(); + readCount = 0; + state &= All ^ ReadTransaction; + condition.signal(); + } finally { + lock.unlock(); + } + } + + void startWriteTransaction(int thread) + throws DatabaseException { + lock.lock(); + try { +// System.out.println("stawt"); + boolean closed = isClosed(); + boolean closing = isClosing(); + int ac = asyncCount; + int wc = writeCount; + if (closed || (closing && ac < 1) || wc < 0) + throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount); + if (writeCount == 0) { + transactionToken.startWriteTransaction(thread); + state |= WriteTransaction; + writeOwner = Thread.currentThread(); + session.fireStartWriteTransaction(); + } + ++writeCount; + assert(writeCount == 1); + } catch (Throwable e) { + throw new DatabaseException(e); + } finally { + lock.unlock(); + } + } + + void writeTransactionEnded() { + session.defaultClusterSet = null; + session.fireFinishWriteTransaction(); + } + + void stopWriteTransaction(ClusterStream clusterStream) { + if(!isAlive()) return; + lock.lock(); + try { +// System.out.println("stowt"); + if (writeCount < 1) + throw new IllegalStateException(session + ": writeCount=" + writeCount); + else if (1 == writeCount) { + boolean empty = session.clusterStream.reallyFlush(); + if (!empty) { + String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n" + + "This is probably a serious error. Automatic cancel done as default recovery."; + Logger.defaultLogInfo(msg); + ClientChangesImpl cs = new ClientChangesImpl(session); + SynchronizeContext context = new SynchronizeContext(session, cs, 1); + transactionToken.cancelBegin(session.writeSupport, context, clusterStream); + } + transactionToken.stopWriteTransaction(); + state &= All ^ WriteTransaction; + writeOwner = null; + condition.signal(); + writeTransactionEnded(); + } + --writeCount; + } catch (Throwable e) { + e.printStackTrace(); + this.close(); // Everything is lost anyway. + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + + void cancelWriteTransaction(WriteGraphImpl graph) { + lock.lock(); + try { + if (writeCount < 1) + throw new IllegalStateException(session + ": writeCount=" + writeCount); + VirtualGraph vg = graph.getProvider(); + if (null == vg) { // Skip if virtual graph. + boolean empty = session.clusterStream.reallyFlush(); + if (!empty) { // Something to cancel. + // Send all local changes to server so it can calculate correct reverse change set. + ClientChangesImpl cs = new ClientChangesImpl(session); + SynchronizeContext context = new SynchronizeContext(session, cs, 1); + // This will send cancel and fetch reverse change set from server. + transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream); + try { + final boolean undo = false; + if (!context.isOk(undo)) // this is a blocking operation + throw new InternalException("Cancel failed. This should never happen. Contact application support."); +// System.out.println("session cs: " + session.clientChanges); +// System.out.println("reverse cs: " + cs); + queryProvider.performDirtyUpdates(graph); + queryProvider.performScheduledUpdates(graph); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + // This will send and accept the reverse change set. + transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream); + } + } + session.writeSupport.clearMetadata(); + if (--writeCount == 0) { + transactionToken.stopWriteTransaction(); + state &= All ^ WriteTransaction; + writeOwner = null; + condition.signal(); + writeTransactionEnded(); + } + } catch (Throwable e) { + e.printStackTrace(); + this.close(); // Everything is lost anyway. + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + + void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) { + assert(request != null); + graph.commitAccessorChanges(request); + boolean writeOnly = request instanceof WriteOnly; + lock.lock(); + try { + VirtualGraph vg = graph.getProvider(); + if (writeCount == 1) { + + if (vg != null && clusterStream.isDirty()) + new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace(); + +// long start = System.nanoTime(); + if (null == vg) { + clusterStream.reallyFlush(); + // This was fired just before in handleUpdatesAndMetadata +// if (!writeOnly) +// session.fireMetadataListeners(graph, cs); + } else + clusterStream.clear(); + +// long duration = System.nanoTime() - start; +// System.out.println("reallyFlush " + 1e-9*duration + "s. "); + + session.clientChanges = new ClientChangesImpl(session); + + +// start = System.nanoTime(); + queryProvider.performScheduledUpdates(graph); +// duration = System.nanoTime() - start; +// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. "); + + // Process updates as long as pending primitives exist + while (session.dirtyPrimitives) { + session.dirtyPrimitives = false; + queryProvider.performDirtyUpdates(graph); + queryProvider.performScheduledUpdates(graph); + } + + if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please! + session.fireReactionsToCommit(graph, cs); // Does not throw exception! + + writeTransactionEnded(); + session.writeSupport.gc(); + } else // Only the last writer can commit. + throw new IllegalStateException(session + ": writeCount=" + writeCount); +// long start = System.nanoTime(); +// System.err.println("commit"); + +// start = System.nanoTime(); + if (null == vg) + transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op); + else { + // There should not be meta data because this is transient graph. + // This is an insurance that this is the case for now. + int n = session.writeSupport.clearMetadata(); + if (n > 0) + if (SessionImplSocket.DEBUG) + System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n); + } + transactionToken.stopWriteTransaction(); +// long duration = System.nanoTime() - start; +// System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. "); + + +// long duration = System.nanoTime() - start; +// System.err.println("commit2 " + 1e-9*duration); + state &= All ^ WriteTransaction; + writeCount = 0; + writeOwner = null; + condition.signal(); + } catch (Throwable e) { + Logger.defaultLogError(e); + this.close(); // Everything is lost anyway. + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + + void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) { + VirtualGraph vg = graph.getProvider(); + if (null != vg) + return; + lock.lock(); + try { + clusterStream.reallyFlush(); + transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null); + // See below for explanation, the same reasoning should apply here too. + if (graph.writeSupport instanceof WriteOnlySupport) + graph.writeSupport.flushCluster(); + } catch (Throwable e) { + Logger.defaultLogError(e); + this.close(); // Everything is lost anyway. + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) { + VirtualGraph vg = graph.getProvider(); + if (null != vg) + return; + lock.lock(); + try { + transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null); + transactionToken.setCombine(true); + // This was added because test Issue3199Test2 failed because + // write only cluster does not need cluster when allocating resource index + // and the first claim after commitAndContinue2 caused cluster to be loaded + // from server and of course it's resource index was off by one. + if (graph.writeSupport instanceof WriteOnlySupport) + graph.writeSupport.flushCluster(); + } catch (Throwable e) { + Logger.defaultLogError(e); + this.close(); // Everything is lost anyway. + throw new IllegalStateException(e); + } finally { + lock.unlock(); + } + } + Operation getLastOperation() { + return transactionToken.getLastOperation(); + } + long getHeadRevisionId() { + return transactionToken.getHeadRevisionId(); + } +}