]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
new file mode 100644 (file)
index 0000000..8345c78
--- /dev/null
@@ -0,0 +1,445 @@
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.locks.Condition;\r
+import java.util.concurrent.locks.Lock;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+\r
+import org.simantics.db.ChangeSet;\r
+import org.simantics.db.Operation;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.InternalException;\r
+import org.simantics.db.exception.RuntimeDatabaseException;\r
+import org.simantics.db.impl.graph.WriteGraphImpl;\r
+import org.simantics.db.impl.query.QueryProcessor;\r
+import org.simantics.db.request.WriteOnly;\r
+import org.simantics.db.request.WriteTraits;\r
+import org.simantics.db.service.LifecycleSupport.LifecycleState;\r
+import org.simantics.db.service.TransactionPolicySupport;\r
+\r
+import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;\r
+\r
+class State {\r
+    static final int          Closed           = 1;\r
+    static final int          Closing          = 2;\r
+    static final int          WriteTransaction = 4;\r
+    static final int          ReadTransaction  = 8;\r
+    static final int          All              = 15;\r
+    private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.\r
+    private SessionImplSocket session          = null;\r
+    private GraphSession      graphSession     = null;\r
+    private QueryProcessor    queryProvider    = null;\r
+    private Lock              lock             = new ReentrantLock();\r
+    private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.\r
+    private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.\r
+    private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.\r
+    private Thread            writeOwner       = null;\r
+    private int               asyncCount       = 1;\r
+    private TransactionToken  transactionToken = null;\r
+    void setCombine(boolean a) {\r
+        if (null != transactionToken)\r
+            transactionToken.setCombine(a);\r
+    }\r
+    void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {\r
+        this.session = session;\r
+        this.graphSession = graphSession;\r
+        this.queryProvider = queryProvider;\r
+        resetTransactionPolicy();\r
+    }\r
+    void resetTransactionPolicy() {\r
+        TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);\r
+        this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);\r
+    }\r
+    int getAsyncCount() {\r
+        return asyncCount;\r
+    }\r
+    int getReadCount() {\r
+        return readCount;\r
+    }\r
+    int getWriteCount() {\r
+        return writeCount;\r
+    }\r
+    boolean isWriteTransaction() {\r
+        return 0 != (state & WriteTransaction);\r
+    }\r
+    boolean isAlive() {\r
+        return !isClosed() && !isClosing();\r
+    }\r
+    boolean isClosed() {\r
+        return 0 != (state & Closed);\r
+    }\r
+    boolean isClosing() {\r
+        return 0 != (state & Closing);\r
+    }\r
+    boolean isWriting() {\r
+        return 0 != (state & WriteTransaction);\r
+    }\r
+    void close() {\r
+        boolean acquired = false;\r
+        try {\r
+            acquired = lock.tryLock(1, TimeUnit.MINUTES);\r
+        } catch (InterruptedException e1) {\r
+        }\r
+        try {\r
+            if (null != transactionToken)\r
+                transactionToken.close();\r
+            graphSession.stop();\r
+        } catch (DatabaseException e) {\r
+        } finally {\r
+            state = Closed;\r
+            session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);\r
+            if (acquired) {\r
+                condition.signalAll();\r
+                lock.unlock();\r
+            }\r
+        }\r
+    }\r
+    void incAsync()\r
+    {\r
+        lock.lock();\r
+        try {\r
+            boolean closed = isClosed();\r
+            boolean closing = isClosing();\r
+            if (closed || closing)\r
+                throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);\r
+            ++asyncCount;\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    void decAsync()\r
+    {\r
+        lock.lock();\r
+        try {\r
+            if (asyncCount < 1)\r
+                throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);\r
+            if (0 == --asyncCount) {\r
+                condition.signal();\r
+            }\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }\r
+    WaitStatus waitClosing(long timeout) {\r
+        lock.lock();\r
+        try {\r
+            if (isClosed())\r
+                return WaitStatus.IsClosed;\r
+            else if (isClosing())\r
+                return WaitStatus.IsClosing;\r
+            state |= Closing;\r
+            try {\r
+                if (timeout < 0) { // Infinite timeout\r
+                    for (int i=0; i<100; i++)\r
+                    while (asyncCount > 0  || readCount > 0 ||\r
+                           (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
+                        condition.signalAll();\r
+                        condition.awaitNanos(10000000); // nanos\r
+                    }\r
+                    while (asyncCount > 0  || readCount > 0 ||\r
+                            (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
+                         condition.signalAll();\r
+                         condition.await();\r
+                     }\r
+                } else if (timeout > 0) {\r
+                    boolean interrupted = false;\r
+                    while (!interrupted &&\r
+                           (asyncCount > 0  || readCount > 0 ||\r
+                            (null != writeOwner && Thread.currentThread() != writeOwner)))\r
+                        interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);\r
+                }\r
+                transactionToken.closing();\r
+            } catch (Exception e) {\r
+                e.printStackTrace();\r
+            }\r
+            state &= All ^ Closing;\r
+            if (readCount == 0 && writeCount == 0 && asyncCount == 0)\r
+                return WaitStatus.CanBeClosed;\r
+            else if (null != writeOwner && Thread.currentThread() == writeOwner)\r
+                return WaitStatus.Deadlock;\r
+            return WaitStatus.Timeout;\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    void startReadTransaction(int thread) throws DatabaseException {\r
+        lock.lock();\r
+        try {\r
+            assert(readCount == 0);\r
+            transactionToken.startReadTransaction(thread);\r
+            state |= ReadTransaction;\r
+            session.fireStartReadTransaction();\r
+            ++readCount;\r
+        } catch (Throwable e) {\r
+            throw new DatabaseException("Failed to start read transaction.", e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    void stopReadTransaction() throws DatabaseException {\r
+        lock.lock();\r
+        try {\r
+            assert (!queryProvider.hasScheduledUpdates());\r
+            assert (readCount == 1);\r
+            session.writeSupport.gc();\r
+            transactionToken.stopReadTransaction();\r
+            readCount = 0;\r
+            state &= All ^ ReadTransaction;\r
+            condition.signal();\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+\r
+    void startWriteTransaction(int thread)\r
+    throws DatabaseException {\r
+        lock.lock();\r
+        try {\r
+//            System.out.println("stawt");\r
+            boolean closed = isClosed();\r
+            boolean closing = isClosing();\r
+            int ac = asyncCount;\r
+            int wc = writeCount;\r
+            if (closed || (closing && ac < 1) || wc < 0)\r
+                throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);\r
+            if (writeCount == 0) {\r
+                transactionToken.startWriteTransaction(thread);\r
+                state |= WriteTransaction;\r
+                writeOwner = Thread.currentThread();\r
+                session.fireStartWriteTransaction();\r
+            }\r
+            ++writeCount;\r
+            assert(writeCount == 1);\r
+        } catch (Throwable e) {\r
+            throw new DatabaseException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+\r
+    void writeTransactionEnded() {\r
+        session.defaultClusterSet = null;\r
+        session.fireFinishWriteTransaction();\r
+    }\r
+\r
+    void stopWriteTransaction(ClusterStream clusterStream) {\r
+        if(!isAlive()) return;\r
+        lock.lock();\r
+        try {\r
+//            System.out.println("stowt");\r
+            if (writeCount < 1)\r
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+            else if (1 == writeCount) {\r
+                boolean empty = session.clusterStream.reallyFlush();\r
+                if (!empty) {\r
+                    String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"\r
+                            + "This is probably a serious error. Automatic cancel done as default recovery.";\r
+                    Logger.defaultLogInfo(msg);\r
+                    ClientChangesImpl cs = new ClientChangesImpl(session);\r
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
+                    transactionToken.cancelBegin(session.writeSupport, context, clusterStream);\r
+                }\r
+                transactionToken.stopWriteTransaction();\r
+                state &= All ^ WriteTransaction;\r
+                writeOwner = null;\r
+                condition.signal();\r
+                writeTransactionEnded();\r
+            }\r
+            --writeCount;\r
+        } catch (Throwable e) {\r
+            e.printStackTrace();\r
+            this.close(); // Everything is lost anyway.\r
+            throw new IllegalStateException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+\r
+    void cancelWriteTransaction(WriteGraphImpl graph) {\r
+        lock.lock();\r
+        try {\r
+            if (writeCount < 1)\r
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+            VirtualGraph vg = graph.getProvider();\r
+            if (null == vg) { // Skip if virtual graph.\r
+                boolean empty = session.clusterStream.reallyFlush();\r
+                if (!empty) { // Something to cancel.\r
+                    // Send all local changes to server so it can calculate correct reverse change set.\r
+                    ClientChangesImpl cs = new ClientChangesImpl(session);\r
+                    SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
+                    // This will send cancel and fetch reverse change set from server.\r
+                    transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);\r
+                    try {\r
+                        final boolean undo = false;\r
+                        if (!context.isOk(undo)) // this is a blocking operation\r
+                            throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
+//                        System.out.println("session cs: " + session.clientChanges);\r
+//                        System.out.println("reverse cs: " + cs);\r
+                        queryProvider.performDirtyUpdates(graph);\r
+                        queryProvider.performScheduledUpdates(graph);\r
+                    } catch (DatabaseException e) {\r
+                        Logger.defaultLogError(e);\r
+                    }\r
+                    // This will send and accept the reverse change set.\r
+                    transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);\r
+                }\r
+            }\r
+            session.writeSupport.clearMetadata();\r
+            if (--writeCount == 0) {\r
+                transactionToken.stopWriteTransaction();\r
+                state &= All ^ WriteTransaction;\r
+                writeOwner = null;\r
+                condition.signal();\r
+                writeTransactionEnded();\r
+            }\r
+        } catch (Throwable e) {\r
+            e.printStackTrace();\r
+            this.close(); // Everything is lost anyway.\r
+            throw new IllegalStateException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+\r
+    void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {\r
+        assert(request != null);\r
+        graph.commitAccessorChanges(request);\r
+        boolean writeOnly = request instanceof WriteOnly;\r
+        lock.lock();\r
+        try {\r
+            VirtualGraph vg = graph.getProvider();\r
+            if (writeCount == 1) {\r
+\r
+                if (vg != null && clusterStream.isDirty())\r
+                    new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();\r
+\r
+//                long start = System.nanoTime();\r
+                if (null == vg) {\r
+                    clusterStream.reallyFlush();\r
+                    // This was fired just before in handleUpdatesAndMetadata\r
+//                    if (!writeOnly)\r
+//                        session.fireMetadataListeners(graph, cs);\r
+                } else\r
+                    clusterStream.clear();\r
+\r
+//                long duration = System.nanoTime() - start;\r
+//                System.out.println("reallyFlush " + 1e-9*duration + "s. ");\r
+\r
+                session.clientChanges = new ClientChangesImpl(session);\r
+\r
+\r
+//                start = System.nanoTime();\r
+                queryProvider.performScheduledUpdates(graph);\r
+//                duration = System.nanoTime() - start;\r
+//                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");\r
+\r
+                // Process updates as long as pending primitives exist\r
+                while (session.dirtyPrimitives) {\r
+                    session.dirtyPrimitives = false;\r
+                    queryProvider.performDirtyUpdates(graph);\r
+                    queryProvider.performScheduledUpdates(graph);\r
+                }\r
+\r
+                if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!\r
+                    session.fireReactionsToCommit(graph, cs); // Does not throw exception!\r
+\r
+                writeTransactionEnded();\r
+                session.writeSupport.gc();\r
+            } else // Only the last writer can commit.\r
+                throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+//            long start = System.nanoTime();\r
+//            System.err.println("commit");\r
+\r
+//            start = System.nanoTime();\r
+            if (null == vg)\r
+                transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);\r
+            else {\r
+                // There should not be meta data because this is transient graph.\r
+                // This is an insurance that this is the case for now.\r
+                int n = session.writeSupport.clearMetadata();\r
+                if (n > 0)\r
+                    if (SessionImplSocket.DEBUG)\r
+                        System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);\r
+            }\r
+            transactionToken.stopWriteTransaction();\r
+//            long duration = System.nanoTime() - start;\r
+//            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");\r
+\r
+\r
+//            long duration = System.nanoTime() - start;\r
+//            System.err.println("commit2 " + 1e-9*duration);\r
+            state &= All ^ WriteTransaction;\r
+            writeCount = 0;\r
+            writeOwner = null;\r
+            condition.signal();\r
+        } catch (Throwable e) {\r
+            Logger.defaultLogError(e);\r
+            this.close(); // Everything is lost anyway.\r
+            throw new IllegalStateException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+\r
+    void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
+        VirtualGraph vg = graph.getProvider();\r
+        if (null != vg)\r
+            return;\r
+        lock.lock();\r
+        try {\r
+            clusterStream.reallyFlush();\r
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
+            // See below for explanation, the same reasoning should apply here too.\r
+            if (graph.writeSupport instanceof WriteOnlySupport)\r
+                graph.writeSupport.flushCluster();\r
+        } catch (Throwable e) {\r
+            Logger.defaultLogError(e);\r
+            this.close(); // Everything is lost anyway.\r
+            throw new IllegalStateException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
+        VirtualGraph vg = graph.getProvider();\r
+        if (null != vg)\r
+            return;\r
+        lock.lock();\r
+        try {\r
+            transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
+            transactionToken.setCombine(true);\r
+            // This was added because test Issue3199Test2 failed because\r
+            // write only cluster does not need cluster when allocating resource index\r
+            // and the first claim after commitAndContinue2 caused cluster to be loaded\r
+            // from server and of course it's resource index was off by one.\r
+            if (graph.writeSupport instanceof WriteOnlySupport)\r
+                graph.writeSupport.flushCluster();\r
+        } catch (Throwable e) {\r
+            Logger.defaultLogError(e);\r
+            this.close(); // Everything is lost anyway.\r
+            throw new IllegalStateException(e);\r
+        } finally {\r
+            lock.unlock();\r
+        }\r
+    }\r
+    Operation getLastOperation() {\r
+        return transactionToken.getLastOperation();\r
+    }\r
+    long getHeadRevisionId() {\r
+        return transactionToken.getHeadRevisionId();\r
+    }\r
+}\r