gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Disposable;
21 import org.simantics.db.Operation;
22 import org.simantics.db.VirtualGraph;
23 import org.simantics.db.common.utils.Logger;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.exception.InternalException;
26 import org.simantics.db.exception.RuntimeDatabaseException;
27 import org.simantics.db.impl.graph.ReadGraphImpl;
28 import org.simantics.db.impl.graph.WriteGraphImpl;
29 import org.simantics.db.impl.query.QueryProcessor;
30 import org.simantics.db.request.WriteOnly;
31 import org.simantics.db.request.WriteTraits;
32 import org.simantics.db.service.LifecycleSupport.LifecycleState;
33 import org.simantics.db.service.TransactionPolicySupport;
34
35 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
36
37 class State {
38     static final int          Closed           = 1;
39     static final int          Closing          = 2;
40     static final int          WriteTransaction = 4;
41     static final int          ReadTransaction  = 8;
42     static final int          All              = 15;
43     private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
44     private SessionImplSocket session          = null;
45     private GraphSession      graphSession     = null;
46     private QueryProcessor    queryProvider    = null;
47     private Lock              lock             = new ReentrantLock();
48     private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
49     private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
50     private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
51     private Thread            writeOwner       = null;
52     private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
53     private TransactionToken  transactionToken = null;
54     void setCombine(boolean a) {
55         if (null != transactionToken)
56             transactionToken.setCombine(a);
57     }
58     void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
59         this.session = session;
60         this.graphSession = graphSession;
61         this.queryProvider = queryProvider;
62         resetTransactionPolicy();
63     }
64     void resetTransactionPolicy() {
65         TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
66         this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
67     }
68     int getAsyncCount() {
69         return asyncCount;
70     }
71     int getReadCount() {
72         return readCount;
73     }
74     int getWriteCount() {
75         return writeCount;
76     }
77     boolean isWriteTransaction() {
78         return 0 != (state & WriteTransaction);
79     }
80     boolean isAlive() {
81         return !isClosed() && !isClosing();
82     }
83     boolean isClosed() {
84         return 0 != (state & Closed);
85     }
86     boolean isClosing() {
87         return 0 != (state & Closing);
88     }
89     boolean isWriting() {
90         return 0 != (state & WriteTransaction);
91     }
92     void close() {
93         boolean acquired = false;
94         try {
95             acquired = lock.tryLock(1, TimeUnit.MINUTES);
96         } catch (InterruptedException e1) {
97         }
98         try {
99             if (null != transactionToken)
100                 transactionToken.close();
101             graphSession.stop();
102         } catch (DatabaseException e) {
103         } finally {
104             state = Closed;
105             session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
106             if (acquired) {
107                 condition.signalAll();
108                 lock.unlock();
109             }
110         }
111     }
112     void incAsync()
113     {
114         lock.lock();
115         try {
116             boolean closed = isClosed();
117             boolean closing = isClosing();
118             if (closed || closing)
119                 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
120             ++asyncCount;
121         } finally {
122             lock.unlock();
123         }
124     }
125     void decAsync()
126     {
127         lock.lock();
128         try {
129             if (asyncCount < 1)
130                 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
131             if (0 == --asyncCount) {
132                 condition.signal();
133             }
134         } finally {
135             lock.unlock();
136         }
137     }
138     enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
139     WaitStatus waitClosing(long timeout) {
140         lock.lock();
141         try {
142             if (isClosed())
143                 return WaitStatus.IsClosed;
144             else if (isClosing())
145                 return WaitStatus.IsClosing;
146             state |= Closing;
147             try {
148                 if (timeout < 0) { // Infinite timeout
149                     for (int i=0; i<100; i++)
150                     while (asyncCount > 0  || readCount > 0 ||
151                            (null != writeOwner && Thread.currentThread() != writeOwner)) {
152                         condition.signalAll();
153                         condition.awaitNanos(10000000); // nanos
154                     }
155                     while (asyncCount > 0  || readCount > 0 ||
156                             (null != writeOwner && Thread.currentThread() != writeOwner)) {
157                          condition.signalAll();
158                          condition.await();
159                      }
160                 } else if (timeout > 0) {
161                     boolean interrupted = false;
162                     while (!interrupted &&
163                            (asyncCount > 0  || readCount > 0 ||
164                             (null != writeOwner && Thread.currentThread() != writeOwner)))
165                         interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
166                 }
167                 transactionToken.closing();
168             } catch (Exception e) {
169                 e.printStackTrace();
170             }
171             state &= All ^ Closing;
172             if (readCount == 0 && writeCount == 0 && asyncCount == 0)
173                 return WaitStatus.CanBeClosed;
174             else if (null != writeOwner && Thread.currentThread() == writeOwner)
175                 return WaitStatus.Deadlock;
176             return WaitStatus.Timeout;
177         } finally {
178             lock.unlock();
179         }
180     }
181     void startReadTransaction(int thread) throws DatabaseException {
182         lock.lock();
183         try {
184             assert(readCount == 0);
185             transactionToken.startReadTransaction(thread);
186             state |= ReadTransaction;
187             session.fireStartReadTransaction();
188             ++readCount;
189         } catch (Throwable e) {
190             throw new DatabaseException("Failed to start read transaction.", e);
191         } finally {
192             lock.unlock();
193         }
194     }
195     void stopReadTransaction() throws DatabaseException {
196         lock.lock();
197         try {
198             assert (!queryProvider.listening.hasScheduledUpdates());
199             assert (readCount == 1);
200             session.writeSupport.gc();
201             transactionToken.stopReadTransaction();
202             readCount = 0;
203             state &= All ^ ReadTransaction;
204             condition.signal();
205         } finally {
206             lock.unlock();
207         }
208     }
209
210     void startWriteTransaction(int thread)
211     throws DatabaseException {
212         lock.lock();
213         try {
214 //            System.out.println("stawt");
215             boolean closed = isClosed();
216             boolean closing = isClosing();
217             int ac = asyncCount;
218             int wc = writeCount;
219             if (closed || (closing && ac < 1) || wc < 0)
220                 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
221             if (writeCount == 0) {
222                 transactionToken.startWriteTransaction(thread);
223                 state |= WriteTransaction;
224                 writeOwner = Thread.currentThread();
225                 session.fireStartWriteTransaction();
226             }
227             ++writeCount;
228             assert(writeCount == 1);
229         } catch (Throwable e) {
230             throw new DatabaseException(e);
231         } finally {
232             lock.unlock();
233         }
234     }
235
236     void writeTransactionEnded() {
237         session.defaultClusterSet = null;
238         session.fireFinishWriteTransaction();
239     }
240
241     void stopWriteTransaction(ClusterStream clusterStream) {
242         if(!isAlive()) return;
243         lock.lock();
244         try {
245 //            System.out.println("stowt");
246             if (writeCount < 1)
247                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
248             else if (1 == writeCount) {
249                 boolean empty = session.clusterStream.reallyFlush();
250                 if (!empty) {
251                     String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
252                             + "This is probably a serious error. Automatic cancel done as default recovery.";
253                     Logger.defaultLogInfo(msg);
254                     ClientChangesImpl cs = new ClientChangesImpl(session);
255                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
256                     transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
257                 }
258                 transactionToken.stopWriteTransaction();
259                 state &= All ^ WriteTransaction;
260                 writeOwner = null;
261                 condition.signal();
262                 writeTransactionEnded();
263             }
264             --writeCount;
265         } catch (Throwable e) {
266             e.printStackTrace();
267             this.close(); // Everything is lost anyway.
268             throw new IllegalStateException(e);
269         } finally {
270             lock.unlock();
271         }
272     }
273
274     void cancelWriteTransaction(WriteGraphImpl graph) {
275         lock.lock();
276         try {
277             if (writeCount < 1)
278                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
279             VirtualGraph vg = graph.getProvider();
280             if (null == vg) { // Skip if virtual graph.
281                 boolean empty = session.clusterStream.reallyFlush();
282                 if (!empty) { // Something to cancel.
283                     // Send all local changes to server so it can calculate correct reverse change set.
284                     ClientChangesImpl cs = new ClientChangesImpl(session);
285                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
286                     // This will send cancel and fetch reverse change set from server.
287                     transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
288                     try {
289                         final boolean undo = false;
290                         if (!context.isOk(undo)) // this is a blocking operation
291                             throw new InternalException("Cancel failed. This should never happen.");
292 //                        System.out.println("session cs: " + session.clientChanges);
293 //                        System.out.println("reverse cs: " + cs);
294                         queryProvider.propagateChangesInQueryCache(graph);
295                         queryProvider.listening.fireListeners(graph);
296                     } catch (DatabaseException e) {
297                         Logger.defaultLogError(e);
298                     }
299                     // This will send and accept the reverse change set.
300                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
301                 } else {
302                     queryProvider.propagateChangesInQueryCache(graph);
303                     queryProvider.listening.fireListeners(graph);
304                 }
305             }
306             session.writeSupport.clearMetadata();
307             if (--writeCount == 0) {
308                 transactionToken.stopWriteTransaction();
309                 state &= All ^ WriteTransaction;
310                 writeOwner = null;
311                 condition.signal();
312                 writeTransactionEnded();
313             }
314         } catch (Throwable e) {
315             e.printStackTrace();
316             this.close(); // Everything is lost anyway.
317             throw new IllegalStateException(e);
318         } finally {
319             lock.unlock();
320         }
321     }
322
323     void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
324         assert(request != null);
325         graph.commitAccessorChanges(request);
326         boolean writeOnly = request instanceof WriteOnly;
327         lock.lock();
328         try {
329             VirtualGraph vg = graph.getProvider();
330             if (writeCount == 1) {
331
332                 if (vg != null && clusterStream.isDirty())
333                     new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
334
335                 // This is needed even when the write targets a virtual graph -
336                 // deny can always remove a persistent statement. 
337                 clusterStream.reallyFlush();
338
339                 session.clientChanges = new ClientChangesImpl(session);
340
341
342 //                start = System.nanoTime();
343                 queryProvider.propagateChangesInQueryCache(graph);
344                 ReadGraphImpl listenerGraph = graph.forRecompute(null);
345                 listenerGraph.asyncBarrier.inc();
346                 queryProvider.listening.fireListeners(listenerGraph);
347                 listenerGraph.asyncBarrier.dec();
348 //                duration = System.nanoTime() - start;
349 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
350
351                 // Process updates as long as pending primitives exist
352                 while (session.dirtyPrimitives) {
353                     session.dirtyPrimitives = false;
354                     queryProvider.propagateChangesInQueryCache(graph);
355                     queryProvider.listening.fireListeners(graph);
356                 }
357
358                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
359                     session.fireReactionsToCommit(graph, cs); // Does not throw exception!
360
361                 writeTransactionEnded();
362                 session.writeSupport.gc();
363             } else // Only the last writer can commit.
364                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
365 //            long start = System.nanoTime();
366 //            System.err.println("commit");
367
368 //            start = System.nanoTime();
369             if (null == vg)
370                 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
371             else {
372                 // There should not be meta data because this is transient graph.
373                 // This is an insurance that this is the case for now.
374                 int n = session.writeSupport.clearMetadata();
375                 if (n > 0)
376                     if (SessionImplSocket.DEBUG)
377                         System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
378             }
379             transactionToken.stopWriteTransaction();
380 //            long duration = System.nanoTime() - start;
381 //            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
382
383
384 //            long duration = System.nanoTime() - start;
385 //            System.err.println("commit2 " + 1e-9*duration);
386             state &= All ^ WriteTransaction;
387             writeCount = 0;
388             writeOwner = null;
389             condition.signal();
390         } catch (Throwable e) {
391             Logger.defaultLogError(e);
392             this.close(); // Everything is lost anyway.
393             throw new IllegalStateException(e);
394         } finally {
395             Disposable.safeDispose(cs);
396             lock.unlock();
397         }
398     }
399
400     void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
401         VirtualGraph vg = graph.getProvider();
402         if (null != vg)
403             return;
404         lock.lock();
405         try {
406             clusterStream.reallyFlush();
407             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
408             // See below for explanation, the same reasoning should apply here too.
409             if (graph.writeSupport instanceof WriteOnlySupport)
410                 graph.writeSupport.flushCluster();
411         } catch (Throwable e) {
412             Logger.defaultLogError(e);
413             this.close(); // Everything is lost anyway.
414             throw new IllegalStateException(e);
415         } finally {
416             lock.unlock();
417         }
418     }
419     void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
420         VirtualGraph vg = graph.getProvider();
421         if (null != vg)
422             return;
423         lock.lock();
424         try {
425             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
426             transactionToken.setCombine(true);
427             // This was added because test Issue3199Test2 failed because
428             // write only cluster does not need cluster when allocating resource index
429             // and the first claim after commitAndContinue2 caused cluster to be loaded
430             // from server and of course it's resource index was off by one.
431             if (graph.writeSupport instanceof WriteOnlySupport)
432                 graph.writeSupport.flushCluster();
433         } catch (Throwable e) {
434             Logger.defaultLogError(e);
435             this.close(); // Everything is lost anyway.
436             throw new IllegalStateException(e);
437         } finally {
438             lock.unlock();
439         }
440     }
441     Operation getLastOperation() {
442         return transactionToken.getLastOperation();
443     }
444     long getHeadRevisionId() {
445         return transactionToken.getHeadRevisionId();
446     }
447 }