efd82b6ca8f2822ab59871793c515d8a5a4f8e80
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Disposable;
21 import org.simantics.db.Operation;
22 import org.simantics.db.VirtualGraph;
23 import org.simantics.db.common.utils.Logger;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.exception.InternalException;
26 import org.simantics.db.exception.RuntimeDatabaseException;
27 import org.simantics.db.impl.graph.WriteGraphImpl;
28 import org.simantics.db.impl.query.QueryProcessor;
29 import org.simantics.db.request.WriteOnly;
30 import org.simantics.db.request.WriteTraits;
31 import org.simantics.db.service.LifecycleSupport.LifecycleState;
32 import org.simantics.db.service.TransactionPolicySupport;
33
34 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
35
36 class State {
37     static final int          Closed           = 1;
38     static final int          Closing          = 2;
39     static final int          WriteTransaction = 4;
40     static final int          ReadTransaction  = 8;
41     static final int          All              = 15;
42     private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
43     private SessionImplSocket session          = null;
44     private GraphSession      graphSession     = null;
45     private QueryProcessor    queryProvider    = null;
46     private Lock              lock             = new ReentrantLock();
47     private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
48     private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
49     private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
50     private Thread            writeOwner       = null;
51     private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
52     private TransactionToken  transactionToken = null;
53     void setCombine(boolean a) {
54         if (null != transactionToken)
55             transactionToken.setCombine(a);
56     }
57     void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
58         this.session = session;
59         this.graphSession = graphSession;
60         this.queryProvider = queryProvider;
61         resetTransactionPolicy();
62     }
63     void resetTransactionPolicy() {
64         TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
65         this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
66     }
67     int getAsyncCount() {
68         return asyncCount;
69     }
70     int getReadCount() {
71         return readCount;
72     }
73     int getWriteCount() {
74         return writeCount;
75     }
76     boolean isWriteTransaction() {
77         return 0 != (state & WriteTransaction);
78     }
79     boolean isAlive() {
80         return !isClosed() && !isClosing();
81     }
82     boolean isClosed() {
83         return 0 != (state & Closed);
84     }
85     boolean isClosing() {
86         return 0 != (state & Closing);
87     }
88     boolean isWriting() {
89         return 0 != (state & WriteTransaction);
90     }
91     void close() {
92         boolean acquired = false;
93         try {
94             acquired = lock.tryLock(1, TimeUnit.MINUTES);
95         } catch (InterruptedException e1) {
96         }
97         try {
98             if (null != transactionToken)
99                 transactionToken.close();
100             graphSession.stop();
101         } catch (DatabaseException e) {
102         } finally {
103             state = Closed;
104             session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
105             if (acquired) {
106                 condition.signalAll();
107                 lock.unlock();
108             }
109         }
110     }
111     void incAsync()
112     {
113         lock.lock();
114         try {
115             boolean closed = isClosed();
116             boolean closing = isClosing();
117             if (closed || closing)
118                 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
119             ++asyncCount;
120         } finally {
121             lock.unlock();
122         }
123     }
124     void decAsync()
125     {
126         lock.lock();
127         try {
128             if (asyncCount < 1)
129                 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
130             if (0 == --asyncCount) {
131                 condition.signal();
132             }
133         } finally {
134             lock.unlock();
135         }
136     }
137     enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
138     WaitStatus waitClosing(long timeout) {
139         lock.lock();
140         try {
141             if (isClosed())
142                 return WaitStatus.IsClosed;
143             else if (isClosing())
144                 return WaitStatus.IsClosing;
145             state |= Closing;
146             try {
147                 if (timeout < 0) { // Infinite timeout
148                     for (int i=0; i<100; i++)
149                     while (asyncCount > 0  || readCount > 0 ||
150                            (null != writeOwner && Thread.currentThread() != writeOwner)) {
151                         condition.signalAll();
152                         condition.awaitNanos(10000000); // nanos
153                     }
154                     while (asyncCount > 0  || readCount > 0 ||
155                             (null != writeOwner && Thread.currentThread() != writeOwner)) {
156                          condition.signalAll();
157                          condition.await();
158                      }
159                 } else if (timeout > 0) {
160                     boolean interrupted = false;
161                     while (!interrupted &&
162                            (asyncCount > 0  || readCount > 0 ||
163                             (null != writeOwner && Thread.currentThread() != writeOwner)))
164                         interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
165                 }
166                 transactionToken.closing();
167             } catch (Exception e) {
168                 e.printStackTrace();
169             }
170             state &= All ^ Closing;
171             if (readCount == 0 && writeCount == 0 && asyncCount == 0)
172                 return WaitStatus.CanBeClosed;
173             else if (null != writeOwner && Thread.currentThread() == writeOwner)
174                 return WaitStatus.Deadlock;
175             return WaitStatus.Timeout;
176         } finally {
177             lock.unlock();
178         }
179     }
180     void startReadTransaction(int thread) throws DatabaseException {
181         lock.lock();
182         try {
183             assert(readCount == 0);
184             transactionToken.startReadTransaction(thread);
185             state |= ReadTransaction;
186             session.fireStartReadTransaction();
187             ++readCount;
188         } catch (Throwable e) {
189             throw new DatabaseException("Failed to start read transaction.", e);
190         } finally {
191             lock.unlock();
192         }
193     }
194     void stopReadTransaction() throws DatabaseException {
195         lock.lock();
196         try {
197             assert (!queryProvider.hasScheduledUpdates());
198             assert (readCount == 1);
199             session.writeSupport.gc();
200             transactionToken.stopReadTransaction();
201             readCount = 0;
202             state &= All ^ ReadTransaction;
203             condition.signal();
204         } finally {
205             lock.unlock();
206         }
207     }
208
209     void startWriteTransaction(int thread)
210     throws DatabaseException {
211         lock.lock();
212         try {
213 //            System.out.println("stawt");
214             boolean closed = isClosed();
215             boolean closing = isClosing();
216             int ac = asyncCount;
217             int wc = writeCount;
218             if (closed || (closing && ac < 1) || wc < 0)
219                 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
220             if (writeCount == 0) {
221                 transactionToken.startWriteTransaction(thread);
222                 state |= WriteTransaction;
223                 writeOwner = Thread.currentThread();
224                 session.fireStartWriteTransaction();
225             }
226             ++writeCount;
227             assert(writeCount == 1);
228         } catch (Throwable e) {
229             throw new DatabaseException(e);
230         } finally {
231             lock.unlock();
232         }
233     }
234
235     void writeTransactionEnded() {
236         session.defaultClusterSet = null;
237         session.fireFinishWriteTransaction();
238     }
239
240     void stopWriteTransaction(ClusterStream clusterStream) {
241         if(!isAlive()) return;
242         lock.lock();
243         try {
244 //            System.out.println("stowt");
245             if (writeCount < 1)
246                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
247             else if (1 == writeCount) {
248                 boolean empty = session.clusterStream.reallyFlush();
249                 if (!empty) {
250                     String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
251                             + "This is probably a serious error. Automatic cancel done as default recovery.";
252                     Logger.defaultLogInfo(msg);
253                     ClientChangesImpl cs = new ClientChangesImpl(session);
254                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
255                     transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
256                 }
257                 transactionToken.stopWriteTransaction();
258                 state &= All ^ WriteTransaction;
259                 writeOwner = null;
260                 condition.signal();
261                 writeTransactionEnded();
262             }
263             --writeCount;
264         } catch (Throwable e) {
265             e.printStackTrace();
266             this.close(); // Everything is lost anyway.
267             throw new IllegalStateException(e);
268         } finally {
269             lock.unlock();
270         }
271     }
272
273     void cancelWriteTransaction(WriteGraphImpl graph) {
274         lock.lock();
275         try {
276             if (writeCount < 1)
277                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
278             VirtualGraph vg = graph.getProvider();
279             if (null == vg) { // Skip if virtual graph.
280                 boolean empty = session.clusterStream.reallyFlush();
281                 if (!empty) { // Something to cancel.
282                     // Send all local changes to server so it can calculate correct reverse change set.
283                     ClientChangesImpl cs = new ClientChangesImpl(session);
284                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
285                     // This will send cancel and fetch reverse change set from server.
286                     transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
287                     try {
288                         final boolean undo = false;
289                         if (!context.isOk(undo)) // this is a blocking operation
290                             throw new InternalException("Cancel failed. This should never happen. Contact application support.");
291 //                        System.out.println("session cs: " + session.clientChanges);
292 //                        System.out.println("reverse cs: " + cs);
293                         queryProvider.performDirtyUpdates(graph);
294                         queryProvider.performScheduledUpdates(graph);
295                     } catch (DatabaseException e) {
296                         Logger.defaultLogError(e);
297                     }
298                     // This will send and accept the reverse change set.
299                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
300                 } else {
301                     queryProvider.performDirtyUpdates(graph);
302                     queryProvider.performScheduledUpdates(graph);
303                 }
304             }
305             session.writeSupport.clearMetadata();
306             if (--writeCount == 0) {
307                 transactionToken.stopWriteTransaction();
308                 state &= All ^ WriteTransaction;
309                 writeOwner = null;
310                 condition.signal();
311                 writeTransactionEnded();
312             }
313         } catch (Throwable e) {
314             e.printStackTrace();
315             this.close(); // Everything is lost anyway.
316             throw new IllegalStateException(e);
317         } finally {
318             lock.unlock();
319         }
320     }
321
322     void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
323         assert(request != null);
324         graph.commitAccessorChanges(request);
325         boolean writeOnly = request instanceof WriteOnly;
326         lock.lock();
327         try {
328             VirtualGraph vg = graph.getProvider();
329             if (writeCount == 1) {
330
331                 if (vg != null && clusterStream.isDirty())
332                     new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
333
334                 // This is needed even when the write targets a virtual graph -
335                 // deny can always remove a persistent statement. 
336                 clusterStream.reallyFlush();
337
338                 session.clientChanges = new ClientChangesImpl(session);
339
340
341 //                start = System.nanoTime();
342                 queryProvider.performScheduledUpdates(graph);
343 //                duration = System.nanoTime() - start;
344 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
345
346                 // Process updates as long as pending primitives exist
347                 while (session.dirtyPrimitives) {
348                     session.dirtyPrimitives = false;
349                     queryProvider.performDirtyUpdates(graph);
350                     queryProvider.performScheduledUpdates(graph);
351                 }
352
353                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
354                     session.fireReactionsToCommit(graph, cs); // Does not throw exception!
355
356                 writeTransactionEnded();
357                 session.writeSupport.gc();
358             } else // Only the last writer can commit.
359                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
360 //            long start = System.nanoTime();
361 //            System.err.println("commit");
362
363 //            start = System.nanoTime();
364             if (null == vg)
365                 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
366             else {
367                 // There should not be meta data because this is transient graph.
368                 // This is an insurance that this is the case for now.
369                 int n = session.writeSupport.clearMetadata();
370                 if (n > 0)
371                     if (SessionImplSocket.DEBUG)
372                         System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
373             }
374             transactionToken.stopWriteTransaction();
375 //            long duration = System.nanoTime() - start;
376 //            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
377
378
379 //            long duration = System.nanoTime() - start;
380 //            System.err.println("commit2 " + 1e-9*duration);
381             state &= All ^ WriteTransaction;
382             writeCount = 0;
383             writeOwner = null;
384             condition.signal();
385         } catch (Throwable e) {
386             Logger.defaultLogError(e);
387             this.close(); // Everything is lost anyway.
388             throw new IllegalStateException(e);
389         } finally {
390             Disposable.safeDispose(cs);
391             lock.unlock();
392         }
393     }
394
395     void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
396         VirtualGraph vg = graph.getProvider();
397         if (null != vg)
398             return;
399         lock.lock();
400         try {
401             clusterStream.reallyFlush();
402             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
403             // See below for explanation, the same reasoning should apply here too.
404             if (graph.writeSupport instanceof WriteOnlySupport)
405                 graph.writeSupport.flushCluster();
406         } catch (Throwable e) {
407             Logger.defaultLogError(e);
408             this.close(); // Everything is lost anyway.
409             throw new IllegalStateException(e);
410         } finally {
411             lock.unlock();
412         }
413     }
414     void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
415         VirtualGraph vg = graph.getProvider();
416         if (null != vg)
417             return;
418         lock.lock();
419         try {
420             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
421             transactionToken.setCombine(true);
422             // This was added because test Issue3199Test2 failed because
423             // write only cluster does not need cluster when allocating resource index
424             // and the first claim after commitAndContinue2 caused cluster to be loaded
425             // from server and of course it's resource index was off by one.
426             if (graph.writeSupport instanceof WriteOnlySupport)
427                 graph.writeSupport.flushCluster();
428         } catch (Throwable e) {
429             Logger.defaultLogError(e);
430             this.close(); // Everything is lost anyway.
431             throw new IllegalStateException(e);
432         } finally {
433             lock.unlock();
434         }
435     }
436     Operation getLastOperation() {
437         return transactionToken.getLastOperation();
438     }
439     long getHeadRevisionId() {
440         return transactionToken.getHeadRevisionId();
441     }
442 }