]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
637db3f26f8e119173fbf29cbe54b46ed9b17002
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.Collection;
15 import java.util.concurrent.atomic.AtomicInteger;
16
17 import org.simantics.db.common.utils.Logger;
18 import org.simantics.db.exception.RuntimeDatabaseException;
19 import org.simantics.db.impl.query.CacheEntry;
20 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
21
22 public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
23
24     private static final long serialVersionUID = 4724463372850048672L;
25
26     static final int WAIT_TIME = 60000;
27
28     public static final boolean PRINT = false;
29
30     final AsyncBarrierImpl caller;
31
32     private final Runnable callback;
33
34     private final boolean needsToBlock;
35
36     public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
37         super(0);
38         this.caller = caller;
39         this.callback = callback;
40         this.needsToBlock = needsToBlock;
41         if (BarrierTracing.BOOKKEEPING) {
42             BarrierTracing.trace(this, entry);
43         }
44     }
45
46     public AsyncBarrier getBlockingBarrier() {
47         if(needsToBlock)
48             return this;
49         if(caller == null)
50             return null;
51         else return caller.getBlockingBarrier();
52     }
53
54     @Override
55     public boolean isBlocking() {
56         return needsToBlock;
57     }
58
59     @Override
60     public void inc() {
61
62         if(BarrierTracing.BOOKKEEPING) {
63             BarrierTracing.inc(this);
64         } else {
65             inc(null, null);
66         }
67
68     }
69
70     void inc(Object id, String info) {
71
72         if(PRINT) {
73             System.err.println("inc barrier[" + get() + "] " + this);
74             StackTraceElement[] elems = new Exception().getStackTrace();
75             for(int i=0;i<4;i++) System.err.println(elems[i]);
76         }
77
78         if (incrementAndGet() == 1) {
79             if (caller != null) {
80                 if(BarrierTracing.BOOKKEEPING) {
81                     caller.inc(this, "Child");
82                 } else {
83                     caller.inc(null, null);
84                 }
85             }
86         }
87
88     }
89
90     @Override
91     public void dec() {
92
93         if(PRINT) {
94             System.err.println("dec barrier[" + get() + "] " + this);
95             StackTraceElement[] elems = new Exception().getStackTrace();
96             for(int i=0;i<3;i++) System.err.println(elems[i]);
97         }
98
99         int count = decrementAndGet();
100         if (count < 1) {
101             if(BarrierTracing.BOOKKEEPING) {
102                 BarrierTracing.dec(this, count);
103             }
104             if (count == 0) {
105                 if (caller != null) {
106                     caller.dec();
107                 }
108             }
109             if (count < 0) {
110                 Logger.defaultLogError(
111                         "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
112                         new Exception());
113             }
114             assert (count >= 0);
115
116             if(callback != null)
117                 callback.run();
118
119         }
120
121     }
122
123     public static String report(AsyncBarrierImpl barrier) {
124         CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
125         if(e != null) return e.toString();
126         else return "Barrier@" + System.identityHashCode(barrier);
127     }
128
129     public static void printReverse(AsyncBarrierImpl barrier, int indent) {
130
131         if (barrier.get() == 0)
132             return;
133         for (int i = 0; i < indent; i++)
134             System.err.print(" ");
135         System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
136
137         Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
138         if (children != null) {
139             for (AsyncBarrierImpl child : children)
140                 printReverse(child, indent + 2);
141         }
142         
143     }
144
145     public void waitBarrier(Object request, ReadGraphImpl impl) {
146
147         if (get() > 0) {
148
149             long waitCount = 0;
150
151             while (get() != 0) {
152
153                 boolean executed = impl.performPending();
154                 if(executed) waitCount = 0;
155
156                 ++waitCount;
157                 if(waitCount > 100) Thread.yield();
158                 if(waitCount > 1000) {
159                     try {
160                         Thread.sleep(1);
161                     } catch (InterruptedException e) {
162                         e.printStackTrace();
163                     }
164                 }
165                 if(waitCount > WAIT_TIME) {
166
167                     System.err.println("AsyncBarrierImpl.waitBarrier("
168                             + request
169                             + ") is taking long to execute, so far "
170                             + (waitCount / 1000) + " s.");
171
172                     if (BarrierTracing.BOOKKEEPING) {
173                         synchronized (BarrierTracing.reverseLookup) {
174                             printReverse(this, 0);
175                         }
176                         BarrierTracing.printBAPS();
177                     }
178
179                     throw new RuntimeDatabaseException("Request timed out.");
180
181                 }
182
183             }
184
185         }
186
187     }
188
189     public void restart() {
190         assertReady();
191         if(BarrierTracing.BOOKKEEPING) {
192             BarrierTracing.restart(this);
193         }
194     }
195
196     public void assertReady() {
197         int current = get();
198         if (current != 0)
199             throw new AssertionError("Barrier was not finished (pending="
200                     + current + ").");
201     }
202
203     public void report() {
204         // System.out.println("Barrier log:");
205         // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
206         // System.out.println(entry.getKey() + " " + entry.getValue());
207         // }
208         // System.out.println("SyncIntProcedure log:");
209         // for(Map.Entry<String, Integer> entry :
210         // SyncIntProcedure.counters.entrySet()) {
211         // System.out.println(entry.getKey() + " " + entry.getValue());
212         // }
213     }
214
215     @Override
216     public String toString() {
217         return report(this);
218         //              return "AsyncBarrierImpl@" + System.identityHashCode(this)
219         //                              + " - counter = " + get() + " - caller = " + caller;
220     }
221
222
223 }