1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Disposable;
21 import org.simantics.db.Operation;
22 import org.simantics.db.VirtualGraph;
23 import org.simantics.db.common.utils.Logger;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.exception.InternalException;
26 import org.simantics.db.exception.RuntimeDatabaseException;
27 import org.simantics.db.impl.graph.WriteGraphImpl;
28 import org.simantics.db.impl.query.QueryProcessor;
29 import org.simantics.db.request.WriteOnly;
30 import org.simantics.db.request.WriteTraits;
31 import org.simantics.db.service.LifecycleSupport.LifecycleState;
32 import org.simantics.db.service.TransactionPolicySupport;
34 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
// Bit flags that are OR-ed together into the `state` field below.
37 static final int Closed = 1;
38 static final int Closing = 2;
39 static final int WriteTransaction = 4;
40 static final int ReadTransaction = 8;
// Mask with all four flag bits set. The code clears a single flag with `state &= All ^ FLAG`.
41 static final int All = 15;
42 private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
// The three collaborators below are assigned in setGraphSession(...).
43 private SessionImplSocket session = null;
44 private GraphSession graphSession = null;
45 private QueryProcessor queryProvider = null;
// Single lock/condition pair; waitClosing(...) waits on `condition` and other code calls signalAll() on it.
46 private Lock lock = new ReentrantLock();
47 private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
48 private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
49 private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
// Thread that started the outermost write transaction (assigned in startWriteTransaction when writeCount == 0).
50 private Thread writeOwner = null;
// NOTE(review): starts at 1 rather than 0; the reason is not visible in this part of the file - confirm.
51 private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
// Created by resetTransactionPolicy(); null until setGraphSession(...) has been called.
52 private TransactionToken transactionToken = null;
/**
 * Forwards the combine flag to the current transaction token.
 * Does nothing when no transaction token has been created yet.
 *
 * @param a value handed unchanged to {@code TransactionToken.setCombine}
 */
53 void setCombine(boolean a) {
54 if (null != transactionToken)
55 transactionToken.setCombine(a);
/**
 * Stores the session collaborators and (re)creates the transaction token
 * via {@link #resetTransactionPolicy()}.
 *
 * @param session       owning session; also used to look up services and fire events
 * @param graphSession  graph session handed to the transaction token
 * @param queryProvider query processor used for scheduled/dirty updates
 * @param clusterTable  not used in the lines visible here
 */
57 void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
58 this.session = session;
59 this.graphSession = graphSession;
60 this.queryProvider = queryProvider;
61 resetTransactionPolicy();
/**
 * Replaces the transaction token with a new one built from the session's
 * current {@code TransactionPolicySupport} service. The previous token
 * (possibly null) is passed to the new token's constructor.
 */
63 void resetTransactionPolicy() {
64 TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
65 this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
/**
 * @return true when the {@code WriteTransaction} bit is set in {@code state}
 */
76 boolean isWriteTransaction() {
77 return 0 != (state & WriteTransaction);
// NOTE(review): the method headers of the following one-line bodies are not visible
// in this view. isAlive(), isClosed() and isClosing() are called elsewhere in this
// file, so these are presumably their bodies - confirm against the full source.
// True when the state is neither closed nor closing.
80 return !isClosed() && !isClosing();
// True when the Closed bit is set.
83 return 0 != (state & Closed);
// True when the Closing bit is set.
86 return 0 != (state & Closing);
// True when the WriteTransaction bit is set (same test as isWriteTransaction()).
89 return 0 != (state & WriteTransaction);
// NOTE(review): the enclosing method header is not visible in this view; other
// methods call this.close(), so this is presumably the body of close() - confirm.
92 boolean acquired = false;
// Bounded wait for the lock: tryLock gives up after one minute instead of blocking forever.
94 acquired = lock.tryLock(1, TimeUnit.MINUTES);
95 } catch (InterruptedException e1) {
// Close the transaction token if one was ever created.
98 if (null != transactionToken)
99 transactionToken.close();
101 } catch (DatabaseException e) {
// Notify lifecycle listeners that the session is now closed.
104 session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
// Wake every thread waiting on the shared condition (e.g. in waitClosing).
106 condition.signalAll();
// NOTE(review): the enclosing method headers are not visible in this view; these
// lines appear to belong to the methods that increment/decrement asyncCount - confirm.
// Refuse the operation once the state is closed or closing.
115 boolean closed = isClosed();
116 boolean closing = isClosing();
117 if (closed || closing)
118 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
// The guard for this throw is on a line not visible here.
129 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
// Decrement the counter; the body of this branch is not visible here.
130 if (0 == --asyncCount) {
/**
 * Result of {@code waitClosing(long)}:
 * IsClosed / IsClosing - returned before any waiting (IsClosing when isClosing() is true);
 * CanBeClosed - readCount, writeCount and asyncCount are all zero;
 * Deadlock - the calling thread is itself the write owner;
 * Timeout - none of the above held when waiting ended.
 */
137 enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
/**
 * Waits for ongoing activity to finish so that the session can be closed.
 * "Activity" means asyncCount > 0, readCount > 0, or a write transaction owned by
 * a thread other than the caller.
 *
 * @param timeout negative = wait without a time limit; positive = wait time in
 *                milliseconds per await call; zero = do not wait at all
 *                (neither branch below is taken)
 * @return the resulting {@link WaitStatus}
 */
138 WaitStatus waitClosing(long timeout) {
// Early exits; the condition guarding the first return is on a line not visible here.
142 return WaitStatus.IsClosed;
143 else if (isClosing())
144 return WaitStatus.IsClosing;
147 if (timeout < 0) { // Infinite timeout
// First phase: up to 100 passes of the loop below, which polls in 10 ms slices
// (10,000,000 ns) and signals other waiters before each sleep.
148 for (int i=0; i<100; i++)
149 while (asyncCount > 0 || readCount > 0 ||
150 (null != writeOwner && Thread.currentThread() != writeOwner)) {
151 condition.signalAll();
152 condition.awaitNanos(10000000); // nanos
// Second phase: same predicate; the wait call inside this loop is on a line not visible here.
154 while (asyncCount > 0 || readCount > 0 ||
155 (null != writeOwner && Thread.currentThread() != writeOwner)) {
156 condition.signalAll();
159 } else if (timeout > 0) {
// Despite its name, `interrupted` records a timeout: Condition.await(long, TimeUnit)
// returns false when the waiting time elapsed.
// NOTE(review): each iteration waits the full `timeout`, so the total wait can exceed it
// when the thread is woken while the predicate still holds.
160 boolean interrupted = false;
161 while (!interrupted &&
162 (asyncCount > 0 || readCount > 0 ||
163 (null != writeOwner && Thread.currentThread() != writeOwner)))
164 interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
// Tell the transaction token that the session is closing.
166 transactionToken.closing();
167 } catch (Exception e) {
// Clear the Closing bit before reporting the outcome.
170 state &= All ^ Closing;
171 if (readCount == 0 && writeCount == 0 && asyncCount == 0)
172 return WaitStatus.CanBeClosed;
// The caller holds the write transaction itself, so waiting for it cannot succeed.
173 else if (null != writeOwner && Thread.currentThread() == writeOwner)
174 return WaitStatus.Deadlock;
175 return WaitStatus.Timeout;
/**
 * Starts a read transaction: delegates to the transaction token, sets the
 * {@code ReadTransaction} bit and notifies the session.
 *
 * @param thread value handed unchanged to {@code TransactionToken.startReadTransaction}
 * @throws DatabaseException wrapping any Throwable raised while starting
 */
180 void startReadTransaction(int thread) throws DatabaseException {
// No read transaction may already be counted at this point (checked only with -ea).
183 assert(readCount == 0);
184 transactionToken.startReadTransaction(thread);
185 state |= ReadTransaction;
186 session.fireStartReadTransaction();
188 } catch (Throwable e) {
189 throw new DatabaseException("Failed to start read transaction.", e);
/**
 * Ends the read transaction: runs the write support's gc(), tells the transaction
 * token to stop the read transaction and clears the {@code ReadTransaction} bit.
 *
 * @throws DatabaseException declared by the signature; the throwing path is not
 *                           visible in these lines
 */
194 void stopReadTransaction() throws DatabaseException {
// Sanity checks (only with -ea): no pending query updates and exactly one reader counted.
197 assert (!queryProvider.hasScheduledUpdates());
198 assert (readCount == 1);
199 session.writeSupport.gc();
200 transactionToken.stopReadTransaction();
// Clear the ReadTransaction bit, leaving the other flags untouched.
202 state &= All ^ ReadTransaction;
/**
 * Starts (or re-enters) a write transaction. When no write transaction is active
 * ({@code writeCount == 0}) the transaction token is started, the
 * {@code WriteTransaction} bit is set, the calling thread becomes the write owner
 * and the session is notified.
 *
 * @param thread value handed unchanged to {@code TransactionToken.startWriteTransaction}
 * @throws DatabaseException when the state is closed, when it is closing with
 *                           {@code ac < 1}, when {@code wc < 0}, or wrapping any
 *                           Throwable caught by the handler below
 */
209 void startWriteTransaction(int thread)
210 throws DatabaseException {
213 // System.out.println("stawt");
214 boolean closed = isClosed();
215 boolean closing = isClosing();
// `ac` and `wc` are declared on lines not visible here - presumably snapshots of
// asyncCount and writeCount; confirm against the full source.
218 if (closed || (closing && ac < 1) || wc < 0)
219 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
// Only the outermost start actually opens the transaction.
220 if (writeCount == 0) {
221 transactionToken.startWriteTransaction(thread);
222 state |= WriteTransaction;
223 writeOwner = Thread.currentThread();
224 session.fireStartWriteTransaction();
// The increment of writeCount is on a line not visible here.
227 assert(writeCount == 1);
228 } catch (Throwable e) {
229 throw new DatabaseException(e);
/**
 * Common end-of-write bookkeeping: clears the session's default cluster set and
 * fires the session's "finish write transaction" notification.
 */
235 void writeTransactionEnded() {
236 session.defaultClusterSet = null;
237 session.fireFinishWriteTransaction();
/**
 * Stops a write transaction that was neither committed nor cancelled explicitly.
 * Returns immediately when the state is not alive. For the last writer
 * ({@code writeCount == 1}) pending changes are flushed, an automatic cancel is
 * issued and logged as the default recovery, and the transaction is closed.
 *
 * @param clusterStream stream handed to {@code TransactionToken.cancelBegin}
 * @throws IllegalStateException for an unexpected writeCount (guard not visible here),
 *                               or wrapping any Throwable after this object was closed
 */
240 void stopWriteTransaction(ClusterStream clusterStream) {
241 if(!isAlive()) return;
244 // System.out.println("stowt");
246 throw new IllegalStateException(session + ": writeCount=" + writeCount);
247 else if (1 == writeCount) {
// NOTE(review): the flush uses session.clusterStream while cancelBegin below gets the
// `clusterStream` parameter - confirm that both always refer to the same stream.
248 boolean empty = session.clusterStream.reallyFlush();
// The guard around the recovery below is on a line not visible here
// (presumably `if (!empty)` - confirm).
250 String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
251 + "This is probably a serious error. Automatic cancel done as default recovery.";
252 Logger.defaultLogInfo(msg);
253 ClientChangesImpl cs = new ClientChangesImpl(session);
254 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
255 transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
257 transactionToken.stopWriteTransaction();
// Clear the WriteTransaction bit, leaving the other flags untouched.
258 state &= All ^ WriteTransaction;
261 writeTransactionEnded();
264 } catch (Throwable e) {
266 this.close(); // Everything is lost anyway.
267 throw new IllegalStateException(e);
/**
 * Cancels the current write transaction. When the graph has no virtual graph
 * provider and the flush produced changes, the cancel is sent to the server, the
 * reverse change set is awaited and applied, and query updates are run. Afterwards
 * metadata is cleared and, if this was the last writer, the write transaction ends.
 *
 * @param graph write graph whose changes are cancelled; also used for query updates
 * @throws IllegalStateException for an unexpected writeCount (guard not visible here),
 *                               or wrapping any Throwable after this object was closed
 */
273 void cancelWriteTransaction(WriteGraphImpl graph) {
277 throw new IllegalStateException(session + ": writeCount=" + writeCount);
278 VirtualGraph vg = graph.getProvider();
279 if (null == vg) { // Skip if virtual graph.
280 boolean empty = session.clusterStream.reallyFlush();
281 if (!empty) { // Something to cancel.
282 // Send all local changes to server so it can calculate correct reverse change set.
283 ClientChangesImpl cs = new ClientChangesImpl(session);
284 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
285 // This will send cancel and fetch reverse change set from server.
286 transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
288 final boolean undo = false;
289 if (!context.isOk(undo)) // this is a blocking operation
290 throw new InternalException("Cancel failed. This should never happen. Contact application support.");
291 // System.out.println("session cs: " + session.clientChanges);
292 // System.out.println("reverse cs: " + cs);
293 queryProvider.performDirtyUpdates(graph);
294 queryProvider.performScheduledUpdates(graph);
// A DatabaseException here is only logged; execution continues with cancelEnd below.
295 } catch (DatabaseException e) {
296 Logger.defaultLogError(e);
298 // This will send and accept the reverse change set.
299 transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
301 queryProvider.performDirtyUpdates(graph);
302 queryProvider.performScheduledUpdates(graph);
305 session.writeSupport.clearMetadata();
// Last writer leaving: stop the token's write transaction and clear the WriteTransaction bit.
306 if (--writeCount == 0) {
307 transactionToken.stopWriteTransaction();
308 state &= All ^ WriteTransaction;
311 writeTransactionEnded();
313 } catch (Throwable e) {
315 this.close(); // Everything is lost anyway.
316 throw new IllegalStateException(e);
/**
 * Commits the current write transaction. Only the last writer
 * ({@code writeCount == 1}) may commit. Pending cluster changes are flushed, query
 * updates are processed until no dirty primitives remain, commit reactions are
 * fired (skipped for {@code WriteOnly} requests) and the transaction token performs
 * the commit. The change set is disposed at the end.
 *
 * @param graph         write graph being committed
 * @param clusterStream stream that is flushed and handed to the transaction token
 * @param cs            change set passed to commit reactions and disposed afterwards
 * @param request       originating request; must not be null (assert only)
 * @param op            operation handed to {@code TransactionToken.commitWriteTransaction}
 * @throws IllegalStateException when writeCount is not 1, or wrapping any Throwable
 *                               after it was logged and this object was closed
 */
322 void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
323 assert(request != null);
324 graph.commitAccessorChanges(request);
325 boolean writeOnly = request instanceof WriteOnly;
328 VirtualGraph vg = graph.getProvider();
329 if (writeCount == 1) {
// Diagnostic only: a virtual-graph transaction should not have dirtied the core stream.
// The stack trace is printed; no exception is thrown.
331 if (vg != null && clusterStream.isDirty())
332 new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
334 // This is needed even when the write targets a virtual graph -
335 // deny can always remove a persistent statement.
336 clusterStream.reallyFlush();
// Start a fresh client change set for the session.
338 session.clientChanges = new ClientChangesImpl(session);
341 // start = System.nanoTime();
342 queryProvider.performScheduledUpdates(graph);
343 // duration = System.nanoTime() - start;
344 // System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
346 // Process updates as long as pending primitives exist
347 while (session.dirtyPrimitives) {
348 session.dirtyPrimitives = false;
349 queryProvider.performDirtyUpdates(graph);
350 queryProvider.performScheduledUpdates(graph);
353 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
354 session.fireReactionsToCommit(graph, cs); // Does not throw exception!
356 writeTransactionEnded();
357 session.writeSupport.gc();
358 } else // Only the last writer can commit.
359 throw new IllegalStateException(session + ": writeCount=" + writeCount);
360 // long start = System.nanoTime();
361 // System.err.println("commit");
363 // start = System.nanoTime();
// NOTE(review): the branch structure around the next two sections is on lines not visible
// here; presumably the commit runs for persistent graphs and the metadata cleanup for
// virtual (transient) graphs - confirm.
365 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
367 // There should not be meta data because this is transient graph.
368 // This is an insurance that this is the case for now.
369 int n = session.writeSupport.clearMetadata();
371 if (SessionImplSocket.DEBUG)
372 System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
374 transactionToken.stopWriteTransaction();
375 // long duration = System.nanoTime() - start;
376 // System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
379 // long duration = System.nanoTime() - start;
380 // System.err.println("commit2 " + 1e-9*duration);
// Clear the WriteTransaction bit, leaving the other flags untouched.
381 state &= All ^ WriteTransaction;
385 } catch (Throwable e) {
386 Logger.defaultLogError(e);
387 this.close(); // Everything is lost anyway.
388 throw new IllegalStateException(e);
// Release the change set's resources.
390 Disposable.safeDispose(cs);
/**
 * Commits the changes made so far without ending the write transaction: the
 * cluster stream is flushed and the transaction token commits with a null operation.
 *
 * @param graph         write graph whose write support is committed
 * @param clusterStream stream that is flushed and handed to the transaction token
 * @param request       originating request handed to the transaction token
 * @throws IllegalStateException wrapping any Throwable after it was logged and this
 *                               object was closed
 */
395 void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
// How `vg` is used is on lines not visible here.
396 VirtualGraph vg = graph.getProvider();
401 clusterStream.reallyFlush();
402 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
403 // See below for explanation, the same reasoning should apply here too.
404 if (graph.writeSupport instanceof WriteOnlySupport)
405 graph.writeSupport.flushCluster();
406 } catch (Throwable e) {
407 Logger.defaultLogError(e);
408 this.close(); // Everything is lost anyway.
409 throw new IllegalStateException(e);
/**
 * Variant of {@code commitAndContinue}: commits through the transaction token with a
 * null operation and then enables the token's combine flag. Unlike
 * {@code commitAndContinue}, no cluster-stream flush appears in the lines visible here.
 *
 * @param graph         write graph whose write support is committed
 * @param clusterStream stream handed to the transaction token
 * @param request       originating request handed to the transaction token
 * @throws IllegalStateException wrapping any Throwable after it was logged and this
 *                               object was closed
 */
414 void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
// How `vg` is used is on lines not visible here.
415 VirtualGraph vg = graph.getProvider();
420 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
421 transactionToken.setCombine(true);
422 // This was added because test Issue3199Test2 failed because
423 // write only cluster does not need cluster when allocating resource index
424 // and the first claim after commitAndContinue2 caused cluster to be loaded
425 // from server and of course it's resource index was off by one.
426 if (graph.writeSupport instanceof WriteOnlySupport)
427 graph.writeSupport.flushCluster();
428 } catch (Throwable e) {
429 Logger.defaultLogError(e);
430 this.close(); // Everything is lost anyway.
431 throw new IllegalStateException(e);
/**
 * @return the last operation as reported by the transaction token
 */
// NOTE(review): no null check on transactionToken here, unlike setCombine - confirm
// this is never called before setGraphSession.
436 Operation getLastOperation() {
437 return transactionToken.getLastOperation();
/**
 * @return the head revision id as reported by the transaction token
 */
439 long getHeadRevisionId() {
440 return transactionToken.getHeadRevisionId();