1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Operation;
21 import org.simantics.db.VirtualGraph;
22 import org.simantics.db.common.utils.Logger;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.exception.InternalException;
25 import org.simantics.db.exception.RuntimeDatabaseException;
26 import org.simantics.db.impl.graph.WriteGraphImpl;
27 import org.simantics.db.impl.query.QueryProcessor;
28 import org.simantics.db.request.WriteOnly;
29 import org.simantics.db.request.WriteTraits;
30 import org.simantics.db.service.LifecycleSupport.LifecycleState;
31 import org.simantics.db.service.TransactionPolicySupport;
33 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
// Bit flags stored in `state`. All (15) is the union of the four flags and is
// used as the mask when a single flag is cleared (e.g. `state &= All ^ Closing`).
36 static final int Closed = 1;
37 static final int Closing = 2;
38 static final int WriteTransaction = 4;
39 static final int ReadTransaction = 8;
40 static final int All = 15;
41 private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
// Collaborators; all three are assigned in setGraphSession and are null before that.
42 private SessionImplSocket session = null;
43 private GraphSession graphSession = null;
44 private QueryProcessor queryProvider = null;
// `condition` is created from `lock`, so `lock` must be declared first.
45 private Lock lock = new ReentrantLock();
46 private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
47 private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
48 private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
// Thread recorded by startWriteTransaction when the first (writeCount == 0) write transaction begins.
49 private Thread writeOwner = null;
// Note: starts at 1, unlike the read and write counters.
50 private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
// Created by resetTransactionPolicy; null until then.
51 private TransactionToken transactionToken = null;
/**
 * Forwards the combine flag to the current transaction token.
 * Does nothing when no token has been created yet.
 *
 * @param a value passed unchanged to TransactionToken.setCombine
 */
52 void setCombine(boolean a) {
53 if (null != transactionToken)
54 transactionToken.setCombine(a);
/**
 * Stores the session, graph session and query processor, then rebuilds the
 * transaction token through resetTransactionPolicy().
 *
 * @param session       session this state belongs to
 * @param graphSession  graph session handed to the transaction token
 * @param queryProvider query processor used by the transaction methods
 * @param clusterTable  not used by any of the visible statements
 */
56 void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
57 this.session = session;
58 this.graphSession = graphSession;
59 this.queryProvider = queryProvider;
60 resetTransactionPolicy();
/**
 * Replaces the transaction token with a new one built from the session's
 * TransactionPolicySupport service. The previous token (possibly null) is
 * passed to the new token's constructor as its last argument.
 * Requires `session` to be set, since the service is looked up on it.
 */
62 void resetTransactionPolicy() {
63 TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
64 this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
/**
 * @return true when the WriteTransaction bit is set in `state`
 */
75 boolean isWriteTransaction() {
76 return 0 != (state & WriteTransaction);
// NOTE(review): the headers of the four predicates below are not visible here;
// each line is the return statement of a separate method.
// True only when the state is neither closed nor closing
// (presumably the body of isAlive(), which stopWriteTransaction calls - confirm).
79 return !isClosed() && !isClosing();
// Tests the Closed bit.
82 return 0 != (state & Closed);
// Tests the Closing bit.
85 return 0 != (state & Closing);
// Tests the WriteTransaction bit; same expression as isWriteTransaction().
88 return 0 != (state & WriteTransaction);
// NOTE(review): method header not visible; presumably this is close(), which the
// write-transaction methods call on failure - confirm.
91 boolean acquired = false;
// Bounded wait for the lock: gives up after one minute instead of blocking forever.
93 acquired = lock.tryLock(1, TimeUnit.MINUTES);
// Handling of the interruption is not visible here.
94 } catch (InterruptedException e1) {
// Close the transaction token if one was ever created.
97 if (null != transactionToken)
98 transactionToken.close();
100 } catch (DatabaseException e) {
// Tell lifecycle listeners that the session is now CLOSED.
103 session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
// Wake every thread waiting on the condition.
105 condition.signalAll();
// NOTE(review): the enclosing method headers are not visible; these appear to be
// fragments of the methods that adjust asyncCount - confirm.
// Refuse to proceed once the session is closed or closing.
114 boolean closed = isClosed();
115 boolean closing = isClosing();
116 if (closed || closing)
117 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
// The condition guarding this throw is not visible here.
128 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
// Pre-decrement: the branch is taken when the counter reaches zero.
129 if (0 == --asyncCount) {
/**
 * Result codes returned by waitClosing(long).
 * The declaration order is kept as-is so ordinals do not change.
 */
enum WaitStatus {
    /** Returned by the first early-exit check in waitClosing. */
    IsClosed,
    /** Returned when isClosing() is already true on entry. */
    IsClosing,
    /** Read, write and async counters are all zero. */
    CanBeClosed,
    /** Activity is still outstanding and the caller is not the write owner. */
    Timeout,
    /** The calling thread is itself the current write owner. */
    Deadlock
}
/**
 * Waits for outstanding activity to finish so the session can be closed.
 * "Outstanding" means asyncCount or readCount is positive, or a write owner
 * exists that is not the calling thread.
 *
 * @param timeout negative waits without an overall limit, positive is a wait
 *                in milliseconds, zero performs no waiting in the visible code
 * @return IsClosed or IsClosing when the state is already in that phase,
 *         otherwise CanBeClosed, Deadlock or Timeout as decided at the end
 */
137 WaitStatus waitClosing(long timeout) {
// The condition guarding this first return is not visible here.
141 return WaitStatus.IsClosed;
142 else if (isClosing())
143 return WaitStatus.IsClosing;
146 if (timeout < 0) { // Infinite timeout
// The for statement has no braces in the visible code, so its body is the
// while loop that follows.
147 for (int i=0; i<100; i++)
148 while (asyncCount > 0 || readCount > 0 ||
149 (null != writeOwner && Thread.currentThread() != writeOwner)) {
// Wake other waiters, then wait for up to 10,000,000 ns (10 ms).
150 condition.signalAll();
151 condition.awaitNanos(10000000); // nanos
// Second wait loop with the same condition; the rest of its body is not visible here.
153 while (asyncCount > 0 || readCount > 0 ||
154 (null != writeOwner && Thread.currentThread() != writeOwner)) {
155 condition.signalAll();
158 } else if (timeout > 0) {
// Despite its name, `interrupted` becomes true when the timed await elapses:
// Condition.await(long, TimeUnit) returns false on timeout.
159 boolean interrupted = false;
160 while (!interrupted &&
161 (asyncCount > 0 || readCount > 0 ||
162 (null != writeOwner && Thread.currentThread() != writeOwner)))
// The full timeout is applied to every iteration, so a wake-up that leaves the
// condition unchanged starts a fresh wait of `timeout` milliseconds.
163 interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
// Notify the transaction token that closing is in progress.
165 transactionToken.closing();
166 } catch (Exception e) {
// `All ^ Closing` is every flag except Closing, so this clears only the Closing bit.
// NOTE(review): whether this runs on every path or only after a failure is not visible here.
169 state &= All ^ Closing;
// Everything has drained: the caller may close.
170 if (readCount == 0 && writeCount == 0 && asyncCount == 0)
171 return WaitStatus.CanBeClosed;
// The caller is the write owner, so waiting for that write to end would never finish.
172 else if (null != writeOwner && Thread.currentThread() == writeOwner)
173 return WaitStatus.Deadlock;
174 return WaitStatus.Timeout;
/**
 * Starts a read transaction on the transaction token, sets the
 * ReadTransaction flag and notifies the session.
 *
 * @param thread forwarded to TransactionToken.startReadTransaction
 * @throws DatabaseException wrapping any Throwable caught by the handler below
 */
179 void startReadTransaction(int thread) throws DatabaseException {
// Asserts that the read counter is zero on entry.
182 assert(readCount == 0);
183 transactionToken.startReadTransaction(thread);
184 state |= ReadTransaction;
185 session.fireStartReadTransaction();
// Throwable also covers AssertionError from the assert above when assertions are enabled.
187 } catch (Throwable e) {
188 throw new DatabaseException("Failed to start read transaction.", e);
/**
 * Ends the read transaction: runs the write support's gc(), stops the read
 * transaction on the token and clears the ReadTransaction flag.
 *
 * @throws DatabaseException declared; the statement that raises it is not visible here
 */
193 void stopReadTransaction() throws DatabaseException {
// Sanity checks: no query updates still scheduled, and the read counter is exactly one.
196 assert (!queryProvider.hasScheduledUpdates());
197 assert (readCount == 1);
198 session.writeSupport.gc();
199 transactionToken.stopReadTransaction();
// Clears only the ReadTransaction bit.
201 state &= All ^ ReadTransaction;
/**
 * Starts a write transaction. Only the call that finds writeCount == 0 starts
 * one on the token, sets the WriteTransaction flag, records the calling
 * thread as write owner and notifies the session.
 *
 * @param thread forwarded to TransactionToken.startWriteTransaction
 * @throws DatabaseException when the state check rejects the request, or
 *         wrapping any Throwable caught by the handler below
 */
208 void startWriteTransaction(int thread)
209 throws DatabaseException {
212 // System.out.println("stawt");
213 boolean closed = isClosed();
214 boolean closing = isClosing();
// `ac` and `wc` are declared on lines not visible here; presumably they are
// snapshots of asyncCount and writeCount - confirm.
217 if (closed || (closing && ac < 1) || wc < 0)
218 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
219 if (writeCount == 0) {
220 transactionToken.startWriteTransaction(thread);
221 state |= WriteTransaction;
222 writeOwner = Thread.currentThread();
223 session.fireStartWriteTransaction();
// The increment that makes this assertion hold is not visible here.
226 assert(writeCount == 1);
227 } catch (Throwable e) {
228 throw new DatabaseException(e);
/**
 * Shared tail of every write-transaction ending: clears the session's
 * default cluster set and fires the finish-write-transaction event.
 */
234 void writeTransactionEnded() {
235 session.defaultClusterSet = null;
236 session.fireFinishWriteTransaction();
/**
 * Stops the current write transaction. When writeCount is 1 it flushes the
 * session's cluster stream, stops the transaction on the token, clears the
 * WriteTransaction flag and calls writeTransactionEnded().
 * Returns immediately when the state is no longer alive.
 *
 * @param clusterStream stream passed to the token's cancelBegin in the recovery path
 * @throws IllegalStateException on an unexpected writeCount, or wrapping any
 *         Throwable caught by the handler below (after close() has been called)
 */
239 void stopWriteTransaction(ClusterStream clusterStream) {
240 if(!isAlive()) return;
243 // System.out.println("stowt");
// The condition guarding this throw is not visible here.
245 throw new IllegalStateException(session + ": writeCount=" + writeCount);
246 else if (1 == writeCount) {
// Note: the flush uses session.clusterStream, not the clusterStream parameter.
247 boolean empty = session.clusterStream.reallyFlush();
// Recovery path: log the problem and begin a cancel on the token.
// NOTE(review): the guard is not visible; presumably it is `if (!empty)` - confirm.
249 String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
250 + "This is probably a serious error. Automatic cancel done as default recovery.";
251 Logger.defaultLogInfo(msg);
252 ClientChangesImpl cs = new ClientChangesImpl(session);
253 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
254 transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
256 transactionToken.stopWriteTransaction();
// Clears only the WriteTransaction bit.
257 state &= All ^ WriteTransaction;
260 writeTransactionEnded();
263 } catch (Throwable e) {
265 this.close(); // Everything is lost anyway.
266 throw new IllegalStateException(e);
/**
 * Cancels the current write transaction. For a graph without a virtual graph
 * provider it flushes local changes and, if there were any, runs the
 * cancelBegin / query update / cancelEnd sequence on the token. It then clears
 * accumulated metadata and, when writeCount drops to zero, stops the write
 * transaction and calls writeTransactionEnded().
 *
 * @param graph write graph whose changes are cancelled
 * @throws IllegalStateException on an unexpected writeCount, or wrapping any
 *         Throwable caught by the outer handler (after close() has been called)
 */
272 void cancelWriteTransaction(WriteGraphImpl graph) {
// The condition guarding this throw is not visible here.
276 throw new IllegalStateException(session + ": writeCount=" + writeCount);
277 VirtualGraph vg = graph.getProvider();
278 if (null == vg) { // Skip if virtual graph.
279 boolean empty = session.clusterStream.reallyFlush();
280 if (!empty) { // Something to cancel.
281 // Send all local changes to server so it can calculate correct reverse change set.
282 ClientChangesImpl cs = new ClientChangesImpl(session);
283 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
284 // This will send cancel and fetch reverse change set from server.
285 transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
287 final boolean undo = false;
288 if (!context.isOk(undo)) // this is a blocking operation
289 throw new InternalException("Cancel failed. This should never happen. Contact application support.");
290 // System.out.println("session cs: " + session.clientChanges);
291 // System.out.println("reverse cs: " + cs);
// Bring the queries up to date with the state after the cancel.
292 queryProvider.performDirtyUpdates(graph);
293 queryProvider.performScheduledUpdates(graph);
// A database failure here is logged; the start of the matching try is not visible.
294 } catch (DatabaseException e) {
295 Logger.defaultLogError(e);
297 // This will send and accept the reverse change set.
298 transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
301 session.writeSupport.clearMetadata();
// Pre-decrement: the transaction is stopped only when the counter reaches zero.
302 if (--writeCount == 0) {
303 transactionToken.stopWriteTransaction();
// Clears only the WriteTransaction bit.
304 state &= All ^ WriteTransaction;
307 writeTransactionEnded();
309 } catch (Throwable e) {
311 this.close(); // Everything is lost anyway.
312 throw new IllegalStateException(e);
/**
 * Commits the current write transaction. Only allowed when writeCount is 1:
 * flushes and clears the cluster stream, resets the session's client changes,
 * runs query updates until no dirty primitives remain, fires commit reactions
 * (skipped for WriteOnly requests) and ends the write transaction. Afterwards
 * the token commits the transaction or, for a transient graph, accumulated
 * metadata is cleared; finally the WriteTransaction flag is cleared.
 *
 * @param graph         write graph being committed
 * @param clusterStream stream that is flushed, cleared and handed to the token
 * @param cs            change set passed to the commit reactions
 * @param request       the write request; must not be null
 * @param op            operation passed to the token's commit
 * @throws IllegalStateException when writeCount is not 1, or wrapping any
 *         Throwable caught by the handler below (after logging and close())
 */
318 void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
319 assert(request != null);
320 graph.commitAccessorChanges(request);
321 boolean writeOnly = request instanceof WriteOnly;
324 VirtualGraph vg = graph.getProvider();
325 if (writeCount == 1) {
// A request on a virtual graph should not have dirtied the core cluster stream;
// if it did, a stack trace is printed but processing continues.
327 if (vg != null && clusterStream.isDirty())
328 new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
330 // long start = System.nanoTime();
332 clusterStream.reallyFlush();
333 // This was fired just before in handleUpdatesAndMetadata
335 // session.fireMetadataListeners(graph, cs);
337 clusterStream.clear();
339 // long duration = System.nanoTime() - start;
340 // System.out.println("reallyFlush " + 1e-9*duration + "s. ");
// Start a fresh client change set on the session.
342 session.clientChanges = new ClientChangesImpl(session);
345 // start = System.nanoTime();
346 queryProvider.performScheduledUpdates(graph);
347 // duration = System.nanoTime() - start;
348 // System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
350 // Process updates as long as pending primitives exist
// The flag is reset before each pass; the loop repeats only if it is set again.
351 while (session.dirtyPrimitives) {
352 session.dirtyPrimitives = false;
353 queryProvider.performDirtyUpdates(graph);
354 queryProvider.performScheduledUpdates(graph);
357 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
358 session.fireReactionsToCommit(graph, cs); // Does not throw exception!
360 writeTransactionEnded();
361 session.writeSupport.gc();
362 } else // Only the last writer can commit.
363 throw new IllegalStateException(session + ": writeCount=" + writeCount);
364 // long start = System.nanoTime();
365 // System.err.println("commit");
367 // start = System.nanoTime();
// NOTE(review): the branch that selects between this commit and the transient-graph
// path below is not visible here; presumably it tests `vg` - confirm.
369 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
371 // There should not be meta data because this is transient graph.
372 // This is an insurance that this is the case for now.
373 int n = session.writeSupport.clearMetadata();
// The condition on `n` that guards this debug output is not visible here.
375 if (SessionImplSocket.DEBUG)
376 System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
378 transactionToken.stopWriteTransaction();
379 // long duration = System.nanoTime() - start;
380 // System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
383 // long duration = System.nanoTime() - start;
384 // System.err.println("commit2 " + 1e-9*duration);
// Clears only the WriteTransaction bit.
385 state &= All ^ WriteTransaction;
389 } catch (Throwable e) {
390 Logger.defaultLogError(e);
391 this.close(); // Everything is lost anyway.
392 throw new IllegalStateException(e);
/**
 * Commits the changes made so far: flushes the cluster stream and commits
 * through the token with a null operation. No statement in the visible code
 * stops the write transaction or clears the WriteTransaction flag.
 *
 * @param graph         write graph whose write support is committed
 * @param clusterStream stream that is flushed and handed to the token
 * @param request       the write request being committed
 * @throws IllegalStateException wrapping any Throwable caught by the handler
 *         below (after logging and close())
 */
398 void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
// How `vg` is used is not visible here.
399 VirtualGraph vg = graph.getProvider();
404 clusterStream.reallyFlush();
405 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
406 // See below for explanation, the same reasoning should apply here too.
407 if (graph.writeSupport instanceof WriteOnlySupport)
408 graph.writeSupport.flushCluster();
409 } catch (Throwable e) {
410 Logger.defaultLogError(e);
411 this.close(); // Everything is lost anyway.
412 throw new IllegalStateException(e);
/**
 * Variant of commitAndContinue that also turns on the token's combine flag
 * after committing. No cluster stream flush appears in the visible code.
 *
 * @param graph         write graph whose write support is committed
 * @param clusterStream stream handed to the token
 * @param request       the write request being committed
 * @throws IllegalStateException wrapping any Throwable caught by the handler
 *         below (after logging and close())
 */
417 void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
// How `vg` is used is not visible here.
418 VirtualGraph vg = graph.getProvider();
423 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
424 transactionToken.setCombine(true);
425 // This was added because test Issue3199Test2 failed because
426 // write only cluster does not need cluster when allocating resource index
427 // and the first claim after commitAndContinue2 caused cluster to be loaded
428 // from server and of course it's resource index was off by one.
429 if (graph.writeSupport instanceof WriteOnlySupport)
430 graph.writeSupport.flushCluster();
431 } catch (Throwable e) {
432 Logger.defaultLogError(e);
433 this.close(); // Everything is lost anyway.
434 throw new IllegalStateException(e);
/**
 * @return the last operation as reported by the transaction token;
 *         there is no null check, so the token must already exist
 */
439 Operation getLastOperation() {
440 return transactionToken.getLastOperation();
/**
 * @return the head revision id as reported by the transaction token;
 *         there is no null check, so the token must already exist
 */
442 long getHeadRevisionId() {
443 return transactionToken.getHeadRevisionId();