]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Declare asyncCount volatile since it is accessed from multiple threads
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Operation;
21 import org.simantics.db.VirtualGraph;
22 import org.simantics.db.common.utils.Logger;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.exception.InternalException;
25 import org.simantics.db.exception.RuntimeDatabaseException;
26 import org.simantics.db.impl.graph.WriteGraphImpl;
27 import org.simantics.db.impl.query.QueryProcessor;
28 import org.simantics.db.request.WriteOnly;
29 import org.simantics.db.request.WriteTraits;
30 import org.simantics.db.service.LifecycleSupport.LifecycleState;
31 import org.simantics.db.service.TransactionPolicySupport;
32
33 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
34
35 class State {
36     static final int          Closed           = 1;
37     static final int          Closing          = 2;
38     static final int          WriteTransaction = 4;
39     static final int          ReadTransaction  = 8;
40     static final int          All              = 15;
41     private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
42     private SessionImplSocket session          = null;
43     private GraphSession      graphSession     = null;
44     private QueryProcessor    queryProvider    = null;
45     private Lock              lock             = new ReentrantLock();
46     private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
47     private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
48     private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
49     private Thread            writeOwner       = null;
50     private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
51     private TransactionToken  transactionToken = null;
52     void setCombine(boolean a) {
53         if (null != transactionToken)
54             transactionToken.setCombine(a);
55     }
56     void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
57         this.session = session;
58         this.graphSession = graphSession;
59         this.queryProvider = queryProvider;
60         resetTransactionPolicy();
61     }
62     void resetTransactionPolicy() {
63         TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
64         this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
65     }
66     int getAsyncCount() {
67         return asyncCount;
68     }
69     int getReadCount() {
70         return readCount;
71     }
72     int getWriteCount() {
73         return writeCount;
74     }
75     boolean isWriteTransaction() {
76         return 0 != (state & WriteTransaction);
77     }
78     boolean isAlive() {
79         return !isClosed() && !isClosing();
80     }
81     boolean isClosed() {
82         return 0 != (state & Closed);
83     }
84     boolean isClosing() {
85         return 0 != (state & Closing);
86     }
87     boolean isWriting() {
88         return 0 != (state & WriteTransaction);
89     }
90     void close() {
91         boolean acquired = false;
92         try {
93             acquired = lock.tryLock(1, TimeUnit.MINUTES);
94         } catch (InterruptedException e1) {
95         }
96         try {
97             if (null != transactionToken)
98                 transactionToken.close();
99             graphSession.stop();
100         } catch (DatabaseException e) {
101         } finally {
102             state = Closed;
103             session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
104             if (acquired) {
105                 condition.signalAll();
106                 lock.unlock();
107             }
108         }
109     }
110     void incAsync()
111     {
112         lock.lock();
113         try {
114             boolean closed = isClosed();
115             boolean closing = isClosing();
116             if (closed || closing)
117                 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
118             ++asyncCount;
119         } finally {
120             lock.unlock();
121         }
122     }
123     void decAsync()
124     {
125         lock.lock();
126         try {
127             if (asyncCount < 1)
128                 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
129             if (0 == --asyncCount) {
130                 condition.signal();
131             }
132         } finally {
133             lock.unlock();
134         }
135     }
136     enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
137     WaitStatus waitClosing(long timeout) {
138         lock.lock();
139         try {
140             if (isClosed())
141                 return WaitStatus.IsClosed;
142             else if (isClosing())
143                 return WaitStatus.IsClosing;
144             state |= Closing;
145             try {
146                 if (timeout < 0) { // Infinite timeout
147                     for (int i=0; i<100; i++)
148                     while (asyncCount > 0  || readCount > 0 ||
149                            (null != writeOwner && Thread.currentThread() != writeOwner)) {
150                         condition.signalAll();
151                         condition.awaitNanos(10000000); // nanos
152                     }
153                     while (asyncCount > 0  || readCount > 0 ||
154                             (null != writeOwner && Thread.currentThread() != writeOwner)) {
155                          condition.signalAll();
156                          condition.await();
157                      }
158                 } else if (timeout > 0) {
159                     boolean interrupted = false;
160                     while (!interrupted &&
161                            (asyncCount > 0  || readCount > 0 ||
162                             (null != writeOwner && Thread.currentThread() != writeOwner)))
163                         interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
164                 }
165                 transactionToken.closing();
166             } catch (Exception e) {
167                 e.printStackTrace();
168             }
169             state &= All ^ Closing;
170             if (readCount == 0 && writeCount == 0 && asyncCount == 0)
171                 return WaitStatus.CanBeClosed;
172             else if (null != writeOwner && Thread.currentThread() == writeOwner)
173                 return WaitStatus.Deadlock;
174             return WaitStatus.Timeout;
175         } finally {
176             lock.unlock();
177         }
178     }
179     void startReadTransaction(int thread) throws DatabaseException {
180         lock.lock();
181         try {
182             assert(readCount == 0);
183             transactionToken.startReadTransaction(thread);
184             state |= ReadTransaction;
185             session.fireStartReadTransaction();
186             ++readCount;
187         } catch (Throwable e) {
188             throw new DatabaseException("Failed to start read transaction.", e);
189         } finally {
190             lock.unlock();
191         }
192     }
193     void stopReadTransaction() throws DatabaseException {
194         lock.lock();
195         try {
196             assert (!queryProvider.hasScheduledUpdates());
197             assert (readCount == 1);
198             session.writeSupport.gc();
199             transactionToken.stopReadTransaction();
200             readCount = 0;
201             state &= All ^ ReadTransaction;
202             condition.signal();
203         } finally {
204             lock.unlock();
205         }
206     }
207
208     void startWriteTransaction(int thread)
209     throws DatabaseException {
210         lock.lock();
211         try {
212 //            System.out.println("stawt");
213             boolean closed = isClosed();
214             boolean closing = isClosing();
215             int ac = asyncCount;
216             int wc = writeCount;
217             if (closed || (closing && ac < 1) || wc < 0)
218                 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
219             if (writeCount == 0) {
220                 transactionToken.startWriteTransaction(thread);
221                 state |= WriteTransaction;
222                 writeOwner = Thread.currentThread();
223                 session.fireStartWriteTransaction();
224             }
225             ++writeCount;
226             assert(writeCount == 1);
227         } catch (Throwable e) {
228             throw new DatabaseException(e);
229         } finally {
230             lock.unlock();
231         }
232     }
233
234     void writeTransactionEnded() {
235         session.defaultClusterSet = null;
236         session.fireFinishWriteTransaction();
237     }
238
239     void stopWriteTransaction(ClusterStream clusterStream) {
240         if(!isAlive()) return;
241         lock.lock();
242         try {
243 //            System.out.println("stowt");
244             if (writeCount < 1)
245                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
246             else if (1 == writeCount) {
247                 boolean empty = session.clusterStream.reallyFlush();
248                 if (!empty) {
249                     String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
250                             + "This is probably a serious error. Automatic cancel done as default recovery.";
251                     Logger.defaultLogInfo(msg);
252                     ClientChangesImpl cs = new ClientChangesImpl(session);
253                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
254                     transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
255                 }
256                 transactionToken.stopWriteTransaction();
257                 state &= All ^ WriteTransaction;
258                 writeOwner = null;
259                 condition.signal();
260                 writeTransactionEnded();
261             }
262             --writeCount;
263         } catch (Throwable e) {
264             e.printStackTrace();
265             this.close(); // Everything is lost anyway.
266             throw new IllegalStateException(e);
267         } finally {
268             lock.unlock();
269         }
270     }
271
272     void cancelWriteTransaction(WriteGraphImpl graph) {
273         lock.lock();
274         try {
275             if (writeCount < 1)
276                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
277             VirtualGraph vg = graph.getProvider();
278             if (null == vg) { // Skip if virtual graph.
279                 boolean empty = session.clusterStream.reallyFlush();
280                 if (!empty) { // Something to cancel.
281                     // Send all local changes to server so it can calculate correct reverse change set.
282                     ClientChangesImpl cs = new ClientChangesImpl(session);
283                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
284                     // This will send cancel and fetch reverse change set from server.
285                     transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
286                     try {
287                         final boolean undo = false;
288                         if (!context.isOk(undo)) // this is a blocking operation
289                             throw new InternalException("Cancel failed. This should never happen. Contact application support.");
290 //                        System.out.println("session cs: " + session.clientChanges);
291 //                        System.out.println("reverse cs: " + cs);
292                         queryProvider.performDirtyUpdates(graph);
293                         queryProvider.performScheduledUpdates(graph);
294                     } catch (DatabaseException e) {
295                         Logger.defaultLogError(e);
296                     }
297                     // This will send and accept the reverse change set.
298                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
299                 }
300             }
301             session.writeSupport.clearMetadata();
302             if (--writeCount == 0) {
303                 transactionToken.stopWriteTransaction();
304                 state &= All ^ WriteTransaction;
305                 writeOwner = null;
306                 condition.signal();
307                 writeTransactionEnded();
308             }
309         } catch (Throwable e) {
310             e.printStackTrace();
311             this.close(); // Everything is lost anyway.
312             throw new IllegalStateException(e);
313         } finally {
314             lock.unlock();
315         }
316     }
317
318     void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
319         assert(request != null);
320         graph.commitAccessorChanges(request);
321         boolean writeOnly = request instanceof WriteOnly;
322         lock.lock();
323         try {
324             VirtualGraph vg = graph.getProvider();
325             if (writeCount == 1) {
326
327                 if (vg != null && clusterStream.isDirty())
328                     new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
329
330 //                long start = System.nanoTime();
331                 if (null == vg) {
332                     clusterStream.reallyFlush();
333                     // This was fired just before in handleUpdatesAndMetadata
334 //                    if (!writeOnly)
335 //                        session.fireMetadataListeners(graph, cs);
336                 } else
337                     clusterStream.clear();
338
339 //                long duration = System.nanoTime() - start;
340 //                System.out.println("reallyFlush " + 1e-9*duration + "s. ");
341
342                 session.clientChanges = new ClientChangesImpl(session);
343
344
345 //                start = System.nanoTime();
346                 queryProvider.performScheduledUpdates(graph);
347 //                duration = System.nanoTime() - start;
348 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
349
350                 // Process updates as long as pending primitives exist
351                 while (session.dirtyPrimitives) {
352                     session.dirtyPrimitives = false;
353                     queryProvider.performDirtyUpdates(graph);
354                     queryProvider.performScheduledUpdates(graph);
355                 }
356
357                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
358                     session.fireReactionsToCommit(graph, cs); // Does not throw exception!
359
360                 writeTransactionEnded();
361                 session.writeSupport.gc();
362             } else // Only the last writer can commit.
363                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
364 //            long start = System.nanoTime();
365 //            System.err.println("commit");
366
367 //            start = System.nanoTime();
368             if (null == vg)
369                 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
370             else {
371                 // There should not be meta data because this is transient graph.
372                 // This is an insurance that this is the case for now.
373                 int n = session.writeSupport.clearMetadata();
374                 if (n > 0)
375                     if (SessionImplSocket.DEBUG)
376                         System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
377             }
378             transactionToken.stopWriteTransaction();
379 //            long duration = System.nanoTime() - start;
380 //            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
381
382
383 //            long duration = System.nanoTime() - start;
384 //            System.err.println("commit2 " + 1e-9*duration);
385             state &= All ^ WriteTransaction;
386             writeCount = 0;
387             writeOwner = null;
388             condition.signal();
389         } catch (Throwable e) {
390             Logger.defaultLogError(e);
391             this.close(); // Everything is lost anyway.
392             throw new IllegalStateException(e);
393         } finally {
394             lock.unlock();
395         }
396     }
397
398     void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
399         VirtualGraph vg = graph.getProvider();
400         if (null != vg)
401             return;
402         lock.lock();
403         try {
404             clusterStream.reallyFlush();
405             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
406             // See below for explanation, the same reasoning should apply here too.
407             if (graph.writeSupport instanceof WriteOnlySupport)
408                 graph.writeSupport.flushCluster();
409         } catch (Throwable e) {
410             Logger.defaultLogError(e);
411             this.close(); // Everything is lost anyway.
412             throw new IllegalStateException(e);
413         } finally {
414             lock.unlock();
415         }
416     }
417     void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
418         VirtualGraph vg = graph.getProvider();
419         if (null != vg)
420             return;
421         lock.lock();
422         try {
423             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
424             transactionToken.setCombine(true);
425             // This was added because test Issue3199Test2 failed because
426             // write only cluster does not need cluster when allocating resource index
427             // and the first claim after commitAndContinue2 caused cluster to be loaded
428             // from server and of course it's resource index was off by one.
429             if (graph.writeSupport instanceof WriteOnlySupport)
430                 graph.writeSupport.flushCluster();
431         } catch (Throwable e) {
432             Logger.defaultLogError(e);
433             this.close(); // Everything is lost anyway.
434             throw new IllegalStateException(e);
435         } finally {
436             lock.unlock();
437         }
438     }
439     Operation getLastOperation() {
440         return transactionToken.getLastOperation();
441     }
442     long getHeadRevisionId() {
443         return transactionToken.getHeadRevisionId();
444     }
445 }