]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
Merge commit 'bd5bc6e45f700e755b61bd112631796631330ecb'
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / State.java
/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.util.concurrent.TimeUnit;\r
15 import java.util.concurrent.locks.Condition;\r
16 import java.util.concurrent.locks.Lock;\r
17 import java.util.concurrent.locks.ReentrantLock;\r
18 \r
19 import org.simantics.db.ChangeSet;\r
20 import org.simantics.db.Operation;\r
21 import org.simantics.db.VirtualGraph;\r
22 import org.simantics.db.common.utils.Logger;\r
23 import org.simantics.db.exception.DatabaseException;\r
24 import org.simantics.db.exception.InternalException;\r
25 import org.simantics.db.exception.RuntimeDatabaseException;\r
26 import org.simantics.db.impl.graph.WriteGraphImpl;\r
27 import org.simantics.db.impl.query.QueryProcessor;\r
28 import org.simantics.db.request.WriteOnly;\r
29 import org.simantics.db.request.WriteTraits;\r
30 import org.simantics.db.service.LifecycleSupport.LifecycleState;\r
31 import org.simantics.db.service.TransactionPolicySupport;\r
32 \r
33 import fi.vtt.simantics.procore.internal.SessionImplSocket.WriteOnlySupport;\r
34 \r
35 class State {\r
36     static final int          Closed           = 1;\r
37     static final int          Closing          = 2;\r
38     static final int          WriteTransaction = 4;\r
39     static final int          ReadTransaction  = 8;\r
40     static final int          All              = 15;\r
41     private volatile int state = 0; // Must be volatile so we don't have to synchronize getState.\r
42     private SessionImplSocket session          = null;\r
43     private GraphSession      graphSession     = null;\r
44     private QueryProcessor    queryProvider    = null;\r
45     private Lock              lock             = new ReentrantLock();\r
46     private Condition condition = lock.newCondition(); // Tried this with tree conditions but it wasn't worth the effort.\r
47     private volatile int readCount = 0; // Must be volatile so we don't have to synchronize getReadCount.\r
48     private volatile int writeCount = 0; // Must be volatile so we don't have to synchronize getWriteCount.\r
49     private Thread            writeOwner       = null;\r
50     private int               asyncCount       = 1;\r
51     private TransactionToken  transactionToken = null;\r
52     void setCombine(boolean a) {\r
53         if (null != transactionToken)\r
54             transactionToken.setCombine(a);\r
55     }\r
56     void setGraphSession(SessionImplSocket session, GraphSession graphSession, QueryProcessor queryProvider, ClusterTable clusterTable) {\r
57         this.session = session;\r
58         this.graphSession = graphSession;\r
59         this.queryProvider = queryProvider;\r
60         resetTransactionPolicy();\r
61     }\r
62     void resetTransactionPolicy() {\r
63         TransactionPolicySupport tps = session.getService(TransactionPolicySupport.class);\r
64         this.transactionToken = new TransactionToken(tps, session, graphSession, this.transactionToken);\r
65     }\r
66     int getAsyncCount() {\r
67         return asyncCount;\r
68     }\r
69     int getReadCount() {\r
70         return readCount;\r
71     }\r
72     int getWriteCount() {\r
73         return writeCount;\r
74     }\r
75     boolean isWriteTransaction() {\r
76         return 0 != (state & WriteTransaction);\r
77     }\r
78     boolean isAlive() {\r
79         return !isClosed() && !isClosing();\r
80     }\r
81     boolean isClosed() {\r
82         return 0 != (state & Closed);\r
83     }\r
84     boolean isClosing() {\r
85         return 0 != (state & Closing);\r
86     }\r
87     boolean isWriting() {\r
88         return 0 != (state & WriteTransaction);\r
89     }\r
90     void close() {\r
91         boolean acquired = false;\r
92         try {\r
93             acquired = lock.tryLock(1, TimeUnit.MINUTES);\r
94         } catch (InterruptedException e1) {\r
95         }\r
96         try {\r
97             if (null != transactionToken)\r
98                 transactionToken.close();\r
99             graphSession.stop();\r
100         } catch (DatabaseException e) {\r
101         } finally {\r
102             state = Closed;\r
103             session.lifecycleSupport.fireListeners(LifecycleState.CLOSED);\r
104             if (acquired) {\r
105                 condition.signalAll();\r
106                 lock.unlock();\r
107             }\r
108         }\r
109     }\r
110     void incAsync()\r
111     {\r
112         lock.lock();\r
113         try {\r
114             boolean closed = isClosed();\r
115             boolean closing = isClosing();\r
116             if (closed || closing)\r
117                 throw new RuntimeDatabaseException(session + ": closed=" + closed + ", closing=" + closing);\r
118             ++asyncCount;\r
119         } finally {\r
120             lock.unlock();\r
121         }\r
122     }\r
123     void decAsync()\r
124     {\r
125         lock.lock();\r
126         try {\r
127             if (asyncCount < 1)\r
128                 throw new RuntimeDatabaseException(session + ": asyncCount=" + asyncCount);\r
129             if (0 == --asyncCount) {\r
130                 condition.signal();\r
131             }\r
132         } finally {\r
133             lock.unlock();\r
134         }\r
135     }\r
136     enum WaitStatus { IsClosed, IsClosing, CanBeClosed, Timeout, Deadlock }\r
137     WaitStatus waitClosing(long timeout) {\r
138         lock.lock();\r
139         try {\r
140             if (isClosed())\r
141                 return WaitStatus.IsClosed;\r
142             else if (isClosing())\r
143                 return WaitStatus.IsClosing;\r
144             state |= Closing;\r
145             try {\r
146                 if (timeout < 0) { // Infinite timeout\r
147                     for (int i=0; i<100; i++)\r
148                     while (asyncCount > 0  || readCount > 0 ||\r
149                            (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
150                         condition.signalAll();\r
151                         condition.awaitNanos(10000000); // nanos\r
152                     }\r
153                     while (asyncCount > 0  || readCount > 0 ||\r
154                             (null != writeOwner && Thread.currentThread() != writeOwner)) {\r
155                          condition.signalAll();\r
156                          condition.await();\r
157                      }\r
158                 } else if (timeout > 0) {\r
159                     boolean interrupted = false;\r
160                     while (!interrupted &&\r
161                            (asyncCount > 0  || readCount > 0 ||\r
162                             (null != writeOwner && Thread.currentThread() != writeOwner)))\r
163                         interrupted = !condition.await(timeout, TimeUnit.MILLISECONDS);\r
164                 }\r
165                 transactionToken.closing();\r
166             } catch (Exception e) {\r
167                 e.printStackTrace();\r
168             }\r
169             state &= All ^ Closing;\r
170             if (readCount == 0 && writeCount == 0 && asyncCount == 0)\r
171                 return WaitStatus.CanBeClosed;\r
172             else if (null != writeOwner && Thread.currentThread() == writeOwner)\r
173                 return WaitStatus.Deadlock;\r
174             return WaitStatus.Timeout;\r
175         } finally {\r
176             lock.unlock();\r
177         }\r
178     }\r
179     void startReadTransaction(int thread) throws DatabaseException {\r
180         lock.lock();\r
181         try {\r
182             assert(readCount == 0);\r
183             transactionToken.startReadTransaction(thread);\r
184             state |= ReadTransaction;\r
185             session.fireStartReadTransaction();\r
186             ++readCount;\r
187         } catch (Throwable e) {\r
188             throw new DatabaseException("Failed to start read transaction.", e);\r
189         } finally {\r
190             lock.unlock();\r
191         }\r
192     }\r
193     void stopReadTransaction() throws DatabaseException {\r
194         lock.lock();\r
195         try {\r
196             assert (!queryProvider.hasScheduledUpdates());\r
197             assert (readCount == 1);\r
198             session.writeSupport.gc();\r
199             transactionToken.stopReadTransaction();\r
200             readCount = 0;\r
201             state &= All ^ ReadTransaction;\r
202             condition.signal();\r
203         } finally {\r
204             lock.unlock();\r
205         }\r
206     }\r
207 \r
208     void startWriteTransaction(int thread)\r
209     throws DatabaseException {\r
210         lock.lock();\r
211         try {\r
212 //            System.out.println("stawt");\r
213             boolean closed = isClosed();\r
214             boolean closing = isClosing();\r
215             int ac = asyncCount;\r
216             int wc = writeCount;\r
217             if (closed || (closing && ac < 1) || wc < 0)\r
218                 throw new DatabaseException(session + ": closed=" + closed + ", closing=" + closing + ", asyncCount=" + asyncCount + ", writeCount=" + writeCount);\r
219             if (writeCount == 0) {\r
220                 transactionToken.startWriteTransaction(thread);\r
221                 state |= WriteTransaction;\r
222                 writeOwner = Thread.currentThread();\r
223                 session.fireStartWriteTransaction();\r
224             }\r
225             ++writeCount;\r
226             assert(writeCount == 1);\r
227         } catch (Throwable e) {\r
228             throw new DatabaseException(e);\r
229         } finally {\r
230             lock.unlock();\r
231         }\r
232     }\r
233 \r
234     void writeTransactionEnded() {\r
235         session.defaultClusterSet = null;\r
236         session.fireFinishWriteTransaction();\r
237     }\r
238 \r
239     void stopWriteTransaction(ClusterStream clusterStream) {\r
240         if(!isAlive()) return;\r
241         lock.lock();\r
242         try {\r
243 //            System.out.println("stowt");\r
244             if (writeCount < 1)\r
245                 throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
246             else if (1 == writeCount) {\r
247                 boolean empty = session.clusterStream.reallyFlush();\r
248                 if (!empty) {\r
249                     String msg = "We have modified graph (on server) without accept/cancel acknowledgment.\n"\r
250                             + "This is probably a serious error. Automatic cancel done as default recovery.";\r
251                     Logger.defaultLogInfo(msg);\r
252                     ClientChangesImpl cs = new ClientChangesImpl(session);\r
253                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
254                     transactionToken.cancelBegin(session.writeSupport, context, clusterStream);\r
255                 }\r
256                 transactionToken.stopWriteTransaction();\r
257                 state &= All ^ WriteTransaction;\r
258                 writeOwner = null;\r
259                 condition.signal();\r
260                 writeTransactionEnded();\r
261             }\r
262             --writeCount;\r
263         } catch (Throwable e) {\r
264             e.printStackTrace();\r
265             this.close(); // Everything is lost anyway.\r
266             throw new IllegalStateException(e);\r
267         } finally {\r
268             lock.unlock();\r
269         }\r
270     }\r
271 \r
272     void cancelWriteTransaction(WriteGraphImpl graph) {\r
273         lock.lock();\r
274         try {\r
275             if (writeCount < 1)\r
276                 throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
277             VirtualGraph vg = graph.getProvider();\r
278             if (null == vg) { // Skip if virtual graph.\r
279                 boolean empty = session.clusterStream.reallyFlush();\r
280                 if (!empty) { // Something to cancel.\r
281                     // Send all local changes to server so it can calculate correct reverse change set.\r
282                     ClientChangesImpl cs = new ClientChangesImpl(session);\r
283                     SynchronizeContext context = new SynchronizeContext(session, cs, 1);\r
284                     // This will send cancel and fetch reverse change set from server.\r
285                     transactionToken.cancelBegin(session.writeSupport, context, session.clusterStream);\r
286                     try {\r
287                         final boolean undo = false;\r
288                         if (!context.isOk(undo)) // this is a blocking operation\r
289                             throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
290 //                        System.out.println("session cs: " + session.clientChanges);\r
291 //                        System.out.println("reverse cs: " + cs);\r
292                         queryProvider.performDirtyUpdates(graph);\r
293                         queryProvider.performScheduledUpdates(graph);\r
294                     } catch (DatabaseException e) {\r
295                         Logger.defaultLogError(e);\r
296                     }\r
297                     // This will send and accept the reverse change set.\r
298                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);\r
299                 }\r
300             }\r
301             session.writeSupport.clearMetadata();\r
302             if (--writeCount == 0) {\r
303                 transactionToken.stopWriteTransaction();\r
304                 state &= All ^ WriteTransaction;\r
305                 writeOwner = null;\r
306                 condition.signal();\r
307                 writeTransactionEnded();\r
308             }\r
309         } catch (Throwable e) {\r
310             e.printStackTrace();\r
311             this.close(); // Everything is lost anyway.\r
312             throw new IllegalStateException(e);\r
313         } finally {\r
314             lock.unlock();\r
315         }\r
316     }\r
317 \r
318     void commitWriteTransaction(WriteGraphImpl graph, ClusterStream clusterStream, ChangeSet cs, WriteTraits request, Operation op) {\r
319         assert(request != null);\r
320         graph.commitAccessorChanges(request);\r
321         boolean writeOnly = request instanceof WriteOnly;\r
322         lock.lock();\r
323         try {\r
324             VirtualGraph vg = graph.getProvider();\r
325             if (writeCount == 1) {\r
326 \r
327                 if (vg != null && clusterStream.isDirty())\r
328                     new Exception("Internal error: virtual transaction committed changes into core (" + request + ")").printStackTrace();\r
329 \r
330 //                long start = System.nanoTime();\r
331                 if (null == vg) {\r
332                     clusterStream.reallyFlush();\r
333                     // This was fired just before in handleUpdatesAndMetadata\r
334 //                    if (!writeOnly)\r
335 //                        session.fireMetadataListeners(graph, cs);\r
336                 } else\r
337                     clusterStream.clear();\r
338 \r
339 //                long duration = System.nanoTime() - start;\r
340 //                System.out.println("reallyFlush " + 1e-9*duration + "s. ");\r
341 \r
342                 session.clientChanges = new ClientChangesImpl(session);\r
343 \r
344 \r
345 //                start = System.nanoTime();\r
346                 queryProvider.performScheduledUpdates(graph);\r
347 //                duration = System.nanoTime() - start;\r
348 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");\r
349 \r
350                 // Process updates as long as pending primitives exist\r
351                 while (session.dirtyPrimitives) {\r
352                     session.dirtyPrimitives = false;\r
353                     queryProvider.performDirtyUpdates(graph);\r
354                     queryProvider.performScheduledUpdates(graph);\r
355                 }\r
356 \r
357                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!\r
358                     session.fireReactionsToCommit(graph, cs); // Does not throw exception!\r
359 \r
360                 writeTransactionEnded();\r
361                 session.writeSupport.gc();\r
362             } else // Only the last writer can commit.\r
363                 throw new IllegalStateException(session + ": writeCount=" + writeCount);\r
364 //            long start = System.nanoTime();\r
365 //            System.err.println("commit");\r
366 \r
367 //            start = System.nanoTime();\r
368             if (null == vg)\r
369                 transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, op);\r
370             else {\r
371                 // There should not be meta data because this is transient graph.\r
372                 // This is an insurance that this is the case for now.\r
373                 int n = session.writeSupport.clearMetadata();\r
374                 if (n > 0)\r
375                     if (SessionImplSocket.DEBUG)\r
376                         System.out.println("DEBUG: Transient graph has accumulated metadata. Size=" + n);\r
377             }\r
378             transactionToken.stopWriteTransaction();\r
379 //            long duration = System.nanoTime() - start;\r
380 //            System.out.println("transactionToken.commitWriteTransaction " + 1e-9*duration + "s. ");\r
381 \r
382 \r
383 //            long duration = System.nanoTime() - start;\r
384 //            System.err.println("commit2 " + 1e-9*duration);\r
385             state &= All ^ WriteTransaction;\r
386             writeCount = 0;\r
387             writeOwner = null;\r
388             condition.signal();\r
389         } catch (Throwable e) {\r
390             Logger.defaultLogError(e);\r
391             this.close(); // Everything is lost anyway.\r
392             throw new IllegalStateException(e);\r
393         } finally {\r
394             lock.unlock();\r
395         }\r
396     }\r
397 \r
398     void commitAndContinue(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
399         VirtualGraph vg = graph.getProvider();\r
400         if (null != vg)\r
401             return;\r
402         lock.lock();\r
403         try {\r
404             clusterStream.reallyFlush();\r
405             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
406             // See below for explanation, the same reasoning should apply here too.\r
407             if (graph.writeSupport instanceof WriteOnlySupport)\r
408                 graph.writeSupport.flushCluster();\r
409         } catch (Throwable e) {\r
410             Logger.defaultLogError(e);\r
411             this.close(); // Everything is lost anyway.\r
412             throw new IllegalStateException(e);\r
413         } finally {\r
414             lock.unlock();\r
415         }\r
416     }\r
417     void commitAndContinue2(WriteGraphImpl graph, ClusterStream clusterStream, WriteTraits request) {\r
418         VirtualGraph vg = graph.getProvider();\r
419         if (null != vg)\r
420             return;\r
421         lock.lock();\r
422         try {\r
423             transactionToken.commitWriteTransaction(graph.writeSupport, request, clusterStream, null);\r
424             transactionToken.setCombine(true);\r
425             // This was added because test Issue3199Test2 failed because\r
426             // write only cluster does not need cluster when allocating resource index\r
427             // and the first claim after commitAndContinue2 caused cluster to be loaded\r
428             // from server and of course it's resource index was off by one.\r
429             if (graph.writeSupport instanceof WriteOnlySupport)\r
430                 graph.writeSupport.flushCluster();\r
431         } catch (Throwable e) {\r
432             Logger.defaultLogError(e);\r
433             this.close(); // Everything is lost anyway.\r
434             throw new IllegalStateException(e);\r
435         } finally {\r
436             lock.unlock();\r
437         }\r
438     }\r
439     Operation getLastOperation() {\r
440         return transactionToken.getLastOperation();\r
441     }\r
442     long getHeadRevisionId() {\r
443         return transactionToken.getHeadRevisionId();\r
444     }\r
445 }\r