1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.util.concurrent.TimeUnit;
\r
15 import java.util.concurrent.locks.Condition;
\r
16 import java.util.concurrent.locks.Lock;
\r
17 import java.util.concurrent.locks.ReentrantLock;
\r
19 import org.simantics.db.ChangeSet;
\r
20 import org.simantics.db.Operation;
\r
21 import org.simantics.db.VirtualGraph;
\r
22 import org.simantics.db.common.utils.Logger;
\r
23 import org.simantics.db.exception.DatabaseException;
\r
24 import org.simantics.db.exception.InternalException;
\r
25 import org.simantics.db.exception.RuntimeDatabaseException;
\r
26 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
27 import org.simantics.db.impl.query.QueryProcessor;
\r
28 import org.simantics.db.request.WriteOnly;
\r
29 import org.simantics.db.request.WriteTraits;
\r
30 import org.simantics.db.service.LifecycleSupport.LifecycleState;
\r
31 import org.simantics.db.service.TransactionPolicySupport;
\r
33 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
\r
// Bit flags that are OR-ed together in the `state` field.
36 static final int Closed = 1;
\r
37 static final int Closing = 2;
\r
38 static final int WriteTransaction = 4;
\r
39 static final int ReadTransaction = 8;
\r
// Mask with all four flag bits set (1|2|4|8); `state &= All ^ Flag` clears a single flag.
40 static final int All = 15;
\r
41 private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
\r
// session, graphSession and queryProvider stay null until setGraphSession assigns them.
42 private SessionImplSocket session = null;
\r
43 private GraphSession graphSession = null;
\r
44 private QueryProcessor queryProvider = null;
\r
// One lock with a single condition; waiters are woken through condition.signal()/signalAll().
45 private Lock lock = new ReentrantLock();
\r
46 private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
\r
47 private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
\r
48 private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
\r
// Thread that started the current write transaction (assigned in startWriteTransaction).
49 private Thread writeOwner = null;
\r
// Starts at 1, not 0. NOTE(review): presumably the initial unit is released before closing - confirm.
50 private int asyncCount = 1;
\r
// Created by resetTransactionPolicy; null before that.
51 private TransactionToken transactionToken = null;
\r
// Forwards the combine flag to the current transaction token.
// Does nothing while no token exists (transactionToken is still null).
52 void setCombine(boolean a) {
\r
53 if (null != transactionToken)
\r
54 transactionToken.setCombine(a);
\r
// Stores the session, graph session and query processor, then (re)creates the
// transaction token through resetTransactionPolicy().
// NOTE(review): clusterTable is not referenced in the lines visible here - confirm it is still needed.
56 void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
\r
57 this.session = session;
\r
58 this.graphSession = graphSession;
\r
59 this.queryProvider = queryProvider;
\r
60 resetTransactionPolicy();
\r
// Rebuilds transactionToken from the session's TransactionPolicySupport service.
// Dereferences `session`, so a non-null session must have been stored first (see setGraphSession).
62 void resetTransactionPolicy() {
\r
63 TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
\r
// The previous token (null on the first call) is handed to the new token's constructor.
64 this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
\r
66 int getAsyncCount() {
\r
69 int getReadCount() {
\r
72 int getWriteCount() {
\r
// True while the WriteTransaction flag bit is set in `state`.
75 boolean isWriteTransaction() {
\r
76 return 0 != (state & WriteTransaction);
\r
79 return !isClosed() && !isClosing();
\r
// True while the Closed flag bit is set in `state`.
81 boolean isClosed() {
\r
82 return 0 != (state & Closed);
\r
// True while the Closing flag bit is set in `state`.
84 boolean isClosing() {
\r
85 return 0 != (state & Closing);
\r
// True while the WriteTransaction flag bit is set in `state`.
// Evaluates the same expression as isWriteTransaction().
87 boolean isWriting() {
\r
88 return 0 != (state & WriteTransaction);
\r
91 boolean acquired = false;
\r
93 acquired = lock.tryLock(1, TimeUnit.MINUTES);
\r
94 } catch (InterruptedException e1) {
\r
97 if (null != transactionToken)
\r
98 transactionToken.close();
\r
99 graphSession.stop();
\r
100 } catch (DatabaseException e) {
\r
103 session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
\r
105 condition.signalAll();
\r
114 boolean closed = isClosed();
\r
115 boolean closing = isClosing();
\r
116 if (closed || closing)
\r
117 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
\r
127 if (asyncCount < 1)
\r
128 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
\r
129 if (0 == --asyncCount) {
\r
130 condition.signal();
\r
// Result codes returned by waitClosing(long).
136 enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
\r
// Waits for outstanding async work, readers and any write transaction owned by another
// thread to finish, then reports whether the session can be closed.
// timeout < 0 waits without a deadline, timeout > 0 waits in steps of `timeout` milliseconds.
// NOTE(review): no waiting is visible here for timeout == 0 - confirm against the full file.
137 WaitStatus waitClosing(long timeout) {
\r
// Early exits when the state is already closed or closing.
141 return WaitStatus.IsClosed;
\r
142 else if (isClosing())
\r
143 return WaitStatus.IsClosing;
\r
146 if (timeout < 0) { // Infinite timeout
\r
// Polling phase: wake other waiters, then sleep 10 ms (10000000 ns) per round.
147 for (int i=0; i<100; i++)
\r
148 while (asyncCount > 0 || readCount > 0 ||
\r
149 (null != writeOwner && Thread.currentThread() != writeOwner)) {
\r
150 condition.signalAll();
\r
151 condition.awaitNanos(10000000); // nanos
\r
// Same wait condition again after the polling phase.
153 while (asyncCount > 0 || readCount > 0 ||
\r
154 (null != writeOwner && Thread.currentThread() != writeOwner)) {
\r
155 condition.signalAll();
\r
158 } else if (timeout > 0) {
\r
// Condition.await(long, TimeUnit) returns false when the waiting time elapsed, so this
// flag actually records a timeout rather than a thread interrupt.
159 boolean interrupted = false;
\r
160 while (!interrupted &&
\r
161 (asyncCount > 0 || readCount > 0 ||
\r
162 (null != writeOwner && Thread.currentThread() != writeOwner)))
\r
163 interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
\r
165 transactionToken.closing();
\r
// Any exception is only printed, not rethrown.
166 } catch (Exception e) {
\r
167 e.printStackTrace();
\r
// Clears the Closing flag. NOTE(review): presumably it was set in lines not visible here - confirm.
169 state &= All ^ Closing;
\r
// All three counters at zero: nothing is outstanding.
170 if (readCount == 0 && writeCount == 0 && asyncCount == 0)
\r
171 return WaitStatus.CanBeClosed;
\r
// The calling thread itself owns the write transaction, so waiting could never succeed.
172 else if (null != writeOwner && Thread.currentThread() == writeOwner)
\r
173 return WaitStatus.Deadlock;
\r
174 return WaitStatus.Timeout;
\r
// Starts a read transaction: notifies the transaction token, sets the ReadTransaction
// flag and fires the session's start-read listeners.
// `thread` is passed through to the token unchanged.
// Throws DatabaseException wrapping any Throwable caught by the handler below.
179 void startReadTransaction(int thread) throws DatabaseException {
\r
// Only checked when assertions are enabled (-ea).
182 assert(readCount == 0);
\r
183 transactionToken.startReadTransaction(thread);
\r
184 state |= ReadTransaction;
\r
185 session.fireStartReadTransaction();
\r
187 } catch (Throwable e) {
\r
188 throw new DatabaseException("Failed to start read transaction.", e);
\r
// Ends the read transaction: runs writeSupport.gc(), notifies the transaction token,
// clears the ReadTransaction flag and wakes one thread waiting on `condition`.
193 void stopReadTransaction() throws DatabaseException {
\r
// Both preconditions are only checked when assertions are enabled (-ea).
196 assert (!queryProvider.hasScheduledUpdates());
\r
197 assert (readCount == 1);
\r
198 session.writeSupport.gc();
\r
199 transactionToken.stopReadTransaction();
\r
201 state &= All ^ ReadTransaction;
\r
202 condition.signal();
\r
// Starts a write transaction, or fails when the state does not allow it.
// `thread` is passed through to the transaction token unchanged.
// Throws DatabaseException when the state check fails, or wrapping any Throwable caught below.
208 void startWriteTransaction(int thread)
\r
209 throws DatabaseException {
\r
212 // System.out.println("stawt");
\r
213 boolean closed = isClosed();
\r
214 boolean closing = isClosing();
\r
215 int ac = asyncCount;
\r
216 int wc = writeCount;
\r
// Rejected when closed, when closing with no async work outstanding, or on a negative write count.
217 if (closed || (closing && ac < 1) || wc < 0)
\r
// The message re-reads asyncCount/writeCount instead of using the ac/wc snapshots.
218 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
\r
// Token, flag, owner and listeners are only touched when no write transaction is open yet.
219 if (writeCount == 0) {
\r
220 transactionToken.startWriteTransaction(thread);
\r
221 state |= WriteTransaction;
\r
222 writeOwner = Thread.currentThread();
\r
223 session.fireStartWriteTransaction();
\r
226 assert(writeCount == 1);
\r
227 } catch (Throwable e) {
\r
228 throw new DatabaseException(e);
\r
// Shared tail of the stop/cancel/commit paths: drops the session's default cluster set
// and fires the finish-write listeners.
234 void writeTransactionEnded() {
\r
235 session.defaultClusterSet = null;
\r
236 session.fireFinishWriteTransaction();
\r
// Ends the write transaction when the last writer stops.
// Throws IllegalStateException when writeCount < 1, or wrapping any Throwable caught below
// (the state object is closed first in that case).
239 void stopWriteTransaction(ClusterStream clusterStream) {
\r
// Returns immediately when isAlive() is false.
240 if(!isAlive()) return;
\r
243 // System.out.println("stowt");
\r
244 if (writeCount < 1)
\r
245 throw new IllegalStateException(session + ": writeCount=" + writeCount);
\r
246 else if (1 == writeCount) {
\r
// Flushes session.clusterStream, not the clusterStream parameter; the parameter is
// only passed to cancelBegin below.
247 boolean empty = session.clusterStream.reallyFlush();
\r
// Recovery path: the message is logged and a cancel is started on the token.
// NOTE(review): presumably guarded by a check on `empty` in a line not visible here - confirm.
249 String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
\r
250 + "This is probably a serious error. Automatic cancel done as default recovery.";
\r
251 Logger.defaultLogInfo(msg);
\r
252 ClientChangesImpl cs = new ClientChangesImpl(session);
\r
253 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
\r
254 transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
\r
256 transactionToken.stopWriteTransaction();
\r
257 state &= All ^ WriteTransaction;
\r
// Wakes one thread waiting on `condition`, then runs the shared end-of-write steps.
259 condition.signal();
\r
260 writeTransactionEnded();
\r
263 } catch (Throwable e) {
\r
264 e.printStackTrace();
\r
265 this.close(); // Everything is lost anyway.
\r
266 throw new IllegalStateException(e);
\r
// Cancels the current write transaction of `graph`.
// Throws IllegalStateException when writeCount < 1, or wrapping any Throwable caught by the
// outer handler (the state object is closed first in that case).
272 void cancelWriteTransaction(WriteGraphImpl graph) {
\r
275 if (writeCount < 1)
\r
276 throw new IllegalStateException(session + ": writeCount=" + writeCount);
\r
277 VirtualGraph vg = graph.getProvider();
\r
// The server-side cancel below only runs when the graph has no virtual graph provider.
278 if (null == vg) { // Skip if virtual graph.
\r
279 boolean empty = session.clusterStream.reallyFlush();
\r
280 if (!empty) { // Something to cancel.
\r
281 // Send all local changes to server so it can calculate correct reverse change set.
\r
282 ClientChangesImpl cs = new ClientChangesImpl(session);
\r
283 SynchronizeContext context = new SynchronizeContext(session, cs, 1);
\r
284 // This will send cancel and fetch reverse change set from server.
\r
285 transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
\r
287 final boolean undo = false;
\r
288 if (!context.isOk(undo)) // this is a blocking operation
\r
289 throw new InternalException("Cancel failed. This should never happen. Contact application support.");
\r
290 // System.out.println("session cs: " + session.clientChanges);
\r
291 // System.out.println("reverse cs: " + cs);
\r
// Bring the queries up to date with the reverse change set.
292 queryProvider.performDirtyUpdates(graph);
\r
293 queryProvider.performScheduledUpdates(graph);
\r
// A DatabaseException here is only logged; the cancel still proceeds to cancelEnd.
294 } catch (DatabaseException e) {
\r
295 Logger.defaultLogError(e);
\r
297 // This will send and accept the reverse change set.
\r
298 transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
\r
301 session.writeSupport.clearMetadata();
\r
// Only the last writer stops the token, clears the flag and notifies listeners.
302 if (--writeCount == 0) {
\r
303 transactionToken.stopWriteTransaction();
\r
304 state &= All ^ WriteTransaction;
\r
306 condition.signal();
\r
307 writeTransactionEnded();
\r
309 } catch (Throwable e) {
\r
310 e.printStackTrace();
\r
311 this.close(); // Everything is lost anyway.
\r
312 throw new IllegalStateException(e);
\r
// Commits the current write transaction; only valid for the last writer (writeCount == 1).
// `request` must not be null (checked by assert only). `cs` is passed to
// session.fireReactionsToCommit and `op` to transactionToken.commitWriteTransaction.
// Throws IllegalStateException wrapping any Throwable caught by the handler at the end
// (the state object is closed first in that case).
318 void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
\r
319 assert(request != null);
\r
320 graph.commitAccessorChanges(request);
\r
// WriteOnly requests skip fireReactionsToCommit further down.
321 boolean writeOnly = request instanceof WriteOnly;
\r
324 VirtualGraph vg = graph.getProvider();
\r
325 if (writeCount == 1) {
\r
// Diagnostic only: the exception is created just to print a stack trace; it is not thrown.
327 if (vg != null && clusterStream.isDirty())
\r
328 new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
\r
330 // long start = System.nanoTime();
\r
332 clusterStream.reallyFlush();
\r
333 // This was fired just before in handleUpdatesAndMetadata
\r
335 // session.fireMetadataListeners(graph, cs);
\r
337 clusterStream.clear();
\r
339 // long duration = System.nanoTime() - start;
\r
340 // System.out.println("reallyFlush " + 1e-9*duration + "s. ");
\r
// Start a fresh client change set for the session.
342 session.clientChanges = new ClientChangesImpl(session);
\r
345 // start = System.nanoTime();
\r
346 queryProvider.performScheduledUpdates(graph);
\r
347 // duration = System.nanoTime() - start;
\r
348 // System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
\r
350 // Process updates as long as pending primitives exist
\r
// The flag is reset before each round so a round that dirties primitives again triggers another.
351 while (session.dirtyPrimitives) {
\r
352 session.dirtyPrimitives = false;
\r
353 queryProvider.performDirtyUpdates(graph);
\r
354 queryProvider.performScheduledUpdates(graph);
\r
357 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
\r
358 session.fireReactionsToCommit(graph, cs); // Does not throw exception!
\r
360 writeTransactionEnded();
\r
361 session.writeSupport.gc();
\r
362 } else // Only the last writer can commit.
\r
363 throw new IllegalStateException(session + ": writeCount=" + writeCount);
\r
364 // long start = System.nanoTime();
\r
365 // System.err.println("commit");
\r
367 // start = System.nanoTime();
\r
369 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
\r
371 // There should not be meta data because this is transient graph.
\r
372 // This is an insurance that this is the case for now.
\r
// n is the value returned by clearMetadata(); it is reported as "Size" in the debug message.
373 int n = session.writeSupport.clearMetadata();
\r
375 if (SessionImplSocket.DEBUG)
\r
376 System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
\r
378 transactionToken.stopWriteTransaction();
\r
379 // long duration = System.nanoTime() - start;
\r
380 // System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
\r
383 // long duration = System.nanoTime() - start;
\r
384 // System.err.println("commit2 " + 1e-9*duration);
\r
// Clear the WriteTransaction flag and wake one thread waiting on `condition`.
385 state &= All ^ WriteTransaction;
\r
388 condition.signal();
\r
389 } catch (Throwable e) {
\r
390 Logger.defaultLogError(e);
\r
391 this.close(); // Everything is lost anyway.
\r
392 throw new IllegalStateException(e);
\r
// Flushes the cluster stream and commits through the transaction token (with a null
// Operation). The visible lines neither stop the token nor clear the WriteTransaction flag.
// Throws IllegalStateException wrapping any Throwable caught below (the state object is closed first).
398 void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
\r
// NOTE(review): vg is not used in the lines visible here - confirm against the full file.
399 VirtualGraph vg = graph.getProvider();
\r
404 clusterStream.reallyFlush();
\r
405 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
\r
406 // See below for explanation, the same reasoning should apply here too.
\r
407 if (graph.writeSupport instanceof WriteOnlySupport)
\r
408 graph.writeSupport.flushCluster();
\r
409 } catch (Throwable e) {
\r
410 Logger.defaultLogError(e);
\r
411 this.close(); // Everything is lost anyway.
\r
412 throw new IllegalStateException(e);
\r
// Commits through the transaction token (with a null Operation) and then enables the
// token's combine mode. The visible lines neither stop the token nor clear the WriteTransaction flag.
// Throws IllegalStateException wrapping any Throwable caught below (the state object is closed first).
417 void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
\r
// NOTE(review): vg is not used in the lines visible here - confirm against the full file.
418 VirtualGraph vg = graph.getProvider();
\r
423 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
\r
424 transactionToken.setCombine(true);
\r
425 // This was added because test Issue3199Test2 failed because
\r
426 // write only cluster does not need cluster when allocating resource index
\r
427 // and the first claim after commitAndContinue2 caused cluster to be loaded
\r
428 // from server and of course it's resource index was off by one.
\r
429 if (graph.writeSupport instanceof WriteOnlySupport)
\r
430 graph.writeSupport.flushCluster();
\r
431 } catch (Throwable e) {
\r
432 Logger.defaultLogError(e);
\r
433 this.close(); // Everything is lost anyway.
\r
434 throw new IllegalStateException(e);
\r
// Returns the last operation as reported by the transaction token.
// No null check (unlike setCombine): transactionToken must already have been created.
439 Operation getLastOperation() {
\r
440 return transactionToken.getLastOperation();
\r
// Returns the head revision id as reported by the transaction token.
// No null check (unlike setCombine): transactionToken must already have been created.
442 long getHeadRevisionId() {
\r
443 return transactionToken.getHeadRevisionId();
\r