-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.locks.Condition;\r
-import java.util.concurrent.locks.Lock;\r
-import java.util.concurrent.locks.ReentrantLock;\r
-\r
-import org.simantics.db.ChangeSet;\r
-import org.simantics.db.Operation;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.InternalException;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.query.QueryProcessor;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.db.service.LifecycleSupport.LifecycleState;\r
-import org.simantics.db.service.TransactionPolicySupport;\r
-\r
-import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;\r
-\r
-class State {\r
- static final int Closed = 1;\r
- static final int Closing = 2;\r
- static final int WriteTransaction = 4;\r
- static final int ReadTransaction = 8;\r
- static final int All = 15;\r
- private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.\r
- private SessionImplSocket session = null;\r
- private GraphSession graphSession = null;\r
- private QueryProcessor queryProvider = null;\r
- private Lock lock = new ReentrantLock();\r
- private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.\r
- private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.\r
- private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.\r
- private Thread writeOwner = null;\r
- private int asyncCount = 1;\r
- private TransactionToken transactionToken = null;\r
- void setCombine(boolean a) {\r
- if (null != transactionToken)\r
- transactionToken.setCombine(a);\r
- }\r
- void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {\r
- this.session = session;\r
- this.graphSession = graphSession;\r
- this.queryProvider = queryProvider;\r
- resetTransactionPolicy();\r
- }\r
- void resetTransactionPolicy() {\r
- TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);\r
- this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);\r
- }\r
- int getAsyncCount() {\r
- return asyncCount;\r
- }\r
- int getReadCount() {\r
- return readCount;\r
- }\r
- int getWriteCount() {\r
- return writeCount;\r
- }\r
- boolean isWriteTransaction() {\r
- return 0 != (state & WriteTransaction);\r
- }\r
- boolean isAlive() {\r
- return !isClosed() && !isClosing();\r
- }\r
- boolean isClosed() {\r
- return 0 != (state & Closed);\r
- }\r
- boolean isClosing() {\r
- return 0 != (state & Closing);\r
- }\r
- boolean isWriting() {\r
- return 0 != (state & WriteTransaction);\r
- }\r
- void close() {\r
- boolean acquired = false;\r
- try {\r
- acquired = lock.tryLock(1, TimeUnit.MINUTES);\r
- } catch (InterruptedException e1) {\r
- }\r
- try {\r
- if (null != transactionToken)\r
- transactionToken.close();\r
- graphSession.stop();\r
- } catch (DatabaseException e) {\r
- } finally {\r
- state = Closed;\r
- session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);\r
- if (acquired) {\r
- condition.signalAll();\r
- lock.unlock();\r
- }\r
- }\r
- }\r
- void incAsync()\r
- {\r
- lock.lock();\r
- try {\r
- boolean closed = isClosed();\r
- boolean closing = isClosing();\r
- if (closed || closing)\r
- throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);\r
- ++asyncCount;\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- void decAsync()\r
- {\r
- lock.lock();\r
- try {\r
- if (asyncCount < 1)\r
- throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);\r
- if (0 == --asyncCount) {\r
- condition.signal();\r
- }\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }\r
- WaitStatus waitClosing(long timeout) {\r
- lock.lock();\r
- try {\r
- if (isClosed())\r
- return WaitStatus.IsClosed;\r
- else if (isClosing())\r
- return WaitStatus.IsClosing;\r
- state |= Closing;\r
- try {\r
- if (timeout < 0) { // Infinite timeout\r
- for (int i=0; i<100; i++)\r
- while (asyncCount > 0 || readCount > 0 ||\r
- (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
- condition.signalAll();\r
- condition.awaitNanos(10000000); // nanos\r
- }\r
- while (asyncCount > 0 || readCount > 0 ||\r
- (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
- condition.signalAll();\r
- condition.await();\r
- }\r
- } else if (timeout > 0) {\r
- boolean interrupted = false;\r
- while (!interrupted &&\r
- (asyncCount > 0 || readCount > 0 ||\r
- (null != writeOwner && Thread.currentThread() != writeOwner)))\r
- interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);\r
- }\r
- transactionToken.closing();\r
- } catch (Exception e) {\r
- e.printStackTrace();\r
- }\r
- state &= All ^ Closing;\r
- if (readCount == 0 && writeCount == 0 && asyncCount == 0)\r
- return WaitStatus.CanBeClosed;\r
- else if (null != writeOwner && Thread.currentThread() == writeOwner)\r
- return WaitStatus.Deadlock;\r
- return WaitStatus.Timeout;\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- void startReadTransaction(int thread) throws DatabaseException {\r
- lock.lock();\r
- try {\r
- assert(readCount == 0);\r
- transactionToken.startReadTransaction(thread);\r
- state |= ReadTransaction;\r
- session.fireStartReadTransaction();\r
- ++readCount;\r
- } catch (Throwable e) {\r
- throw new DatabaseException("Failed to start read transaction.", e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- void stopReadTransaction() throws DatabaseException {\r
- lock.lock();\r
- try {\r
- assert (!queryProvider.hasScheduledUpdates());\r
- assert (readCount == 1);\r
- session.writeSupport.gc();\r
- transactionToken.stopReadTransaction();\r
- readCount = 0;\r
- state &= All ^ ReadTransaction;\r
- condition.signal();\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
-\r
- void startWriteTransaction(int thread)\r
- throws DatabaseException {\r
- lock.lock();\r
- try {\r
-// System.out.println("stawt");\r
- boolean closed = isClosed();\r
- boolean closing = isClosing();\r
- int ac = asyncCount;\r
- int wc = writeCount;\r
- if (closed || (closing && ac < 1) || wc < 0)\r
- throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);\r
- if (writeCount == 0) {\r
- transactionToken.startWriteTransaction(thread);\r
- state |= WriteTransaction;\r
- writeOwner = Thread.currentThread();\r
- session.fireStartWriteTransaction();\r
- }\r
- ++writeCount;\r
- assert(writeCount == 1);\r
- } catch (Throwable e) {\r
- throw new DatabaseException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
-\r
- void writeTransactionEnded() {\r
- session.defaultClusterSet = null;\r
- session.fireFinishWriteTransaction();\r
- }\r
-\r
- void stopWriteTransaction(ClusterStream clusterStream) {\r
- if(!isAlive()) return;\r
- lock.lock();\r
- try {\r
-// System.out.println("stowt");\r
- if (writeCount < 1)\r
- throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
- else if (1 == writeCount) {\r
- boolean empty = session.clusterStream.reallyFlush();\r
- if (!empty) {\r
- String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"\r
- + "This is probably a serious error. Automatic cancel done as default recovery.";\r
- Logger.defaultLogInfo(msg);\r
- ClientChangesImpl cs = new ClientChangesImpl(session);\r
- SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
- transactionToken.cancelBegin(session.writeSupport, context, clusterStream);\r
- }\r
- transactionToken.stopWriteTransaction();\r
- state &= All ^ WriteTransaction;\r
- writeOwner = null;\r
- condition.signal();\r
- writeTransactionEnded();\r
- }\r
- --writeCount;\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- this.close(); // Everything is lost anyway.\r
- throw new IllegalStateException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
-\r
- void cancelWriteTransaction(WriteGraphImpl graph) {\r
- lock.lock();\r
- try {\r
- if (writeCount < 1)\r
- throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
- VirtualGraph vg = graph.getProvider();\r
- if (null == vg) { // Skip if virtual graph.\r
- boolean empty = session.clusterStream.reallyFlush();\r
- if (!empty) { // Something to cancel.\r
- // Send all local changes to server so it can calculate correct reverse change set.\r
- ClientChangesImpl cs = new ClientChangesImpl(session);\r
- SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
- // This will send cancel and fetch reverse change set from server.\r
- transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);\r
- try {\r
- final boolean undo = false;\r
- if (!context.isOk(undo)) // this is a blocking operation\r
- throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
-// System.out.println("session cs: " + session.clientChanges);\r
-// System.out.println("reverse cs: " + cs);\r
- queryProvider.performDirtyUpdates(graph);\r
- queryProvider.performScheduledUpdates(graph);\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- // This will send and accept the reverse change set.\r
- transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);\r
- }\r
- }\r
- session.writeSupport.clearMetadata();\r
- if (--writeCount == 0) {\r
- transactionToken.stopWriteTransaction();\r
- state &= All ^ WriteTransaction;\r
- writeOwner = null;\r
- condition.signal();\r
- writeTransactionEnded();\r
- }\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- this.close(); // Everything is lost anyway.\r
- throw new IllegalStateException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
-\r
- void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {\r
- assert(request != null);\r
- graph.commitAccessorChanges(request);\r
- boolean writeOnly = request instanceof WriteOnly;\r
- lock.lock();\r
- try {\r
- VirtualGraph vg = graph.getProvider();\r
- if (writeCount == 1) {\r
-\r
- if (vg != null && clusterStream.isDirty())\r
- new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();\r
-\r
-// long start = System.nanoTime();\r
- if (null == vg) {\r
- clusterStream.reallyFlush();\r
- // This was fired just before in handleUpdatesAndMetadata\r
-// if (!writeOnly)\r
-// session.fireMetadataListeners(graph, cs);\r
- } else\r
- clusterStream.clear();\r
-\r
-// long duration = System.nanoTime() - start;\r
-// System.out.println("reallyFlush " + 1e-9*duration + "s. ");\r
-\r
- session.clientChanges = new ClientChangesImpl(session);\r
-\r
-\r
-// start = System.nanoTime();\r
- queryProvider.performScheduledUpdates(graph);\r
-// duration = System.nanoTime() - start;\r
-// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");\r
-\r
- // Process updates as long as pending primitives exist\r
- while (session.dirtyPrimitives) {\r
- session.dirtyPrimitives = false;\r
- queryProvider.performDirtyUpdates(graph);\r
- queryProvider.performScheduledUpdates(graph);\r
- }\r
-\r
- if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!\r
- session.fireReactionsToCommit(graph, cs); // Does not throw exception!\r
-\r
- writeTransactionEnded();\r
- session.writeSupport.gc();\r
- } else // Only the last writer can commit.\r
- throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
-// long start = System.nanoTime();\r
-// System.err.println("commit");\r
-\r
-// start = System.nanoTime();\r
- if (null == vg)\r
- transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);\r
- else {\r
- // There should not be meta data because this is transient graph.\r
- // This is an insurance that this is the case for now.\r
- int n = session.writeSupport.clearMetadata();\r
- if (n > 0)\r
- if (SessionImplSocket.DEBUG)\r
- System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);\r
- }\r
- transactionToken.stopWriteTransaction();\r
-// long duration = System.nanoTime() - start;\r
-// System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");\r
-\r
-\r
-// long duration = System.nanoTime() - start;\r
-// System.err.println("commit2 " + 1e-9*duration);\r
- state &= All ^ WriteTransaction;\r
- writeCount = 0;\r
- writeOwner = null;\r
- condition.signal();\r
- } catch (Throwable e) {\r
- Logger.defaultLogError(e);\r
- this.close(); // Everything is lost anyway.\r
- throw new IllegalStateException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
-\r
- void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
- VirtualGraph vg = graph.getProvider();\r
- if (null != vg)\r
- return;\r
- lock.lock();\r
- try {\r
- clusterStream.reallyFlush();\r
- transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
- // See below for explanation, the same reasoning should apply here too.\r
- if (graph.writeSupport instanceof WriteOnlySupport)\r
- graph.writeSupport.flushCluster();\r
- } catch (Throwable e) {\r
- Logger.defaultLogError(e);\r
- this.close(); // Everything is lost anyway.\r
- throw new IllegalStateException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
- VirtualGraph vg = graph.getProvider();\r
- if (null != vg)\r
- return;\r
- lock.lock();\r
- try {\r
- transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
- transactionToken.setCombine(true);\r
- // This was added because test Issue3199Test2 failed because\r
- // write only cluster does not need cluster when allocating resource index\r
- // and the first claim after commitAndContinue2 caused cluster to be loaded\r
- // from server and of course it's resource index was off by one.\r
- if (graph.writeSupport instanceof WriteOnlySupport)\r
- graph.writeSupport.flushCluster();\r
- } catch (Throwable e) {\r
- Logger.defaultLogError(e);\r
- this.close(); // Everything is lost anyway.\r
- throw new IllegalStateException(e);\r
- } finally {\r
- lock.unlock();\r
- }\r
- }\r
- Operation getLastOperation() {\r
- return transactionToken.getLastOperation();\r
- }\r
- long getHeadRevisionId() {\r
- return transactionToken.getHeadRevisionId();\r
- }\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Disposable;
+import org.simantics.db.Operation;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.db.service.LifecycleSupport.LifecycleState;
+import org.simantics.db.service.TransactionPolicySupport;
+
+import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
+
+class State {
+ static final int Closed = 1;
+ static final int Closing = 2;
+ static final int WriteTransaction = 4;
+ static final int ReadTransaction = 8;
+ static final int All = 15;
+ private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
+ private SessionImplSocket session = null;
+ private GraphSession graphSession = null;
+ private QueryProcessor queryProvider = null;
+ private Lock lock = new ReentrantLock();
+ private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
+ private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
+ private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
+ private Thread writeOwner = null;
+ private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
+ private TransactionToken transactionToken = null;
+ void setCombine(boolean a) {
+ if (null != transactionToken)
+ transactionToken.setCombine(a);
+ }
+ void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
+ this.session = session;
+ this.graphSession = graphSession;
+ this.queryProvider = queryProvider;
+ resetTransactionPolicy();
+ }
+ void resetTransactionPolicy() {
+ TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
+ this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
+ }
+ int getAsyncCount() {
+ return asyncCount;
+ }
+ int getReadCount() {
+ return readCount;
+ }
+ int getWriteCount() {
+ return writeCount;
+ }
+ boolean isWriteTransaction() {
+ return 0 != (state & WriteTransaction);
+ }
+ boolean isAlive() {
+ return !isClosed() && !isClosing();
+ }
+ boolean isClosed() {
+ return 0 != (state & Closed);
+ }
+ boolean isClosing() {
+ return 0 != (state & Closing);
+ }
+ boolean isWriting() {
+ return 0 != (state & WriteTransaction);
+ }
+ void close() {
+ boolean acquired = false;
+ try {
+ acquired = lock.tryLock(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ if (null != transactionToken)
+ transactionToken.close();
+ graphSession.stop();
+ } catch (DatabaseException e) {
+ } finally {
+ state = Closed;
+ session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
+ if (acquired) {
+ condition.signalAll();
+ lock.unlock();
+ }
+ }
+ }
+ void incAsync()
+ {
+ lock.lock();
+ try {
+ boolean closed = isClosed();
+ boolean closing = isClosing();
+ if (closed || closing)
+ throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
+ ++asyncCount;
+ } finally {
+ lock.unlock();
+ }
+ }
+ void decAsync()
+ {
+ lock.lock();
+ try {
+ if (asyncCount < 1)
+ throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
+ if (0 == --asyncCount) {
+ condition.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
+ WaitStatus waitClosing(long timeout) {
+ lock.lock();
+ try {
+ if (isClosed())
+ return WaitStatus.IsClosed;
+ else if (isClosing())
+ return WaitStatus.IsClosing;
+ state |= Closing;
+ try {
+ if (timeout < 0) { // Infinite timeout
+ for (int i=0; i<100; i++)
+ while (asyncCount > 0 || readCount > 0 ||
+ (null != writeOwner && Thread.currentThread() != writeOwner)) {
+ condition.signalAll();
+ condition.awaitNanos(10000000); // nanos
+ }
+ while (asyncCount > 0 || readCount > 0 ||
+ (null != writeOwner && Thread.currentThread() != writeOwner)) {
+ condition.signalAll();
+ condition.await();
+ }
+ } else if (timeout > 0) {
+ boolean interrupted = false;
+ while (!interrupted &&
+ (asyncCount > 0 || readCount > 0 ||
+ (null != writeOwner && Thread.currentThread() != writeOwner)))
+ interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ transactionToken.closing();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ state &= All ^ Closing;
+ if (readCount == 0 && writeCount == 0 && asyncCount == 0)
+ return WaitStatus.CanBeClosed;
+ else if (null != writeOwner && Thread.currentThread() == writeOwner)
+ return WaitStatus.Deadlock;
+ return WaitStatus.Timeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+ void startReadTransaction(int thread) throws DatabaseException {
+ lock.lock();
+ try {
+ assert(readCount == 0);
+ transactionToken.startReadTransaction(thread);
+ state |= ReadTransaction;
+ session.fireStartReadTransaction();
+ ++readCount;
+ } catch (Throwable e) {
+ throw new DatabaseException("Failed to start read transaction.", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ void stopReadTransaction() throws DatabaseException {
+ lock.lock();
+ try {
+ assert (!queryProvider.listening.hasScheduledUpdates());
+ assert (readCount == 1);
+ session.writeSupport.gc();
+ transactionToken.stopReadTransaction();
+ readCount = 0;
+ state &= All ^ ReadTransaction;
+ condition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void startWriteTransaction(int thread)
+ throws DatabaseException {
+ lock.lock();
+ try {
+// System.out.println("stawt");
+ boolean closed = isClosed();
+ boolean closing = isClosing();
+ int ac = asyncCount;
+ int wc = writeCount;
+ if (closed || (closing && ac < 1) || wc < 0)
+ throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
+ if (writeCount == 0) {
+ transactionToken.startWriteTransaction(thread);
+ state |= WriteTransaction;
+ writeOwner = Thread.currentThread();
+ session.fireStartWriteTransaction();
+ }
+ ++writeCount;
+ assert(writeCount == 1);
+ } catch (Throwable e) {
+ throw new DatabaseException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void writeTransactionEnded() {
+ session.defaultClusterSet = null;
+ session.fireFinishWriteTransaction();
+ }
+
+ void stopWriteTransaction(ClusterStream clusterStream) {
+ if(!isAlive()) return;
+ lock.lock();
+ try {
+// System.out.println("stowt");
+ if (writeCount < 1)
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);
+ else if (1 == writeCount) {
+ boolean empty = session.clusterStream.reallyFlush();
+ if (!empty) {
+ String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
+ + "This is probably a serious error. Automatic cancel done as default recovery.";
+ Logger.defaultLogInfo(msg);
+ ClientChangesImpl cs = new ClientChangesImpl(session);
+ SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+ transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
+ }
+ transactionToken.stopWriteTransaction();
+ state &= All ^ WriteTransaction;
+ writeOwner = null;
+ condition.signal();
+ writeTransactionEnded();
+ }
+ --writeCount;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ this.close(); // Everything is lost anyway.
+ throw new IllegalStateException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void cancelWriteTransaction(WriteGraphImpl graph) {
+ lock.lock();
+ try {
+ if (writeCount < 1)
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);
+ VirtualGraph vg = graph.getProvider();
+ if (null == vg) { // Skip if virtual graph.
+ boolean empty = session.clusterStream.reallyFlush();
+ if (!empty) { // Something to cancel.
+ // Send all local changes to server so it can calculate correct reverse change set.
+ ClientChangesImpl cs = new ClientChangesImpl(session);
+ SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+ // This will send cancel and fetch reverse change set from server.
+ transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
+ try {
+ final boolean undo = false;
+ if (!context.isOk(undo)) // this is a blocking operation
+ throw new InternalException("Cancel failed. This should never happen.");
+// System.out.println("session cs: " + session.clientChanges);
+// System.out.println("reverse cs: " + cs);
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ // This will send and accept the reverse change set.
+ transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
+ } else {
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
+ }
+ }
+ session.writeSupport.clearMetadata();
+ if (--writeCount == 0) {
+ transactionToken.stopWriteTransaction();
+ state &= All ^ WriteTransaction;
+ writeOwner = null;
+ condition.signal();
+ writeTransactionEnded();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ this.close(); // Everything is lost anyway.
+ throw new IllegalStateException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
+ assert(request != null);
+ graph.commitAccessorChanges(request);
+ boolean writeOnly = request instanceof WriteOnly;
+ lock.lock();
+ try {
+ VirtualGraph vg = graph.getProvider();
+ if (writeCount == 1) {
+
+ if (vg != null && clusterStream.isDirty())
+ new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
+
+ // This is needed even when the write targets a virtual graph -
+ // deny can always remove a persistent statement.
+ clusterStream.reallyFlush();
+
+ session.clientChanges = new ClientChangesImpl(session);
+
+
+// start = System.nanoTime();
+ queryProvider.propagateChangesInQueryCache(graph);
+ ReadGraphImpl listenerGraph = graph.forRecompute(null);
+ listenerGraph.asyncBarrier.inc();
+ queryProvider.listening.fireListeners(listenerGraph);
+ listenerGraph.asyncBarrier.dec();
+// duration = System.nanoTime() - start;
+// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
+
+ // Process updates as long as pending primitives exist
+ while (session.dirtyPrimitives) {
+ session.dirtyPrimitives = false;
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
+ }
+
+ if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
+ session.fireReactionsToCommit(graph, cs); // Does not throw exception!
+
+ writeTransactionEnded();
+ session.writeSupport.gc();
+ } else // Only the last writer can commit.
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);
+// long start = System.nanoTime();
+// System.err.println("commit");
+
+// start = System.nanoTime();
+ if (null == vg)
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
+ else {
+ // There should not be meta data because this is transient graph.
+ // This is an insurance that this is the case for now.
+ int n = session.writeSupport.clearMetadata();
+ if (n > 0)
+ if (SessionImplSocket.DEBUG)
+ System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
+ }
+ transactionToken.stopWriteTransaction();
+// long duration = System.nanoTime() - start;
+// System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
+
+
+// long duration = System.nanoTime() - start;
+// System.err.println("commit2 " + 1e-9*duration);
+ state &= All ^ WriteTransaction;
+ writeCount = 0;
+ writeOwner = null;
+ condition.signal();
+ } catch (Throwable e) {
+ Logger.defaultLogError(e);
+ this.close(); // Everything is lost anyway.
+ throw new IllegalStateException(e);
+ } finally {
+ Disposable.safeDispose(cs);
+ lock.unlock();
+ }
+ }
+
+ void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+ VirtualGraph vg = graph.getProvider();
+ if (null != vg)
+ return;
+ lock.lock();
+ try {
+ clusterStream.reallyFlush();
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+ // See below for explanation, the same reasoning should apply here too.
+ if (graph.writeSupport instanceof WriteOnlySupport)
+ graph.writeSupport.flushCluster();
+ } catch (Throwable e) {
+ Logger.defaultLogError(e);
+ this.close(); // Everything is lost anyway.
+ throw new IllegalStateException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+ VirtualGraph vg = graph.getProvider();
+ if (null != vg)
+ return;
+ lock.lock();
+ try {
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+ transactionToken.setCombine(true);
+ // This was added because test Issue3199Test2 failed because
+ // write only cluster does not need cluster when allocating resource index
+ // and the first claim after commitAndContinue2 caused cluster to be loaded
+ // from server and of course it's resource index was off by one.
+ if (graph.writeSupport instanceof WriteOnlySupport)
+ graph.writeSupport.flushCluster();
+ } catch (Throwable e) {
+ Logger.defaultLogError(e);
+ this.close(); // Everything is lost anyway.
+ throw new IllegalStateException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ Operation getLastOperation() {
+ return transactionToken.getLastOperation();
+ }
+ long getHeadRevisionId() {
+ return transactionToken.getHeadRevisionId();
+ }
+}