--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.locks.Condition;\r
+import java.util.concurrent.locks.Lock;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+\r
+import org.simantics.db.ChangeSet;\r
+import org.simantics.db.Operation;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.InternalException;\r
+import org.simantics.db.exception.RuntimeDatabaseException;\r
+import org.simantics.db.impl.graph.WriteGraphImpl;\r
+import org.simantics.db.impl.query.QueryProcessor;\r
+import org.simantics.db.request.WriteOnly;\r
+import org.simantics.db.request.WriteTraits;\r
+import org.simantics.db.service.LifecycleSupport.LifecycleState;\r
+import org.simantics.db.service.TransactionPolicySupport;\r
+\r
+import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;\r
+\r
+class State {\r
+ static final int Closed = 1;\r
+ static final int Closing = 2;\r
+ static final int WriteTransaction = 4;\r
+ static final int ReadTransaction = 8;\r
+ static final int All = 15;\r
+ private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.\r
+ private SessionImplSocket session = null;\r
+ private GraphSession graphSession = null;\r
+ private QueryProcessor queryProvider = null;\r
+ private Lock lock = new ReentrantLock();\r
+ private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.\r
+ private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.\r
+ private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.\r
+ private Thread writeOwner = null;\r
+ private int asyncCount = 1;\r
+ private TransactionToken transactionToken = null;\r
+ void setCombine(boolean a) {\r
+ if (null != transactionToken)\r
+ transactionToken.setCombine(a);\r
+ }\r
+ void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {\r
+ this.session = session;\r
+ this.graphSession = graphSession;\r
+ this.queryProvider = queryProvider;\r
+ resetTransactionPolicy();\r
+ }\r
+ void resetTransactionPolicy() {\r
+ TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);\r
+ this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);\r
+ }\r
+ int getAsyncCount() {\r
+ return asyncCount;\r
+ }\r
+ int getReadCount() {\r
+ return readCount;\r
+ }\r
+ int getWriteCount() {\r
+ return writeCount;\r
+ }\r
+ boolean isWriteTransaction() {\r
+ return 0 != (state & WriteTransaction);\r
+ }\r
+ boolean isAlive() {\r
+ return !isClosed() && !isClosing();\r
+ }\r
+ boolean isClosed() {\r
+ return 0 != (state & Closed);\r
+ }\r
+ boolean isClosing() {\r
+ return 0 != (state & Closing);\r
+ }\r
+ boolean isWriting() {\r
+ return 0 != (state & WriteTransaction);\r
+ }\r
+ void close() {\r
+ boolean acquired = false;\r
+ try {\r
+ acquired = lock.tryLock(1, TimeUnit.MINUTES);\r
+ } catch (InterruptedException e1) {\r
+ }\r
+ try {\r
+ if (null != transactionToken)\r
+ transactionToken.close();\r
+ graphSession.stop();\r
+ } catch (DatabaseException e) {\r
+ } finally {\r
+ state = Closed;\r
+ session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);\r
+ if (acquired) {\r
+ condition.signalAll();\r
+ lock.unlock();\r
+ }\r
+ }\r
+ }\r
+ void incAsync()\r
+ {\r
+ lock.lock();\r
+ try {\r
+ boolean closed = isClosed();\r
+ boolean closing = isClosing();\r
+ if (closed || closing)\r
+ throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);\r
+ ++asyncCount;\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ void decAsync()\r
+ {\r
+ lock.lock();\r
+ try {\r
+ if (asyncCount < 1)\r
+ throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);\r
+ if (0 == --asyncCount) {\r
+ condition.signal();\r
+ }\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }\r
+ WaitStatus waitClosing(long timeout) {\r
+ lock.lock();\r
+ try {\r
+ if (isClosed())\r
+ return WaitStatus.IsClosed;\r
+ else if (isClosing())\r
+ return WaitStatus.IsClosing;\r
+ state |= Closing;\r
+ try {\r
+ if (timeout < 0) { // Infinite timeout\r
+ for (int i=0; i<100; i++)\r
+ while (asyncCount > 0 || readCount > 0 ||\r
+ (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
+ condition.signalAll();\r
+ condition.awaitNanos(10000000); // nanos\r
+ }\r
+ while (asyncCount > 0 || readCount > 0 ||\r
+ (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
+ condition.signalAll();\r
+ condition.await();\r
+ }\r
+ } else if (timeout > 0) {\r
+ boolean interrupted = false;\r
+ while (!interrupted &&\r
+ (asyncCount > 0 || readCount > 0 ||\r
+ (null != writeOwner && Thread.currentThread() != writeOwner)))\r
+ interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);\r
+ }\r
+ transactionToken.closing();\r
+ } catch (Exception e) {\r
+ e.printStackTrace();\r
+ }\r
+ state &= All ^ Closing;\r
+ if (readCount == 0 && writeCount == 0 && asyncCount == 0)\r
+ return WaitStatus.CanBeClosed;\r
+ else if (null != writeOwner && Thread.currentThread() == writeOwner)\r
+ return WaitStatus.Deadlock;\r
+ return WaitStatus.Timeout;\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ void startReadTransaction(int thread) throws DatabaseException {\r
+ lock.lock();\r
+ try {\r
+ assert(readCount == 0);\r
+ transactionToken.startReadTransaction(thread);\r
+ state |= ReadTransaction;\r
+ session.fireStartReadTransaction();\r
+ ++readCount;\r
+ } catch (Throwable e) {\r
+ throw new DatabaseException("Failed to start read transaction.", e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ void stopReadTransaction() throws DatabaseException {\r
+ lock.lock();\r
+ try {\r
+ assert (!queryProvider.hasScheduledUpdates());\r
+ assert (readCount == 1);\r
+ session.writeSupport.gc();\r
+ transactionToken.stopReadTransaction();\r
+ readCount = 0;\r
+ state &= All ^ ReadTransaction;\r
+ condition.signal();\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+\r
+ void startWriteTransaction(int thread)\r
+ throws DatabaseException {\r
+ lock.lock();\r
+ try {\r
+// System.out.println("stawt");\r
+ boolean closed = isClosed();\r
+ boolean closing = isClosing();\r
+ int ac = asyncCount;\r
+ int wc = writeCount;\r
+ if (closed || (closing && ac < 1) || wc < 0)\r
+ throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);\r
+ if (writeCount == 0) {\r
+ transactionToken.startWriteTransaction(thread);\r
+ state |= WriteTransaction;\r
+ writeOwner = Thread.currentThread();\r
+ session.fireStartWriteTransaction();\r
+ }\r
+ ++writeCount;\r
+ assert(writeCount == 1);\r
+ } catch (Throwable e) {\r
+ throw new DatabaseException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+\r
+ void writeTransactionEnded() {\r
+ session.defaultClusterSet = null;\r
+ session.fireFinishWriteTransaction();\r
+ }\r
+\r
+ void stopWriteTransaction(ClusterStream clusterStream) {\r
+ if(!isAlive()) return;\r
+ lock.lock();\r
+ try {\r
+// System.out.println("stowt");\r
+ if (writeCount < 1)\r
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+ else if (1 == writeCount) {\r
+ boolean empty = session.clusterStream.reallyFlush();\r
+ if (!empty) {\r
+ String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"\r
+ + "This is probably a serious error. Automatic cancel done as default recovery.";\r
+ Logger.defaultLogInfo(msg);\r
+ ClientChangesImpl cs = new ClientChangesImpl(session);\r
+ SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
+ transactionToken.cancelBegin(session.writeSupport, context, clusterStream);\r
+ }\r
+ transactionToken.stopWriteTransaction();\r
+ state &= All ^ WriteTransaction;\r
+ writeOwner = null;\r
+ condition.signal();\r
+ writeTransactionEnded();\r
+ }\r
+ --writeCount;\r
+ } catch (Throwable e) {\r
+ e.printStackTrace();\r
+ this.close(); // Everything is lost anyway.\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+\r
+ void cancelWriteTransaction(WriteGraphImpl graph) {\r
+ lock.lock();\r
+ try {\r
+ if (writeCount < 1)\r
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+ VirtualGraph vg = graph.getProvider();\r
+ if (null == vg) { // Skip if virtual graph.\r
+ boolean empty = session.clusterStream.reallyFlush();\r
+ if (!empty) { // Something to cancel.\r
+ // Send all local changes to server so it can calculate correct reverse change set.\r
+ ClientChangesImpl cs = new ClientChangesImpl(session);\r
+ SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
+ // This will send cancel and fetch reverse change set from server.\r
+ transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);\r
+ try {\r
+ final boolean undo = false;\r
+ if (!context.isOk(undo)) // this is a blocking operation\r
+ throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
+// System.out.println("session cs: " + session.clientChanges);\r
+// System.out.println("reverse cs: " + cs);\r
+ queryProvider.performDirtyUpdates(graph);\r
+ queryProvider.performScheduledUpdates(graph);\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+ // This will send and accept the reverse change set.\r
+ transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);\r
+ }\r
+ }\r
+ session.writeSupport.clearMetadata();\r
+ if (--writeCount == 0) {\r
+ transactionToken.stopWriteTransaction();\r
+ state &= All ^ WriteTransaction;\r
+ writeOwner = null;\r
+ condition.signal();\r
+ writeTransactionEnded();\r
+ }\r
+ } catch (Throwable e) {\r
+ e.printStackTrace();\r
+ this.close(); // Everything is lost anyway.\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+\r
+ void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {\r
+ assert(request != null);\r
+ graph.commitAccessorChanges(request);\r
+ boolean writeOnly = request instanceof WriteOnly;\r
+ lock.lock();\r
+ try {\r
+ VirtualGraph vg = graph.getProvider();\r
+ if (writeCount == 1) {\r
+\r
+ if (vg != null && clusterStream.isDirty())\r
+ new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();\r
+\r
+// long start = System.nanoTime();\r
+ if (null == vg) {\r
+ clusterStream.reallyFlush();\r
+ // This was fired just before in handleUpdatesAndMetadata\r
+// if (!writeOnly)\r
+// session.fireMetadataListeners(graph, cs);\r
+ } else\r
+ clusterStream.clear();\r
+\r
+// long duration = System.nanoTime() - start;\r
+// System.out.println("reallyFlush " + 1e-9*duration + "s. ");\r
+\r
+ session.clientChanges = new ClientChangesImpl(session);\r
+\r
+\r
+// start = System.nanoTime();\r
+ queryProvider.performScheduledUpdates(graph);\r
+// duration = System.nanoTime() - start;\r
+// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");\r
+\r
+ // Process updates as long as pending primitives exist\r
+ while (session.dirtyPrimitives) {\r
+ session.dirtyPrimitives = false;\r
+ queryProvider.performDirtyUpdates(graph);\r
+ queryProvider.performScheduledUpdates(graph);\r
+ }\r
+\r
+ if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!\r
+ session.fireReactionsToCommit(graph, cs); // Does not throw exception!\r
+\r
+ writeTransactionEnded();\r
+ session.writeSupport.gc();\r
+ } else // Only the last writer can commit.\r
+ throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
+// long start = System.nanoTime();\r
+// System.err.println("commit");\r
+\r
+// start = System.nanoTime();\r
+ if (null == vg)\r
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);\r
+ else {\r
+ // There should not be meta data because this is transient graph.\r
+ // This is an insurance that this is the case for now.\r
+ int n = session.writeSupport.clearMetadata();\r
+ if (n > 0)\r
+ if (SessionImplSocket.DEBUG)\r
+ System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);\r
+ }\r
+ transactionToken.stopWriteTransaction();\r
+// long duration = System.nanoTime() - start;\r
+// System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");\r
+\r
+\r
+// long duration = System.nanoTime() - start;\r
+// System.err.println("commit2 " + 1e-9*duration);\r
+ state &= All ^ WriteTransaction;\r
+ writeCount = 0;\r
+ writeOwner = null;\r
+ condition.signal();\r
+ } catch (Throwable e) {\r
+ Logger.defaultLogError(e);\r
+ this.close(); // Everything is lost anyway.\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+\r
+ void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
+ VirtualGraph vg = graph.getProvider();\r
+ if (null != vg)\r
+ return;\r
+ lock.lock();\r
+ try {\r
+ clusterStream.reallyFlush();\r
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
+ // See below for explanation, the same reasoning should apply here too.\r
+ if (graph.writeSupport instanceof WriteOnlySupport)\r
+ graph.writeSupport.flushCluster();\r
+ } catch (Throwable e) {\r
+ Logger.defaultLogError(e);\r
+ this.close(); // Everything is lost anyway.\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
+ VirtualGraph vg = graph.getProvider();\r
+ if (null != vg)\r
+ return;\r
+ lock.lock();\r
+ try {\r
+ transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
+ transactionToken.setCombine(true);\r
+ // This was added because test Issue3199Test2 failed because\r
+ // write only cluster does not need cluster when allocating resource index\r
+ // and the first claim after commitAndContinue2 caused cluster to be loaded\r
+ // from server and of course it's resource index was off by one.\r
+ if (graph.writeSupport instanceof WriteOnlySupport)\r
+ graph.writeSupport.flushCluster();\r
+ } catch (Throwable e) {\r
+ Logger.defaultLogError(e);\r
+ this.close(); // Everything is lost anyway.\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ lock.unlock();\r
+ }\r
+ }\r
+ Operation getLastOperation() {\r
+ return transactionToken.getLastOperation();\r
+ }\r
+ long getHeadRevisionId() {\r
+ return transactionToken.getHeadRevisionId();\r
+ }\r
+}\r