]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
index 8345c7877b817ab54c90b500e500385bbece519c..5adc593001cd19ae305c5fb3e00dfe181b0003e0 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.locks.Condition;\r
-import java.util.concurrent.locks.Lock;\r
-import java.util.concurrent.locks.ReentrantLock;\r
-\r
-import org.simantics.db.ChangeSet;\r
-import org.simantics.db.Operation;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.InternalException;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.query.QueryProcessor;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.db.service.LifecycleSupport.LifecycleState;\r
-import org.simantics.db.service.TransactionPolicySupport;\r
-\r
-import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;\r
-\r
-class State {\r
-    static final int          Closed           = 1;\r
-    static final int          Closing          = 2;\r
-    static final int          WriteTransaction = 4;\r
-    static final int          ReadTransaction  = 8;\r
-    static final int          All              = 15;\r
-    private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.\r
-    private SessionImplSocket session          = null;\r
-    private GraphSession      graphSession     = null;\r
-    private QueryProcessor    queryProvider    = null;\r
-    private Lock              lock             = new ReentrantLock();\r
-    private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.\r
-    private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.\r
-    private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.\r
-    private Thread            writeOwner       = null;\r
-    private int               asyncCount       = 1;\r
-    private TransactionToken  transactionToken = null;\r
-    void setCombine(boolean a) {\r
-        if (null != transactionToken)\r
-            transactionToken.setCombine(a);\r
-    }\r
-    void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {\r
-        this.session = session;\r
-        this.graphSession = graphSession;\r
-        this.queryProvider = queryProvider;\r
-        resetTransactionPolicy();\r
-    }\r
-    void resetTransactionPolicy() {\r
-        TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);\r
-        this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);\r
-    }\r
-    int getAsyncCount() {\r
-        return asyncCount;\r
-    }\r
-    int getReadCount() {\r
-        return readCount;\r
-    }\r
-    int getWriteCount() {\r
-        return writeCount;\r
-    }\r
-    boolean isWriteTransaction() {\r
-        return 0 != (state & WriteTransaction);\r
-    }\r
-    boolean isAlive() {\r
-        return !isClosed() && !isClosing();\r
-    }\r
-    boolean isClosed() {\r
-        return 0 != (state & Closed);\r
-    }\r
-    boolean isClosing() {\r
-        return 0 != (state & Closing);\r
-    }\r
-    boolean isWriting() {\r
-        return 0 != (state & WriteTransaction);\r
-    }\r
-    void close() {\r
-        boolean acquired = false;\r
-        try {\r
-            acquired = lock.tryLock(1, TimeUnit.MINUTES);\r
-        } catch (InterruptedException e1) {\r
-        }\r
-        try {\r
-            if (null != transactionToken)\r
-                transactionToken.close();\r
-            graphSession.stop();\r
-        } catch (DatabaseException e) {\r
-        } finally {\r
-            state = Closed;\r
-            session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);\r
-            if (acquired) {\r
-                condition.signalAll();\r
-                lock.unlock();\r
-            }\r
-        }\r
-    }\r
-    void incAsync()\r
-    {\r
-        lock.lock();\r
-        try {\r
-            boolean closed = isClosed();\r
-            boolean closing = isClosing();\r
-            if (closed || closing)\r
-                throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);\r
-            ++asyncCount;\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    void decAsync()\r
-    {\r
-        lock.lock();\r
-        try {\r
-            if (asyncCount < 1)\r
-                throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);\r
-            if (0 == --asyncCount) {\r
-                condition.signal();\r
-            }\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }\r
-    WaitStatus waitClosing(long timeout) {\r
-        lock.lock();\r
-        try {\r
-            if (isClosed())\r
-                return WaitStatus.IsClosed;\r
-            else if (isClosing())\r
-                return WaitStatus.IsClosing;\r
-            state |= Closing;\r
-            try {\r
-                if (timeout < 0) { // Infinite timeout\r
-                    for (int i=0; i<100; i++)\r
-                    while (asyncCount > 0  || readCount > 0 ||\r
-                           (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
-                        condition.signalAll();\r
-                        condition.awaitNanos(10000000); // nanos\r
-                    }\r
-                    while (asyncCount > 0  || readCount > 0 ||\r
-                            (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
-                         condition.signalAll();\r
-                         condition.await();\r
-                     }\r
-                } else if (timeout > 0) {\r
-                    boolean interrupted = false;\r
-                    while (!interrupted &&\r
-                           (asyncCount > 0  || readCount > 0 ||\r
-                            (null != writeOwner && Thread.currentThread() != writeOwner)))\r
-                        interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);\r
-                }\r
-                transactionToken.closing();\r
-            } catch (Exception e) {\r
-                e.printStackTrace();\r
-            }\r
-            state &= All ^ Closing;\r
-            if (readCount == 0 && writeCount == 0 && asyncCount == 0)\r
-                return WaitStatus.CanBeClosed;\r
-            else if (null != writeOwner && Thread.currentThread() == writeOwner)\r
-                return WaitStatus.Deadlock;\r
-            return WaitStatus.Timeout;\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    void startReadTransaction(int thread) throws DatabaseException {\r
-        lock.lock();\r
-        try {\r
-            assert(readCount == 0);\r
-            transactionToken.startReadTransaction(thread);\r
-            state |= ReadTransaction;\r
-            session.fireStartReadTransaction();\r
-            ++readCount;\r
-        } catch (Throwable e) {\r
-            throw new DatabaseException("Failed to start read transaction.", e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    void stopReadTransaction() throws DatabaseException {\r
-        lock.lock();\r
-        try {\r
-            assert (!queryProvider.hasScheduledUpdates());\r
-            assert (readCount == 1);\r
-            session.writeSupport.gc();\r
-            transactionToken.stopReadTransaction();\r
-            readCount = 0;\r
-            state &= All ^ ReadTransaction;\r
-            condition.signal();\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-\r
-    void startWriteTransaction(int thread)\r
-    throws DatabaseException {\r
-        lock.lock();\r
-        try {\r
-//            System.out.println("stawt");\r
-            boolean closed = isClosed();\r
-            boolean closing = isClosing();\r
-            int ac = asyncCount;\r
-            int wc = writeCount;\r
-            if (closed || (closing && ac < 1) || wc < 0)\r
-                throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);\r
-            if (writeCount == 0) {\r
-                transactionToken.startWriteTransaction(thread);\r
-                state |= WriteTransaction;\r
-                writeOwner = Thread.currentThread();\r
-                session.fireStartWriteTransaction();\r
-            }\r
-            ++writeCount;\r
-            assert(writeCount == 1);\r
-        } catch (Throwable e) {\r
-            throw new DatabaseException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-\r
-    void writeTransactionEnded() {\r
-        session.defaultClusterSet = null;\r
-        session.fireFinishWriteTransaction();\r
-    }\r
-\r
-    void stopWriteTransaction(ClusterStream clusterStream) {\r
-        if(!isAlive()) return;\r
-        lock.lock();\r
-        try {\r
-//            System.out.println("stowt");\r
-            if (writeCount < 1)\r
-                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
-            else if (1 == writeCount) {\r
-                boolean empty = session.clusterStream.reallyFlush();\r
-                if (!empty) {\r
-                    String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"\r
-                            + "This is probably a serious error. Automatic cancel done as default recovery.";\r
-                    Logger.defaultLogInfo(msg);\r
-                    ClientChangesImpl cs = new ClientChangesImpl(session);\r
-                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
-                    transactionToken.cancelBegin(session.writeSupport, context, clusterStream);\r
-                }\r
-                transactionToken.stopWriteTransaction();\r
-                state &= All ^ WriteTransaction;\r
-                writeOwner = null;\r
-                condition.signal();\r
-                writeTransactionEnded();\r
-            }\r
-            --writeCount;\r
-        } catch (Throwable e) {\r
-            e.printStackTrace();\r
-            this.close(); // Everything is lost anyway.\r
-            throw new IllegalStateException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-\r
-    void cancelWriteTransaction(WriteGraphImpl graph) {\r
-        lock.lock();\r
-        try {\r
-            if (writeCount < 1)\r
-                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
-            VirtualGraph vg = graph.getProvider();\r
-            if (null == vg) { // Skip if virtual graph.\r
-                boolean empty = session.clusterStream.reallyFlush();\r
-                if (!empty) { // Something to cancel.\r
-                    // Send all local changes to server so it can calculate correct reverse change set.\r
-                    ClientChangesImpl cs = new ClientChangesImpl(session);\r
-                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
-                    // This will send cancel and fetch reverse change set from server.\r
-                    transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);\r
-                    try {\r
-                        final boolean undo = false;\r
-                        if (!context.isOk(undo)) // this is a blocking operation\r
-                            throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
-//                        System.out.println("session cs: " + session.clientChanges);\r
-//                        System.out.println("reverse cs: " + cs);\r
-                        queryProvider.performDirtyUpdates(graph);\r
-                        queryProvider.performScheduledUpdates(graph);\r
-                    } catch (DatabaseException e) {\r
-                        Logger.defaultLogError(e);\r
-                    }\r
-                    // This will send and accept the reverse change set.\r
-                    transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);\r
-                }\r
-            }\r
-            session.writeSupport.clearMetadata();\r
-            if (--writeCount == 0) {\r
-                transactionToken.stopWriteTransaction();\r
-                state &= All ^ WriteTransaction;\r
-                writeOwner = null;\r
-                condition.signal();\r
-                writeTransactionEnded();\r
-            }\r
-        } catch (Throwable e) {\r
-            e.printStackTrace();\r
-            this.close(); // Everything is lost anyway.\r
-            throw new IllegalStateException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-\r
-    void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {\r
-        assert(request != null);\r
-        graph.commitAccessorChanges(request);\r
-        boolean writeOnly = request instanceof WriteOnly;\r
-        lock.lock();\r
-        try {\r
-            VirtualGraph vg = graph.getProvider();\r
-            if (writeCount == 1) {\r
-\r
-                if (vg != null && clusterStream.isDirty())\r
-                    new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();\r
-\r
-//                long start = System.nanoTime();\r
-                if (null == vg) {\r
-                    clusterStream.reallyFlush();\r
-                    // This was fired just before in handleUpdatesAndMetadata\r
-//                    if (!writeOnly)\r
-//                        session.fireMetadataListeners(graph, cs);\r
-                } else\r
-                    clusterStream.clear();\r
-\r
-//                long duration = System.nanoTime() - start;\r
-//                System.out.println("reallyFlush " + 1e-9*duration + "s. ");\r
-\r
-                session.clientChanges = new ClientChangesImpl(session);\r
-\r
-\r
-//                start = System.nanoTime();\r
-                queryProvider.performScheduledUpdates(graph);\r
-//                duration = System.nanoTime() - start;\r
-//                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");\r
-\r
-                // Process updates as long as pending primitives exist\r
-                while (session.dirtyPrimitives) {\r
-                    session.dirtyPrimitives = false;\r
-                    queryProvider.performDirtyUpdates(graph);\r
-                    queryProvider.performScheduledUpdates(graph);\r
-                }\r
-\r
-                if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!\r
-                    session.fireReactionsToCommit(graph, cs); // Does not throw exception!\r
-\r
-                writeTransactionEnded();\r
-                session.writeSupport.gc();\r
-            } else // Only the last writer can commit.\r
-                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
-//            long start = System.nanoTime();\r
-//            System.err.println("commit");\r
-\r
-//            start = System.nanoTime();\r
-            if (null == vg)\r
-                transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);\r
-            else {\r
-                // There should not be meta data because this is transient graph.\r
-                // This is an insurance that this is the case for now.\r
-                int n = session.writeSupport.clearMetadata();\r
-                if (n > 0)\r
-                    if (SessionImplSocket.DEBUG)\r
-                        System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);\r
-            }\r
-            transactionToken.stopWriteTransaction();\r
-//            long duration = System.nanoTime() - start;\r
-//            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");\r
-\r
-\r
-//            long duration = System.nanoTime() - start;\r
-//            System.err.println("commit2 " + 1e-9*duration);\r
-            state &= All ^ WriteTransaction;\r
-            writeCount = 0;\r
-            writeOwner = null;\r
-            condition.signal();\r
-        } catch (Throwable e) {\r
-            Logger.defaultLogError(e);\r
-            this.close(); // Everything is lost anyway.\r
-            throw new IllegalStateException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-\r
-    void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
-        VirtualGraph vg = graph.getProvider();\r
-        if (null != vg)\r
-            return;\r
-        lock.lock();\r
-        try {\r
-            clusterStream.reallyFlush();\r
-            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
-            // See below for explanation, the same reasoning should apply here too.\r
-            if (graph.writeSupport instanceof WriteOnlySupport)\r
-                graph.writeSupport.flushCluster();\r
-        } catch (Throwable e) {\r
-            Logger.defaultLogError(e);\r
-            this.close(); // Everything is lost anyway.\r
-            throw new IllegalStateException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
-        VirtualGraph vg = graph.getProvider();\r
-        if (null != vg)\r
-            return;\r
-        lock.lock();\r
-        try {\r
-            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
-            transactionToken.setCombine(true);\r
-            // This was added because test Issue3199Test2 failed because\r
-            // write only cluster does not need cluster when allocating resource index\r
-            // and the first claim after commitAndContinue2 caused cluster to be loaded\r
-            // from server and of course it's resource index was off by one.\r
-            if (graph.writeSupport instanceof WriteOnlySupport)\r
-                graph.writeSupport.flushCluster();\r
-        } catch (Throwable e) {\r
-            Logger.defaultLogError(e);\r
-            this.close(); // Everything is lost anyway.\r
-            throw new IllegalStateException(e);\r
-        } finally {\r
-            lock.unlock();\r
-        }\r
-    }\r
-    Operation getLastOperation() {\r
-        return transactionToken.getLastOperation();\r
-    }\r
-    long getHeadRevisionId() {\r
-        return transactionToken.getHeadRevisionId();\r
-    }\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.simantics.db.ChangeSet;
+import org.simantics.db.Operation;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.db.service.LifecycleSupport.LifecycleState;
+import org.simantics.db.service.TransactionPolicySupport;
+
+import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
+
+class State {
+    static final int          Closed           = 1;
+    static final int          Closing          = 2;
+    static final int          WriteTransaction = 4;
+    static final int          ReadTransaction  = 8;
+    static final int          All              = 15;
+    private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
+    private SessionImplSocket session          = null;
+    private GraphSession      graphSession     = null;
+    private QueryProcessor    queryProvider    = null;
+    private Lock              lock             = new ReentrantLock();
+    private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
+    private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
+    private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
+    private Thread            writeOwner       = null;
+    private int               asyncCount       = 1;
+    private TransactionToken  transactionToken = null;
+    void setCombine(boolean a) {
+        if (null != transactionToken)
+            transactionToken.setCombine(a);
+    }
+    void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
+        this.session = session;
+        this.graphSession = graphSession;
+        this.queryProvider = queryProvider;
+        resetTransactionPolicy();
+    }
+    void resetTransactionPolicy() {
+        TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
+        this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
+    }
+    int getAsyncCount() {
+        return asyncCount;
+    }
+    int getReadCount() {
+        return readCount;
+    }
+    int getWriteCount() {
+        return writeCount;
+    }
+    boolean isWriteTransaction() {
+        return 0 != (state & WriteTransaction);
+    }
+    boolean isAlive() {
+        return !isClosed() && !isClosing();
+    }
+    boolean isClosed() {
+        return 0 != (state & Closed);
+    }
+    boolean isClosing() {
+        return 0 != (state & Closing);
+    }
+    boolean isWriting() {
+        return 0 != (state & WriteTransaction);
+    }
+    void close() {
+        boolean acquired = false;
+        try {
+            acquired = lock.tryLock(1, TimeUnit.MINUTES);
+        } catch (InterruptedException e1) {
+        }
+        try {
+            if (null != transactionToken)
+                transactionToken.close();
+            graphSession.stop();
+        } catch (DatabaseException e) {
+        } finally {
+            state = Closed;
+            session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
+            if (acquired) {
+                condition.signalAll();
+                lock.unlock();
+            }
+        }
+    }
+    void incAsync()
+    {
+        lock.lock();
+        try {
+            boolean closed = isClosed();
+            boolean closing = isClosing();
+            if (closed || closing)
+                throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
+            ++asyncCount;
+        } finally {
+            lock.unlock();
+        }
+    }
+    void decAsync()
+    {
+        lock.lock();
+        try {
+            if (asyncCount < 1)
+                throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
+            if (0 == --asyncCount) {
+                condition.signal();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+    enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
+    WaitStatus waitClosing(long timeout) {
+        lock.lock();
+        try {
+            if (isClosed())
+                return WaitStatus.IsClosed;
+            else if (isClosing())
+                return WaitStatus.IsClosing;
+            state |= Closing;
+            try {
+                if (timeout < 0) { // Infinite timeout
+                    for (int i=0; i<100; i++)
+                    while (asyncCount > 0  || readCount > 0 ||
+                           (null != writeOwner && Thread.currentThread() != writeOwner)) {
+                        condition.signalAll();
+                        condition.awaitNanos(10000000); // nanos
+                    }
+                    while (asyncCount > 0  || readCount > 0 ||
+                            (null != writeOwner && Thread.currentThread() != writeOwner)) {
+                         condition.signalAll();
+                         condition.await();
+                     }
+                } else if (timeout > 0) {
+                    boolean interrupted = false;
+                    while (!interrupted &&
+                           (asyncCount > 0  || readCount > 0 ||
+                            (null != writeOwner && Thread.currentThread() != writeOwner)))
+                        interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
+                }
+                transactionToken.closing();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            state &= All ^ Closing;
+            if (readCount == 0 && writeCount == 0 && asyncCount == 0)
+                return WaitStatus.CanBeClosed;
+            else if (null != writeOwner && Thread.currentThread() == writeOwner)
+                return WaitStatus.Deadlock;
+            return WaitStatus.Timeout;
+        } finally {
+            lock.unlock();
+        }
+    }
+    void startReadTransaction(int thread) throws DatabaseException {
+        lock.lock();
+        try {
+            assert(readCount == 0);
+            transactionToken.startReadTransaction(thread);
+            state |= ReadTransaction;
+            session.fireStartReadTransaction();
+            ++readCount;
+        } catch (Throwable e) {
+            throw new DatabaseException("Failed to start read transaction.", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    void stopReadTransaction() throws DatabaseException {
+        lock.lock();
+        try {
+            assert (!queryProvider.hasScheduledUpdates());
+            assert (readCount == 1);
+            session.writeSupport.gc();
+            transactionToken.stopReadTransaction();
+            readCount = 0;
+            state &= All ^ ReadTransaction;
+            condition.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void startWriteTransaction(int thread)
+    throws DatabaseException {
+        lock.lock();
+        try {
+//            System.out.println("stawt");
+            boolean closed = isClosed();
+            boolean closing = isClosing();
+            int ac = asyncCount;
+            int wc = writeCount;
+            if (closed || (closing && ac < 1) || wc < 0)
+                throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
+            if (writeCount == 0) {
+                transactionToken.startWriteTransaction(thread);
+                state |= WriteTransaction;
+                writeOwner = Thread.currentThread();
+                session.fireStartWriteTransaction();
+            }
+            ++writeCount;
+            assert(writeCount == 1);
+        } catch (Throwable e) {
+            throw new DatabaseException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void writeTransactionEnded() {
+        session.defaultClusterSet = null;
+        session.fireFinishWriteTransaction();
+    }
+
+    void stopWriteTransaction(ClusterStream clusterStream) {
+        if(!isAlive()) return;
+        lock.lock();
+        try {
+//            System.out.println("stowt");
+            if (writeCount < 1)
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+            else if (1 == writeCount) {
+                boolean empty = session.clusterStream.reallyFlush();
+                if (!empty) {
+                    String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
+                            + "This is probably a serious error. Automatic cancel done as default recovery.";
+                    Logger.defaultLogInfo(msg);
+                    ClientChangesImpl cs = new ClientChangesImpl(session);
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+                    transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
+                }
+                transactionToken.stopWriteTransaction();
+                state &= All ^ WriteTransaction;
+                writeOwner = null;
+                condition.signal();
+                writeTransactionEnded();
+            }
+            --writeCount;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void cancelWriteTransaction(WriteGraphImpl graph) {
+        lock.lock();
+        try {
+            if (writeCount < 1)
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+            VirtualGraph vg = graph.getProvider();
+            if (null == vg) { // Skip if virtual graph.
+                boolean empty = session.clusterStream.reallyFlush();
+                if (!empty) { // Something to cancel.
+                    // Send all local changes to server so it can calculate correct reverse change set.
+                    ClientChangesImpl cs = new ClientChangesImpl(session);
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);
+                    // This will send cancel and fetch reverse change set from server.
+                    transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
+                    try {
+                        final boolean undo = false;
+                        if (!context.isOk(undo)) // this is a blocking operation
+                            throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+//                        System.out.println("session cs: " + session.clientChanges);
+//                        System.out.println("reverse cs: " + cs);
+                        queryProvider.performDirtyUpdates(graph);
+                        queryProvider.performScheduledUpdates(graph);
+                    } catch (DatabaseException e) {
+                        Logger.defaultLogError(e);
+                    }
+                    // This will send and accept the reverse change set.
+                    transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
+                }
+            }
+            session.writeSupport.clearMetadata();
+            if (--writeCount == 0) {
+                transactionToken.stopWriteTransaction();
+                state &= All ^ WriteTransaction;
+                writeOwner = null;
+                condition.signal();
+                writeTransactionEnded();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
+        assert(request != null);
+        graph.commitAccessorChanges(request);
+        boolean writeOnly = request instanceof WriteOnly;
+        lock.lock();
+        try {
+            VirtualGraph vg = graph.getProvider();
+            if (writeCount == 1) {
+
+                if (vg != null && clusterStream.isDirty())
+                    new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
+
+//                long start = System.nanoTime();
+                if (null == vg) {
+                    clusterStream.reallyFlush();
+                    // This was fired just before in handleUpdatesAndMetadata
+//                    if (!writeOnly)
+//                        session.fireMetadataListeners(graph, cs);
+                } else
+                    clusterStream.clear();
+
+//                long duration = System.nanoTime() - start;
+//                System.out.println("reallyFlush " + 1e-9*duration + "s. ");
+
+                session.clientChanges = new ClientChangesImpl(session);
+
+
+//                start = System.nanoTime();
+                queryProvider.performScheduledUpdates(graph);
+//                duration = System.nanoTime() - start;
+//                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
+
+                // Process updates as long as pending primitives exist
+                while (session.dirtyPrimitives) {
+                    session.dirtyPrimitives = false;
+                    queryProvider.performDirtyUpdates(graph);
+                    queryProvider.performScheduledUpdates(graph);
+                }
+
+                if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
+                    session.fireReactionsToCommit(graph, cs); // Does not throw exception!
+
+                writeTransactionEnded();
+                session.writeSupport.gc();
+            } else // Only the last writer can commit.
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);
+//            long start = System.nanoTime();
+//            System.err.println("commit");
+
+//            start = System.nanoTime();
+            if (null == vg)
+                transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
+            else {
+                // There should not be meta data because this is transient graph.
+                // This is an insurance that this is the case for now.
+                int n = session.writeSupport.clearMetadata();
+                if (n > 0)
+                    if (SessionImplSocket.DEBUG)
+                        System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
+            }
+            transactionToken.stopWriteTransaction();
+//            long duration = System.nanoTime() - start;
+//            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
+
+
+//            long duration = System.nanoTime() - start;
+//            System.err.println("commit2 " + 1e-9*duration);
+            state &= All ^ WriteTransaction;
+            writeCount = 0;
+            writeOwner = null;
+            condition.signal();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+        VirtualGraph vg = graph.getProvider();
+        if (null != vg)
+            return;
+        lock.lock();
+        try {
+            clusterStream.reallyFlush();
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+            // See below for explanation, the same reasoning should apply here too.
+            if (graph.writeSupport instanceof WriteOnlySupport)
+                graph.writeSupport.flushCluster();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
+        VirtualGraph vg = graph.getProvider();
+        if (null != vg)
+            return;
+        lock.lock();
+        try {
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
+            transactionToken.setCombine(true);
+            // This was added because test Issue3199Test2 failed because
+            // write only cluster does not need cluster when allocating resource index
+            // and the first claim after commitAndContinue2 caused cluster to be loaded
+            // from server and of course it's resource index was off by one.
+            if (graph.writeSupport instanceof WriteOnlySupport)
+                graph.writeSupport.flushCluster();
+        } catch (Throwable e) {
+            Logger.defaultLogError(e);
+            this.close(); // Everything is lost anyway.
+            throw new IllegalStateException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    Operation getLastOperation() {
+        return transactionToken.getLastOperation();
+    }
+    long getHeadRevisionId() {
+        return transactionToken.getHeadRevisionId();
+    }
+}