]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
AsyncBarrier.dec runs into refcounting problem
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.Collection;
15 import java.util.concurrent.atomic.AtomicInteger;
16
17 import org.simantics.db.exception.RuntimeDatabaseException;
18 import org.simantics.db.impl.query.CacheEntry;
19 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
20 import org.slf4j.LoggerFactory;
21
22 public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
23
24     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class);
25
26     private static final long serialVersionUID = 4724463372850048672L;
27
28     static final int WAIT_TIME = 60000;
29
30     public static final boolean PRINT = false;
31
32     final AsyncBarrierImpl caller;
33
34     private final Runnable callback;
35
36     private final boolean needsToBlock;
37
38     public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
39         super(0);
40         this.caller = caller;
41         this.callback = callback;
42         this.needsToBlock = needsToBlock;
43         if (BarrierTracing.BOOKKEEPING) {
44             BarrierTracing.trace(this, entry);
45         }
46     }
47
48     public AsyncBarrier getBlockingBarrier() {
49         if(needsToBlock)
50             return this;
51         if(caller == null)
52             return null;
53         else return caller.getBlockingBarrier();
54     }
55
56     @Override
57     public boolean isBlocking() {
58         return needsToBlock;
59     }
60
61     @Override
62     public void inc() {
63
64         if(BarrierTracing.BOOKKEEPING) {
65             BarrierTracing.inc(this);
66         } else {
67             inc(null, null);
68         }
69
70     }
71
72     void inc(Object id, String info) {
73
74         if(PRINT) {
75             System.err.println("inc barrier[" + get() + "] " + this);
76             StackTraceElement[] elems = new Exception().getStackTrace();
77             for(int i=0;i<4;i++) System.err.println(elems[i]);
78         }
79
80         if (incrementAndGet() == 1) {
81             if (caller != null) {
82                 if(BarrierTracing.BOOKKEEPING) {
83                     caller.inc(this, "Child");
84                 } else {
85                     caller.inc(null, null);
86                 }
87             }
88         }
89
90     }
91
92     @Override
93     public void dec() {
94
95         if(PRINT) {
96             System.err.println("dec barrier[" + get() + "] " + this);
97             StackTraceElement[] elems = new Exception().getStackTrace();
98             for(int i=0;i<3;i++) System.err.println(elems[i]);
99         }
100
101         int count = decrementAndGet();
102         if (count < 1) {
103             if(BarrierTracing.BOOKKEEPING) {
104                 BarrierTracing.dec(this, count);
105             }
106             if (count == 0) {
107                 if (caller != null) {
108                     caller.dec();
109                 }
110             }
111             if (count < 0) {
112                 LOGGER.error(
113                         "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
114                         new Exception());
115             }
116             assert (count >= 0);
117
118             if(callback != null)
119                 callback.run();
120
121         }
122
123     }
124
125     public static String report(AsyncBarrierImpl barrier) {
126         CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
127         if(e != null) return e.toString();
128         else return "Barrier@" + System.identityHashCode(barrier);
129     }
130
131     public static void printReverse(AsyncBarrierImpl barrier, int indent) {
132
133         if (barrier.get() == 0)
134             return;
135         for (int i = 0; i < indent; i++)
136             System.err.print(" ");
137         System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
138
139         Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
140         if (children != null) {
141             for (AsyncBarrierImpl child : children)
142                 printReverse(child, indent + 2);
143         }
144         
145     }
146
147     public void waitBarrier(Object request, ReadGraphImpl impl) {
148
149         if (get() > 0) {
150
151             long waitCount = 0;
152
153             while (get() != 0) {
154
155                 boolean executed = impl.performPending();
156                 if(executed) waitCount = 0;
157
158                 ++waitCount;
159                 if(waitCount > 100) Thread.yield();
160                 if(waitCount > 1000) {
161                     try {
162                         Thread.sleep(1);
163                     } catch (InterruptedException e) {
164                         e.printStackTrace();
165                     }
166                 }
167                 if(waitCount > WAIT_TIME) {
168
169                     System.err.println("AsyncBarrierImpl.waitBarrier("
170                             + request
171                             + ") is taking long to execute, so far "
172                             + (waitCount / 1000) + " s.");
173
174                     if (BarrierTracing.BOOKKEEPING) {
175                         synchronized (BarrierTracing.reverseLookup) {
176                             printReverse(this, 0);
177                         }
178                         BarrierTracing.printBAPS();
179                     }
180
181                     throw new RuntimeDatabaseException("Request timed out.");
182
183                 }
184
185             }
186
187         }
188
189     }
190
191     public void restart() {
192         assertReady();
193         if(BarrierTracing.BOOKKEEPING) {
194             BarrierTracing.restart(this);
195         }
196     }
197
198     public void assertReady() {
199         int current = get();
200         if (current != 0)
201             throw new AssertionError("Barrier was not finished (pending="
202                     + current + ").");
203     }
204
205     public void report() {
206         // System.out.println("Barrier log:");
207         // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
208         // System.out.println(entry.getKey() + " " + entry.getValue());
209         // }
210         // System.out.println("SyncIntProcedure log:");
211         // for(Map.Entry<String, Integer> entry :
212         // SyncIntProcedure.counters.entrySet()) {
213         // System.out.println(entry.getKey() + " " + entry.getValue());
214         // }
215     }
216
217     @Override
218     public String toString() {
219         return report(this);
220         //              return "AsyncBarrierImpl@" + System.identityHashCode(this)
221         //                              + " - counter = " + get() + " - caller = " + caller;
222     }
223
224
225 }