1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Disposable;
21 import org.simantics.db.Operation;
22 import org.simantics.db.VirtualGraph;
23 import org.simantics.db.common.utils.Logger;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.exception.InternalException;
26 import org.simantics.db.exception.RuntimeDatabaseException;
27 import org.simantics.db.impl.graph.ReadGraphImpl;
28 import org.simantics.db.impl.graph.WriteGraphImpl;
29 import org.simantics.db.impl.query.QueryProcessor;
30 import org.simantics.db.request.WriteOnly;
31 import org.simantics.db.request.WriteTraits;
32 import org.simantics.db.service.LifecycleSupport.LifecycleState;
33 import org.simantics.db.service.TransactionPolicySupport;
35 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
// Bit flags stored in `state`. All (15) is the union of the four flags and is
// used as the mask when a single flag is cleared, e.g. `state &= All ^ Closing`.
38 static final int Closed = 1;
39 static final int Closing = 2;
40 static final int WriteTransaction = 4;
41 static final int ReadTransaction = 8;
42 static final int All = 15;
43 private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
// Collaborators; each is null until setGraphSession(...) assigns it.
44 private SessionImplSocket session = null;
45 private GraphSession graphSession = null;
46 private QueryProcessor queryProvider = null;
// Lock plus its single condition; waitClosing(...) signals and awaits on the condition.
47 private Lock lock = new ReentrantLock();
48 private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
49 private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
50 private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
// Thread recorded by startWriteTransaction(...) when it starts a write while writeCount is 0.
51 private Thread writeOwner = null;
// NOTE: the initial value is 1, not 0.
52 private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
// Replaced by resetTransactionPolicy(); some callers null-check it before use, others do not.
53 private TransactionToken transactionToken = null;
/**
 * Forwards the combine flag to the current transaction token.
 * Does nothing when no token exists yet (transactionToken is null).
 *
 * @param a value handed unchanged to TransactionToken.setCombine
 */
54 void setCombine(boolean a) {
55 if (null != transactionToken)
56 transactionToken.setCombine(a);
/**
 * Stores the collaborators used by the other methods and then rebuilds the
 * transaction token via resetTransactionPolicy().
 *
 * @param session       session stored in the `session` field
 * @param graphSession  graph session stored in the `graphSession` field
 * @param queryProvider query processor stored in the `queryProvider` field
 * @param clusterTable  not referenced by the visible body
 */
58 void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
59 this.session = session;
60 this.graphSession = graphSession;
61 this.queryProvider = queryProvider;
// Must run after the assignments above: resetTransactionPolicy() reads `session`.
62 resetTransactionPolicy();
/**
 * Replaces the transaction token with a new one built from the session's
 * TransactionPolicySupport service. The previous token (possibly null) is
 * passed to the new token's constructor.
 * NOTE(review): dereferences `session`, which is null until setGraphSession(...)
 * has been called - confirm no caller invokes this earlier.
 */
64 void resetTransactionPolicy() {
65 TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
66 this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
/**
 * @return true when the WriteTransaction bit is set in `state`
 */
77 boolean isWriteTransaction() {
78 return 0 != (state & WriteTransaction);
81 return !isClosed() && !isClosing();
84 return 0 != (state & Closed);
87 return 0 != (state & Closing);
90 return 0 != (state & WriteTransaction);
93 boolean acquired = false;
95 acquired = lock.tryLock(1, TimeUnit.MINUTES);
96 } catch (InterruptedException e1) {
99 if (null != transactionToken)
100 transactionToken.close();
102 } catch (DatabaseException e) {
105 session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
107 condition.signalAll();
116 boolean closed = isClosed();
117 boolean closing = isClosing();
118 if (closed || closing)
119 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
130 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
131 if (0 == --asyncCount) {
// Result of waitClosing(long), as chosen by the visible code there:
//   IsClosed / IsClosing - returned early from the state checks at the top of waitClosing
//   CanBeClosed          - readCount, writeCount and asyncCount are all 0 after the wait
//   Deadlock             - counts are not all 0 and the calling thread is the write owner
//   Timeout              - none of the above
138 enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
/**
 * Waits on `condition` for outstanding work to drain and reports the outcome.
 * Every wait loop below uses the same predicate: keep waiting while
 * asyncCount > 0, or readCount > 0, or a write owner exists that is not the
 * calling thread.
 * NOTE(review): several original lines of this method are missing from this
 * excerpt (presumably lock handling, the first state check, the try header and
 * closing braces); the comments describe only the visible statements.
 *
 * @param timeout negative: treated as an infinite timeout (per the original comment);
 *                positive: each await is limited to this many milliseconds;
 *                zero: neither visible wait branch is entered
 * @return a WaitStatus chosen as described on the return statements below
 */
139 WaitStatus waitClosing(long timeout) {
// The condition guarding this first return is on a line not visible here.
143 return WaitStatus.IsClosed;
144 else if (isClosing())
145 return WaitStatus.IsClosing;
148 if (timeout < 0) { // Infinite timeout
// Phase 1: signal the other waiters and re-check roughly every 10 ms. The outer
// for repeats the inner while up to 100 times; once the predicate is false the
// remaining iterations fall straight through.
149 for (int i=0; i<100; i++)
150 while (asyncCount > 0 || readCount > 0 ||
151 (null != writeOwner && Thread.currentThread() != writeOwner)) {
152 condition.signalAll();
153 condition.awaitNanos(10000000); // nanos
// Phase 2: same predicate again; the statement that follows signalAll() is not
// visible in this excerpt.
155 while (asyncCount > 0 || readCount > 0 ||
156 (null != writeOwner && Thread.currentThread() != writeOwner)) {
157 condition.signalAll();
160 } else if (timeout > 0) {
// NOTE(review): despite its name, `interrupted` becomes true when
// Condition.await(long, TimeUnit) returns false, i.e. when the waiting time
// elapsed. A real interrupt would surface as InterruptedException instead.
161 boolean interrupted = false;
162 while (!interrupted &&
163 (asyncCount > 0 || readCount > 0 ||
164 (null != writeOwner && Thread.currentThread() != writeOwner)))
165 interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
// Notifies the token that closing is in progress. Unlike setCombine(), there is
// no null check on transactionToken here.
167 transactionToken.closing();
168 } catch (Exception e) {
// Clears the Closing bit while leaving the other flags in `state` untouched.
171 state &= All ^ Closing;
// Nothing outstanding at all: the caller may proceed to close.
172 if (readCount == 0 && writeCount == 0 && asyncCount == 0)
173 return WaitStatus.CanBeClosed;
// The caller itself owns the open write transaction, so waiting for it cannot succeed.
174 else if (null != writeOwner && Thread.currentThread() == writeOwner)
175 return WaitStatus.Deadlock;
176 return WaitStatus.Timeout;
/**
 * Begins a read transaction: asks the transaction token to start it, sets the
 * ReadTransaction bit in `state` and notifies the session.
 * NOTE(review): lines between the visible statements are missing from this
 * excerpt (presumably locking, the try header and the readCount update).
 *
 * @param thread handed unchanged to TransactionToken.startReadTransaction
 * @throws DatabaseException with message "Failed to start read transaction."
 *         wrapping whatever Throwable the enclosing try caught
 */
181 void startReadTransaction(int thread) throws DatabaseException {
// Only checked when assertions are enabled: no read transaction may be open already.
184 assert(readCount == 0);
185 transactionToken.startReadTransaction(thread);
186 state |= ReadTransaction;
187 session.fireStartReadTransaction();
189 } catch (Throwable e) {
190 throw new DatabaseException("Failed to start read transaction.", e);
/**
 * Ends the current read transaction: runs gc() on the session's write support,
 * tells the transaction token to stop the read transaction and clears the
 * ReadTransaction bit in `state`.
 * NOTE(review): some original lines (presumably locking, the readCount update
 * and the try/catch) are not visible in this excerpt.
 *
 * @throws DatabaseException declared by the signature; the throwing site is not
 *         visible here
 */
195 void stopReadTransaction() throws DatabaseException {
// Both assertions are only checked when assertions are enabled: no listener
// updates may still be scheduled, and exactly one read transaction must be open.
198 assert (!queryProvider.listening.hasScheduledUpdates());
199 assert (readCount == 1);
200 session.writeSupport.gc();
201 transactionToken.stopReadTransaction();
// Clears the ReadTransaction bit only.
203 state &= All ^ ReadTransaction;
/**
 * Begins a write transaction. When writeCount is 0 it starts the transaction on
 * the token, sets the WriteTransaction bit, records the calling thread as
 * writeOwner and notifies the session.
 * NOTE(review): the declarations of `ac` and `wc`, the writeCount update and the
 * try header are on lines missing from this excerpt; presumably `ac`/`wc` are
 * snapshots of asyncCount/writeCount - confirm against the full file.
 *
 * @param thread handed unchanged to TransactionToken.startWriteTransaction
 * @throws DatabaseException when the state check below fails, or wrapping any
 *         Throwable caught by the enclosing try
 */
210 void startWriteTransaction(int thread)
211 throws DatabaseException {
214 // System.out.println("stawt");
215 boolean closed = isClosed();
216 boolean closing = isClosing();
// Rejected when closed, when closing with ac < 1, or when wc is negative.
219 if (closed || (closing && ac < 1) || wc < 0)
220 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
// Runs only when no write transaction is open yet.
221 if (writeCount == 0) {
222 transactionToken.startWriteTransaction(thread);
223 state |= WriteTransaction;
224 writeOwner = Thread.currentThread();
225 session.fireStartWriteTransaction();
// Only checked when assertions are enabled.
228 assert(writeCount == 1);
229 } catch (Throwable e) {
230 throw new DatabaseException(e);
/**
 * Shared end-of-write bookkeeping, called from the stop, cancel and commit
 * paths: clears the session's default cluster set and fires the session's
 * finish-write-transaction notification.
 */
236 void writeTransactionEnded() {
237 session.defaultClusterSet = null;
238 session.fireFinishWriteTransaction();
/**
 * Stops a write transaction without an explicit commit or cancel. When
 * writeCount is 1 it flushes the cluster stream, may issue an automatic cancel,
 * stops the transaction on the token, clears the WriteTransaction bit and runs
 * writeTransactionEnded().
 * NOTE(review): several original lines are missing from this excerpt
 * (presumably locking, the try header, the condition of the first throw, the
 * test on `empty` and the writeCount update).
 *
 * @param clusterStream passed to TransactionToken.cancelBegin on the automatic-cancel path
 * @throws IllegalStateException for an invalid writeCount, or wrapping any
 *         Throwable caught by the enclosing try (after close() was called)
 */
241 void stopWriteTransaction(ClusterStream clusterStream) {
// Nothing to do once the stream is closed or closing.
242 if(!isAlive()) return;
245 // System.out.println("stowt");
// The condition guarding this throw is on a line not visible here.
247 throw new IllegalStateException(session + ": writeCount=" + writeCount);
248 else if (1 == writeCount) {
// NOTE(review): this flushes session.clusterStream, while the `clusterStream`
// parameter is what is passed to cancelBegin below - confirm they are the same object.
249 boolean empty = session.clusterStream.reallyFlush();
// Automatic-cancel recovery path; presumably guarded by a test on `empty`
// that is not visible in this excerpt.
251 String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
252 + "This is probably a serious error. Automatic cancel done as default recovery.";
253 Logger.defaultLogInfo(msg);
254 ClientChangesImpl cs = new ClientChangesImpl(session);
255 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
256 transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
258 transactionToken.stopWriteTransaction();
// Clears the WriteTransaction bit only.
259 state &= All ^ WriteTransaction;
262 writeTransactionEnded();
265 } catch (Throwable e) {
267 this.close(); // Everything is lost anyway.
268 throw new IllegalStateException(e);
/**
 * Cancels the current write transaction. For a non-virtual graph with pending
 * changes it runs the cancelBegin / cancelEnd exchange on the transaction token
 * and propagates the changes to the query cache and listeners. It then clears
 * the write-support metadata and, when the decremented writeCount reaches 0,
 * stops the transaction on the token, clears the WriteTransaction bit and runs
 * writeTransactionEnded().
 * NOTE(review): several original lines are missing from this excerpt
 * (presumably locking, try headers, the condition of the first throw and
 * closing braces), so the exact nesting of the statements cannot be confirmed here.
 *
 * @param graph write graph whose provider decides whether the cancel exchange
 *              runs, and which is used for change propagation
 * @throws IllegalStateException for an invalid writeCount, or wrapping any
 *         Throwable caught by the outer try (after close() was called)
 */
274 void cancelWriteTransaction(WriteGraphImpl graph) {
// The condition guarding this throw is on a line not visible here.
278 throw new IllegalStateException(session + ": writeCount=" + writeCount);
279 VirtualGraph vg = graph.getProvider();
// The cancel exchange only runs when graph.getProvider() returned null.
280 if (null == vg) { // Skip if virtual graph.
281 boolean empty = session.clusterStream.reallyFlush();
282 if (!empty) { // Something to cancel.
283 // Send all local changes to server so it can calculate correct reverse change set.
284 ClientChangesImpl cs = new ClientChangesImpl(session);
285 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
286 // This will send cancel and fetch reverse change set from server.
287 transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
289 final boolean undo = false;
290 if (!context.isOk(undo)) // this is a blocking operation
291 throw new InternalException("Cancel failed. This should never happen.");
292 // System.out.println("session cs: " + session.clientChanges);
293 // System.out.println("reverse cs: " + cs);
294 queryProvider.propagateChangesInQueryCache(graph);
295 queryProvider.listening.fireListeners(graph);
// A DatabaseException from this inner section is logged, not rethrown.
296 } catch (DatabaseException e) {
297 Logger.defaultLogError(e);
299 // This will send and accept the reverse change set.
300 transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
302 queryProvider.propagateChangesInQueryCache(graph);
303 queryProvider.listening.fireListeners(graph);
306 session.writeSupport.clearMetadata();
// The decrement happens inside the condition; the teardown below runs only
// when the count reaches 0.
307 if (--writeCount == 0) {
308 transactionToken.stopWriteTransaction();
309 state &= All ^ WriteTransaction;
312 writeTransactionEnded();
314 } catch (Throwable e) {
316 this.close(); // Everything is lost anyway.
317 throw new IllegalStateException(e);
/**
 * Commits the current write transaction. When writeCount is 1 it flushes the
 * cluster stream, installs a fresh ClientChangesImpl on the session, propagates
 * changes to the query cache and listeners until no dirty primitives remain,
 * fires commit reactions (skipped for WriteOnly requests) and runs the
 * end-of-write bookkeeping. Any other writeCount raises IllegalStateException.
 * The visible code then commits on the token, or clears metadata and stops the
 * transaction on the token, and clears the WriteTransaction bit.
 * NOTE(review): many original lines are missing from this excerpt (presumably
 * locking, try/finally headers, the condition choosing between the two token
 * calls, the test on `n` and the writeCount update), so the exact nesting
 * cannot be confirmed here.
 *
 * @param graph         write graph being committed
 * @param clusterStream stream that is flushed and handed to the token commit
 * @param cs            change set handed to commit reactions and disposed at the end
 * @param request       the write request; asserted to be non-null
 * @param op            handed unchanged to TransactionToken.commitWriteTransaction
 * @throws IllegalStateException for an invalid writeCount, or wrapping any
 *         Throwable caught by the enclosing try (after logging and close())
 */
323 void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
324 assert(request != null);
325 graph.commitAccessorChanges(request);
326 boolean writeOnly = request instanceof WriteOnly;
329 VirtualGraph vg = graph.getProvider();
330 if (writeCount == 1) {
// NOTE(review): the Exception is created only to print a stack trace; it is
// not thrown, so the commit continues after this diagnostic.
332 if (vg != null && clusterStream.isDirty())
333 new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
335 // This is needed even when the write targets a virtual graph -
336 // deny can always remove a persistent statement.
337 clusterStream.reallyFlush();
339 session.clientChanges = new ClientChangesImpl(session);
342 // start = System.nanoTime();
343 queryProvider.propagateChangesInQueryCache(graph);
// Listeners are fired on a separate graph obtained from forRecompute(null),
// with that graph's asyncBarrier incremented before and decremented after the call.
344 ReadGraphImpl listenerGraph = graph.forRecompute(null);
345 listenerGraph.asyncBarrier.inc();
346 queryProvider.listening.fireListeners(listenerGraph);
347 listenerGraph.asyncBarrier.dec();
348 // duration = System.nanoTime() - start;
349 // System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
351 // Process updates as long as pending primitives exist
// The flag is reset before each pass, so the loop repeats only if a pass
// marks primitives dirty again.
352 while (session.dirtyPrimitives) {
353 session.dirtyPrimitives = false;
354 queryProvider.propagateChangesInQueryCache(graph);
355 queryProvider.listening.fireListeners(graph);
358 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
359 session.fireReactionsToCommit(graph, cs); // Does not throw exception!
361 writeTransactionEnded();
362 session.writeSupport.gc();
363 } else // Only the last writer can commit.
364 throw new IllegalStateException(session + ": writeCount=" + writeCount);
365 // long start = System.nanoTime();
366 // System.err.println("commit");
368 // start = System.nanoTime();
// Presumably the persistent-graph branch; the condition selecting it is not
// visible in this excerpt.
370 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
372 // There should not be meta data because this is transient graph.
373 // This is an insurance that this is the case for now.
374 int n = session.writeSupport.clearMetadata();
376 if (SessionImplSocket.DEBUG)
377 System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
379 transactionToken.stopWriteTransaction();
380 // long duration = System.nanoTime() - start;
381 // System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
384 // long duration = System.nanoTime() - start;
385 // System.err.println("commit2 " + 1e-9*duration);
// Clears the WriteTransaction bit only.
386 state &= All ^ WriteTransaction;
390 } catch (Throwable e) {
391 Logger.defaultLogError(e);
392 this.close(); // Everything is lost anyway.
393 throw new IllegalStateException(e);
// Presumably inside a finally block whose header is not visible here.
395 Disposable.safeDispose(cs);
/**
 * Commits the changes written so far: flushes the cluster stream, commits on
 * the transaction token with a null operation, and flushes the current cluster
 * when the graph's write support is a WriteOnlySupport. The visible code does
 * not clear the WriteTransaction bit.
 * NOTE(review): lines 402-405 of the original (presumably locking, the try
 * header and any use of `vg`) are missing from this excerpt.
 *
 * @param graph         write graph whose writeSupport is committed
 * @param clusterStream stream that is flushed and then handed to the token commit
 * @param request       handed unchanged to TransactionToken.commitWriteTransaction
 * @throws IllegalStateException wrapping any Throwable caught by the enclosing
 *         try (after logging and close())
 */
400 void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
401 VirtualGraph vg = graph.getProvider();
406 clusterStream.reallyFlush();
407 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
408 // See below for explanation, the same reasoning should apply here too.
409 if (graph.writeSupport instanceof WriteOnlySupport)
410 graph.writeSupport.flushCluster();
411 } catch (Throwable e) {
412 Logger.defaultLogError(e);
413 this.close(); // Everything is lost anyway.
414 throw new IllegalStateException(e);
/**
 * Variant of commitAndContinue. In the visible code it commits on the
 * transaction token with a null operation, enables combining on the token, and
 * flushes the current cluster when the graph's write support is a
 * WriteOnlySupport. Unlike commitAndContinue, no clusterStream.reallyFlush()
 * call is visible here.
 * NOTE(review): lines 421-424 of the original (presumably locking, the try
 * header and any use of `vg`) are missing from this excerpt.
 *
 * @param graph         write graph whose writeSupport is committed
 * @param clusterStream handed unchanged to TransactionToken.commitWriteTransaction
 * @param request       handed unchanged to TransactionToken.commitWriteTransaction
 * @throws IllegalStateException wrapping any Throwable caught by the enclosing
 *         try (after logging and close())
 */
419 void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
420 VirtualGraph vg = graph.getProvider();
425 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
426 transactionToken.setCombine(true);
427 // This was added because test Issue3199Test2 failed because
428 // write only cluster does not need cluster when allocating resource index
429 // and the first claim after commitAndContinue2 caused cluster to be loaded
430 // from server and of course it's resource index was off by one.
431 if (graph.writeSupport instanceof WriteOnlySupport)
432 graph.writeSupport.flushCluster();
433 } catch (Throwable e) {
434 Logger.defaultLogError(e);
435 this.close(); // Everything is lost anyway.
436 throw new IllegalStateException(e);
/**
 * @return whatever TransactionToken.getLastOperation() reports. There is no
 *         null check, so a transaction token must already exist.
 */
441 Operation getLastOperation() {
442 return transactionToken.getLastOperation();
/**
 * @return whatever TransactionToken.getHeadRevisionId() reports. There is no
 *         null check, so a transaction token must already exist.
 */
444 long getHeadRevisionId() {
445 return transactionToken.getHeadRevisionId();