1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.graph;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
18 import java.util.concurrent.atomic.AtomicInteger;
20 import org.simantics.db.common.utils.Logger;
21 import org.simantics.db.exception.RuntimeDatabaseException;
22 import org.simantics.db.impl.query.CacheEntry;
23 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
25 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
27 private static final long serialVersionUID = 4724463372850048672L;
29 static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<>();
30 public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<>();
31 static final HashMap<AsyncBarrierImpl, CacheEntry<?>> entryMap = new HashMap<>();
32 static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<>();
34 static final int WAIT_TIME = 600;
36 public static final boolean BOOKKEEPING = false;
37 public static final boolean PRINT = false;
38 static final boolean RESTART_GUARD = true;
40 final public AsyncBarrierImpl caller;
42 //private final Semaphore sema = new Semaphore(0);
44 public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
47 synchronized (entryMap) {
48 entryMap.put(this, entry);
50 synchronized (debuggerMap) {
51 debuggerMap.put(this, new Debugger());
53 synchronized (reverseLookup) {
54 Collection<AsyncBarrierImpl> barriers = reverseLookup
56 if (barriers == null) {
57 barriers = new ArrayList<AsyncBarrierImpl>();
58 reverseLookup.put(caller, barriers);
66 public class Debugger {
68 public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
70 public synchronized void inc(AsyncBarrierImpl id, String info) {
73 String exist = infos.get(id);
75 throw new IllegalStateException("Already existing info " + id + " " + info);
79 public synchronized void dec(AsyncBarrierImpl id) {
82 String exist = infos.get(id);
84 System.err.println("No data for " + id);
91 public synchronized String toString() {
92 StringBuilder b = new StringBuilder();
93 for (String s : infos.values()) {
94 b.append("info " + s + "\r\n");
99 public boolean isEmpty() {
100 return infos.isEmpty();
108 inc(this, new Exception().getStackTrace()[2].toString());
113 if(restartMap.containsKey(this))
114 throw new IllegalStateException("Unplanned restart");
118 private void inc(Object id, String info) {
122 // new Exception("inc " + get() + " " + this).printStackTrace();
126 // Debugger debugger = debuggerMap.get(this);
127 // if (debugger != null)
128 // debugger.inc(id, info);
133 System.err.println("inc barrier[" + get() + "] " + this);
134 StackTraceElement[] elems = new Exception().getStackTrace();
135 for(int i=0;i<4;i++) System.err.println(elems[i]);
139 if (incrementAndGet() == 1) {
140 if (caller != null) {
142 caller.inc(this, "Child");
144 caller.inc(null, null);
153 System.err.println("dec barrier[" + get() + "] " + this);
154 StackTraceElement[] elems = new Exception().getStackTrace();
155 for(int i=0;i<3;i++) System.err.println(elems[i]);
158 int count = decrementAndGet();
166 Logger.defaultLogError(
167 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
175 public void dec(Object id) {
179 new Exception("dec" + get() + " " + this).printStackTrace();
183 // Debugger debugger = debuggerMap.get(this);
184 // if (debugger != null) {
186 // if(debugger.isEmpty())
187 // debuggerMap.remove(this);
191 int count = decrementAndGet();
194 debuggerMap.remove(this);
198 restartMap.put(this, true);
201 Logger.defaultLogError(
202 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
210 public static String report(AsyncBarrierImpl barrier) {
211 CacheEntry<?> e = entryMap.get(barrier);
212 if(e != null) return e.toString();
213 else return "Barrier@" + System.identityHashCode(barrier);
216 public static void printReverse(AsyncBarrierImpl barrier, int indent) {
218 if (barrier.get() == 0)
220 for (int i = 0; i < indent; i++)
221 System.err.print(" ");
222 System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
223 // if (BOOKKEEPING) {
224 // Debugger debugger = debuggerMap.get(barrier);
225 // debugger.toErr(indent + 2);
228 Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
229 if (children != null) {
230 for (AsyncBarrierImpl child : children)
231 printReverse(child, indent + 2);
236 public void waitBarrier(Object request, ReadGraphImpl impl) {
244 boolean executed = impl.performPending();
245 if(executed) waitCount = 0;
248 if(waitCount > 100) Thread.yield();
249 if(waitCount > 1000) {
252 } catch (InterruptedException e) {
256 if(waitCount > WAIT_TIME*1000) {
258 System.err.println("AsyncBarrierImpl.waitBarrier("
260 + ") is taking long to execute, so far "
261 + (waitCount / 1000) + " s.");
265 synchronized (reverseLookup) {
266 printReverse(this, 0);
271 // if(Development.DEVELOPMENT) {
272 // impl.processor.threadLocks[0].lock();
273 // System.err.println("-queues=" + impl.processor.queues[0].size());
274 // impl.processor.threadLocks[0].unlock();
275 // System.err.println("-own=" + impl.processor.ownTasks[0].size());
276 // System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
277 // for(SessionTask task : impl.processor.ownSyncTasks[0]) {
278 // System.err.println("--" + task);
282 throw new RuntimeDatabaseException("Request timed out.");
293 public void restart() {
296 // sema.drainPermits();
298 restartMap.remove(this);
300 debuggerMap.put(this, new Debugger());
303 public void assertReady() {
306 throw new AssertionError("Barrier was not finished (pending="
310 public void report() {
311 // System.out.println("Barrier log:");
312 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
313 // System.out.println(entry.getKey() + " " + entry.getValue());
315 // System.out.println("SyncIntProcedure log:");
316 // for(Map.Entry<String, Integer> entry :
317 // SyncIntProcedure.counters.entrySet()) {
318 // System.out.println(entry.getKey() + " " + entry.getValue());
323 public String toString() {
325 // return "AsyncBarrierImpl@" + System.identityHashCode(this)
326 // + " - counter = " + get() + " - caller = " + caller;