]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Resolve "A tag with an associated listener causes issues in DelayedWrite
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.locks.Condition;
16 import java.util.concurrent.locks.Lock;
17 import java.util.concurrent.locks.ReentrantLock;
18
19 import org.simantics.db.ChangeSet;
20 import org.simantics.db.Disposable;
21 import org.simantics.db.Operation;
22 import org.simantics.db.VirtualGraph;
23 import org.simantics.db.common.utils.Logger;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.exception.InternalException;
26 import org.simantics.db.exception.RuntimeDatabaseException;
27 import org.simantics.db.impl.graph.ReadGraphImpl;
28 import org.simantics.db.impl.graph.WriteGraphImpl;
29 import org.simantics.db.impl.query.QueryProcessor;
30 import org.simantics.db.request.WriteOnly;
31 import org.simantics.db.request.WriteTraits;
32 import org.simantics.db.service.LifecycleSupport.LifecycleState;
33 import org.simantics.db.service.TransactionPolicySupport;
34
35 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;
36
37 class State {
38     static final int          Closed           = 1;
39     static final int          Closing          = 2;
40     static final int          WriteTransaction = 4;
41     static final int          ReadTransaction  = 8;
42     static final int          All              = 15;
43     private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.
44     private SessionImplSocket session          = null;
45     private GraphSession      graphSession     = null;
46     private QueryProcessor    queryProvider    = null;
47     private Lock              lock             = new ReentrantLock();
48     private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.
49     private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.
50     private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.
51     private Thread            writeOwner       = null;
52     private volatile int asyncCount = 1; // Must be volatile so we don't have to synchronize getAsyncCount.
53     private TransactionToken  transactionToken = null;
54     void setCombine(boolean a) {
55         if (null != transactionToken)
56             transactionToken.setCombine(a);
57     }
58     void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {
59         this.session = session;
60         this.graphSession = graphSession;
61         this.queryProvider = queryProvider;
62         resetTransactionPolicy();
63     }
64     void resetTransactionPolicy() {
65         TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);
66         this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);
67     }
68     int getAsyncCount() {
69         return asyncCount;
70     }
71     int getReadCount() {
72         return readCount;
73     }
74     int getWriteCount() {
75         return writeCount;
76     }
77     boolean isWriteTransaction() {
78         return 0 != (state & WriteTransaction);
79     }
80     boolean isAlive() {
81         return !isClosed() && !isClosing();
82     }
83     boolean isClosed() {
84         return 0 != (state & Closed);
85     }
86     boolean isClosing() {
87         return 0 != (state & Closing);
88     }
89     boolean isWriting() {
90         return 0 != (state & WriteTransaction);
91     }
92     void close() {
93         boolean acquired = false;
94         try {
95             acquired = lock.tryLock(1, TimeUnit.MINUTES);
96         } catch (InterruptedException e1) {
97         }
98         try {
99             if (null != transactionToken)
100                 transactionToken.close();
101             graphSession.stop();
102         } catch (DatabaseException e) {
103         } finally {
104             state = Closed;
105             session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);
106             if (acquired) {
107                 condition.signalAll();
108                 lock.unlock();
109             }
110         }
111     }
112     void incAsync()
113     {
114         lock.lock();
115         try {
116             boolean closed = isClosed();
117             boolean closing = isClosing();
118             if (closed || closing)
119                 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);
120             ++asyncCount;
121         } finally {
122             lock.unlock();
123         }
124     }
125     void decAsync()
126     {
127         lock.lock();
128         try {
129             if (asyncCount < 1)
130                 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);
131             if (0 == --asyncCount) {
132                 condition.signal();
133             }
134         } finally {
135             lock.unlock();
136         }
137     }
138     enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }
139     WaitStatus waitClosing(long timeout) {
140         lock.lock();
141         try {
142             if (isClosed())
143                 return WaitStatus.IsClosed;
144             else if (isClosing())
145                 return WaitStatus.IsClosing;
146             state |= Closing;
147             try {
148                 if (timeout < 0) { // Infinite timeout
149                     for (int i=0; i<100; i++)
150                     while (asyncCount > 0  || readCount > 0 ||
151                            (null != writeOwner && Thread.currentThread() != writeOwner)) {
152                         condition.signalAll();
153                         condition.awaitNanos(10000000); // nanos
154                     }
155                     while (asyncCount > 0  || readCount > 0 ||
156                             (null != writeOwner && Thread.currentThread() != writeOwner)) {
157                          condition.signalAll();
158                          condition.await();
159                      }
160                 } else if (timeout > 0) {
161                     boolean interrupted = false;
162                     while (!interrupted &&
163                            (asyncCount > 0  || readCount > 0 ||
164                             (null != writeOwner && Thread.currentThread() != writeOwner)))
165                         interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);
166                 }
167                 transactionToken.closing();
168             } catch (Exception e) {
169                 e.printStackTrace();
170             }
171             state &= All ^ Closing;
172             if (readCount == 0 && writeCount == 0 && asyncCount == 0)
173                 return WaitStatus.CanBeClosed;
174             else if (null != writeOwner && Thread.currentThread() == writeOwner)
175                 return WaitStatus.Deadlock;
176             return WaitStatus.Timeout;
177         } finally {
178             lock.unlock();
179         }
180     }
181     void startReadTransaction(int thread) throws DatabaseException {
182         lock.lock();
183         try {
184             assert(readCount == 0);
185             transactionToken.startReadTransaction(thread);
186             state |= ReadTransaction;
187             session.fireStartReadTransaction();
188             ++readCount;
189         } catch (Throwable e) {
190             throw new DatabaseException("Failed to start read transaction.", e);
191         } finally {
192             lock.unlock();
193         }
194     }
195     void stopReadTransaction() throws DatabaseException {
196         lock.lock();
197         try {
198             assert (!queryProvider.listening.hasScheduledUpdates());
199             assert (readCount == 1);
200             session.writeSupport.gc();
201             transactionToken.stopReadTransaction();
202             readCount = 0;
203             state &= All ^ ReadTransaction;
204             condition.signal();
205         } finally {
206             lock.unlock();
207         }
208     }
209
210     void startWriteTransaction(int thread)
211     throws DatabaseException {
212         lock.lock();
213         try {
214 //            System.out.println("stawt");
215             boolean closed = isClosed();
216             boolean closing = isClosing();
217             int ac = asyncCount;
218             int wc = writeCount;
219             if (closed || (closing && ac < 1) || wc < 0)
220                 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);
221             if (writeCount == 0) {
222                 transactionToken.startWriteTransaction(thread);
223                 state |= WriteTransaction;
224                 writeOwner = Thread.currentThread();
225                 session.fireStartWriteTransaction();
226             }
227             ++writeCount;
228             assert(writeCount == 1);
229         } catch (Throwable e) {
230             throw new DatabaseException(e);
231         } finally {
232             lock.unlock();
233         }
234     }
235
236     void writeTransactionEnded() {
237         session.defaultClusterSet = null;
238         session.fireFinishWriteTransaction();
239     }
240
241     void stopWriteTransaction(ClusterStream clusterStream) {
242         if(!isAlive()) return;
243         lock.lock();
244         try {
245 //            System.out.println("stowt");
246             if (writeCount < 1)
247                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
248             else if (1 == writeCount) {
249                 boolean empty = session.clusterStream.reallyFlush();
250                 if (!empty) {
251                     String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"
252                             + "This is probably a serious error. Automatic cancel done as default recovery.";
253                     Logger.defaultLogInfo(msg);
254                     ClientChangesImpl cs = new ClientChangesImpl(session);
255                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
256                     transactionToken.cancelBegin(session.writeSupport, context, clusterStream);
257                 }
258                 transactionToken.stopWriteTransaction();
259                 state &= All ^ WriteTransaction;
260                 writeOwner = null;
261                 condition.signal();
262                 writeTransactionEnded();
263             }
264             --writeCount;
265         } catch (Throwable e) {
266             e.printStackTrace();
267             this.close(); // Everything is lost anyway.
268             throw new IllegalStateException(e);
269         } finally {
270             lock.unlock();
271         }
272     }
273
274     void cancelWriteTransaction(WriteGraphImpl graph) {
275         lock.lock();
276         try {
277             if (writeCount < 1)
278                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
279             VirtualGraph vg = graph.getProvider();
280             if (null == vg) { // Skip if virtual graph.
281                 boolean empty = session.clusterStream.reallyFlush();
282                 if (!empty) { // Something to cancel.
283                     // Send all local changes to server so it can calculate correct reverse change set.
284                     ClientChangesImpl cs = new ClientChangesImpl(session);
285                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);
286                     // This will send cancel and fetch reverse change set from server.
287                     transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);
288                     try {
289                         final boolean undo = false;
290                         if (!context.isOk(undo)) // this is a blocking operation
291                             throw new InternalException("Cancel failed. This should never happen.");
292 //                        System.out.println("session cs: " + session.clientChanges);
293 //                        System.out.println("reverse cs: " + cs);
294                         queryProvider.propagateChangesInQueryCache(graph);
295                         queryProvider.listening.fireListeners(graph);
296                     } catch (DatabaseException e) {
297                         Logger.defaultLogError(e);
298                     }
299                     // This will send and accept the reverse change set.
300                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
301                 } else {
302                     queryProvider.propagateChangesInQueryCache(graph);
303                     queryProvider.listening.fireListeners(graph);
304                 }
305             }
306             session.writeSupport.clearMetadata();
307             if (--writeCount == 0) {
308                 transactionToken.stopWriteTransaction();
309                 state &= All ^ WriteTransaction;
310                 writeOwner = null;
311                 condition.signal();
312                 writeTransactionEnded();
313             }
314         } catch (Throwable e) {
315             e.printStackTrace();
316             this.close(); // Everything is lost anyway.
317             throw new IllegalStateException(e);
318         } finally {
319             lock.unlock();
320         }
321     }
322
323     void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {
324         assert(request != null);
325         graph.commitAccessorChanges(request);
326         boolean writeOnly = request instanceof WriteOnly;
327         lock.lock();
328         try {
329             VirtualGraph vg = graph.getProvider();
330             if (writeCount == 1) {
331
332                 if (vg != null && clusterStream.isDirty())
333                     new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();
334
335                 // This is needed even when the write targets a virtual graph -
336                 // deny can always remove a persistent statement. 
337                 clusterStream.reallyFlush();
338
339                 session.clientChanges = new ClientChangesImpl(session);
340
341
342 //                start = System.nanoTime();
343                 queryProvider.propagateChangesInQueryCache(graph);
344                 queryProvider.listening.fireListeners(graph);
345
346 //                ReadGraphImpl listenerGraph = graph.forRecompute(null);
347 //                listenerGraph.asyncBarrier.inc();
348 //                queryProvider.listening.fireListeners(listenerGraph);
349 //                listenerGraph.asyncBarrier.dec();
350                 
351 //                duration = System.nanoTime() - start;
352 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
353
354                 // Process updates as long as pending primitives exist
355                 while (session.dirtyPrimitives) {
356                     session.dirtyPrimitives = false;
357                     queryProvider.propagateChangesInQueryCache(graph);
358                     queryProvider.listening.fireListeners(graph);
359                 }
360
361                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
362                     session.fireReactionsToCommit(graph, cs); // Does not throw exception!
363
364                 writeTransactionEnded();
365                 session.writeSupport.gc();
366             } else // Only the last writer can commit.
367                 throw new IllegalStateException(session + ": writeCount=" + writeCount);
368 //            long start = System.nanoTime();
369 //            System.err.println("commit");
370
371 //            start = System.nanoTime();
372             if (null == vg)
373                 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);
374             else {
375                 // There should not be meta data because this is transient graph.
376                 // This is an insurance that this is the case for now.
377                 int n = session.writeSupport.clearMetadata();
378                 if (n > 0)
379                     if (SessionImplSocket.DEBUG)
380                         System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);
381             }
382             transactionToken.stopWriteTransaction();
383 //            long duration = System.nanoTime() - start;
384 //            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");
385
386
387 //            long duration = System.nanoTime() - start;
388 //            System.err.println("commit2 " + 1e-9*duration);
389             state &= All ^ WriteTransaction;
390             writeCount = 0;
391             writeOwner = null;
392             condition.signal();
393         } catch (Throwable e) {
394             Logger.defaultLogError(e);
395             this.close(); // Everything is lost anyway.
396             throw new IllegalStateException(e);
397         } finally {
398             Disposable.safeDispose(cs);
399             lock.unlock();
400         }
401     }
402
403     void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
404         VirtualGraph vg = graph.getProvider();
405         if (null != vg)
406             return;
407         lock.lock();
408         try {
409             clusterStream.reallyFlush();
410             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
411             // See below for explanation, the same reasoning should apply here too.
412             if (graph.writeSupport instanceof WriteOnlySupport)
413                 graph.writeSupport.flushCluster();
414         } catch (Throwable e) {
415             Logger.defaultLogError(e);
416             this.close(); // Everything is lost anyway.
417             throw new IllegalStateException(e);
418         } finally {
419             lock.unlock();
420         }
421     }
422     void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {
423         VirtualGraph vg = graph.getProvider();
424         if (null != vg)
425             return;
426         lock.lock();
427         try {
428             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);
429             transactionToken.setCombine(true);
430             // This was added because test Issue3199Test2 failed because
431             // write only cluster does not need cluster when allocating resource index
432             // and the first claim after commitAndContinue2 caused cluster to be loaded
433             // from server and of course it's resource index was off by one.
434             if (graph.writeSupport instanceof WriteOnlySupport)
435                 graph.writeSupport.flushCluster();
436         } catch (Throwable e) {
437             Logger.defaultLogError(e);
438             this.close(); // Everything is lost anyway.
439             throw new IllegalStateException(e);
440         } finally {
441             lock.unlock();
442         }
443     }
444     Operation getLastOperation() {
445         return transactionToken.getLastOperation();
446     }
447     long getHeadRevisionId() {
448         return transactionToken.getHeadRevisionId();
449     }
450 }