af0604ae6c6abe6d3dee3a573509b4a304471b54
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.concurrent.atomic.AtomicInteger;
19
20 import org.simantics.db.common.utils.Logger;
21 import org.simantics.db.exception.RuntimeDatabaseException;
22 import org.simantics.db.impl.query.CacheEntry;
23 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
24
25 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
26
27         private static final long serialVersionUID = 4724463372850048672L;
28
29         static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<>();
30         public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<>();
31         static final HashMap<AsyncBarrierImpl, CacheEntry<?>> entryMap = new HashMap<>();
32         static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<>();
33
34         static final int WAIT_TIME = 600;
35
36         public static final boolean BOOKKEEPING = false;
37         public static final boolean PRINT = false;
38         static final boolean RESTART_GUARD = true;
39
40         final public AsyncBarrierImpl caller;
41
42         //private final Semaphore sema = new Semaphore(0);
43
44         public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
45                 super(0);
46                 if (BOOKKEEPING) {
47                         synchronized (entryMap) {
48                                 entryMap.put(this, entry);
49                         }
50                         synchronized (debuggerMap) {
51                                 debuggerMap.put(this, new Debugger());
52                         }
53                         synchronized (reverseLookup) {
54                                 Collection<AsyncBarrierImpl> barriers = reverseLookup
55                                                 .get(caller);
56                                 if (barriers == null) {
57                                         barriers = new ArrayList<AsyncBarrierImpl>();
58                                         reverseLookup.put(caller, barriers);
59                                 }
60                                 barriers.add(this);
61                         }
62                 }
63                 this.caller = caller;
64         }
65
66         public class Debugger {
67                 
68                 public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
69
70                 public synchronized void inc(AsyncBarrierImpl id, String info) {
71                         if (id == null)
72                                 return;
73                         String exist = infos.get(id);
74                         if (exist != null)
75                                 throw new IllegalStateException("Already existing info " + id + " " + info);
76                         infos.put(id, exist);
77                 }
78
79                 public synchronized void dec(AsyncBarrierImpl id) {
80                         if (id == null)
81                                 return;
82                         String exist = infos.get(id);
83                         if (exist == null) {
84                                 System.err.println("No data for " + id);
85                         } else {
86                                 infos.remove(id);
87                         }
88                 }
89
90                 @Override
91                 public synchronized String toString() {
92                         StringBuilder b = new StringBuilder();
93                         for (String s : infos.values()) {
94                                 b.append("info " + s + "\r\n");
95                         }
96                         return b.toString();
97                 }
98                 
99                 public boolean isEmpty() {
100                         return infos.isEmpty();
101                 }
102
103         }
104
105         public void inc() {
106
107                 if (BOOKKEEPING)
108                         inc(this, new Exception().getStackTrace()[2].toString());
109                 else
110                         inc(null, null);
111                 
112                 if (RESTART_GUARD)
113                         if(restartMap.containsKey(this))
114                                 throw new IllegalStateException("Unplanned restart");
115
116         }
117
118         private void inc(Object id, String info) {
119
120                 //              if (PRINT) {
121                 //                      if (get() < 5)
122                 //                              new Exception("inc " + get() + " " + this).printStackTrace();
123                 //              }
124
125                 if (BOOKKEEPING) {
126 //                      Debugger debugger = debuggerMap.get(this);
127 //                      if (debugger != null)
128 //                              debugger.inc(id, info);
129                 }
130
131                 if(PRINT) {
132
133                         System.err.println("inc barrier[" + get() + "] " + this);
134                         StackTraceElement[] elems = new Exception().getStackTrace();
135                         for(int i=0;i<4;i++) System.err.println(elems[i]);
136
137                 }
138
139                 if (incrementAndGet() == 1) {
140                         if (caller != null) {
141                                 if (BOOKKEEPING)
142                                         caller.inc(this, "Child");
143                                 else
144                                         caller.inc(null, null);
145                         }
146                 }
147
148         }
149
150         public void dec() {
151
152                 if(PRINT) {
153                         System.err.println("dec barrier[" + get() + "] " + this);
154                         StackTraceElement[] elems = new Exception().getStackTrace();
155                         for(int i=0;i<3;i++) System.err.println(elems[i]);
156                 }
157
158                 int count = decrementAndGet();
159                 if (count < 1) {
160                         if (count == 0) {
161                                 if (caller != null)
162                                         caller.dec(this);
163                                 //                              sema.release();
164                         }
165                         if (count < 0) {
166                                 Logger.defaultLogError(
167                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
168                                                 new Exception());
169                         }
170                         assert (count >= 0);
171                 }
172
173         }
174
175         public void dec(Object id) {
176
177                 if (PRINT) {
178                         if (get() < 5)
179                                 new Exception("dec" + get() + " " + this).printStackTrace();
180                 }
181
182                 if (BOOKKEEPING) {
183 //                      Debugger debugger = debuggerMap.get(this);
184 //                      if (debugger != null) {
185 //                              debugger.dec(id);
186 //                              if(debugger.isEmpty())
187 //                                      debuggerMap.remove(this);
188 //                      }
189                 }
190
191                 int count = decrementAndGet();
192                 if (count < 1) {
193                         if (count == 0) {
194                                 debuggerMap.remove(this);
195                                 if (caller != null)
196                                         caller.dec(this);
197                                 if (RESTART_GUARD)
198                                         restartMap.put(this, true);
199                         }
200                         if (count < 0) {
201                                 Logger.defaultLogError(
202                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
203                                                 new Exception());
204                                 System.exit(-1);
205                         }
206                         assert (count >= 0);
207                 }
208         }
209
210         public static String report(AsyncBarrierImpl barrier) {
211                 CacheEntry<?> e = entryMap.get(barrier);
212                 if(e != null) return e.toString();
213                 else return "Barrier@" + System.identityHashCode(barrier);
214         }
215         
216         public static void printReverse(AsyncBarrierImpl barrier, int indent) {
217
218                 if (barrier.get() == 0)
219                         return;
220                 for (int i = 0; i < indent; i++)
221                         System.err.print(" ");
222                 System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
223 //              if (BOOKKEEPING) {
224 //                      Debugger debugger = debuggerMap.get(barrier);
225 //                      debugger.toErr(indent + 2);
226 //              }
227
228                 Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
229                 if (children != null) {
230                         for (AsyncBarrierImpl child : children)
231                                 printReverse(child, indent + 2);
232                 }
233
234         }
235
236         public void waitBarrier(Object request, ReadGraphImpl impl) {
237
238                 if (get() > 0) {
239
240                         long waitCount = 0;
241
242                         while (get() != 0) {
243
244                                 boolean executed = impl.performPending();
245                                 if(executed) waitCount = 0;
246                                 
247                                 ++waitCount;
248                                 if(waitCount > 100) Thread.yield();
249                                 if(waitCount > 1000) {
250                                         try {
251                                                 Thread.sleep(1);
252                                         } catch (InterruptedException e) {
253                                                 e.printStackTrace();
254                                         }
255                                 }
256                                 if(waitCount > WAIT_TIME*1000) {
257
258                                         System.err.println("AsyncBarrierImpl.waitBarrier("
259                                                         + request
260                                                         + ") is taking long to execute, so far "
261                                                         + (waitCount / 1000) + " s.");
262
263                                         if (BOOKKEEPING) {
264
265                                                 synchronized (reverseLookup) {
266                                                         printReverse(this, 0);
267                                                 }
268
269                                         }
270
271 //                                      if(Development.DEVELOPMENT) {
272 //                                              impl.processor.threadLocks[0].lock();
273 //                                              System.err.println("-queues=" + impl.processor.queues[0].size());
274 //                                              impl.processor.threadLocks[0].unlock();
275 //                                              System.err.println("-own=" + impl.processor.ownTasks[0].size());
276 //                                              System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
277 //                                              for(SessionTask task : impl.processor.ownSyncTasks[0]) {
278 //                                                      System.err.println("--" + task);
279 //                                              }
280 //                                      }
281
282                                         throw new RuntimeDatabaseException("Request timed out.");
283                                         //waitCount = 0;
284
285                                 }
286
287                         }
288
289                 }
290
291         }
292
293         public void restart() {
294                 assertReady();
295                 // log.clear();
296                 //              sema.drainPermits();
297                 if (RESTART_GUARD)
298                         restartMap.remove(this);
299                 if (BOOKKEEPING)
300                         debuggerMap.put(this, new Debugger());
301         }
302
303         public void assertReady() {
304                 int current = get();
305                 if (current != 0)
306                         throw new AssertionError("Barrier was not finished (pending="
307                                         + current + ").");
308         }
309
310         public void report() {
311                 // System.out.println("Barrier log:");
312                 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
313                 // System.out.println(entry.getKey() + " " + entry.getValue());
314                 // }
315                 // System.out.println("SyncIntProcedure log:");
316                 // for(Map.Entry<String, Integer> entry :
317                 // SyncIntProcedure.counters.entrySet()) {
318                 // System.out.println(entry.getKey() + " " + entry.getValue());
319                 // }
320         }
321
322         @Override
323         public String toString() {
324                 return report(this);
325 //              return "AsyncBarrierImpl@" + System.identityHashCode(this)
326 //                              + " - counter = " + get() + " - caller = " + caller;
327         }
328
329 }