1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.graph;
14 import java.util.Collection;
15 import java.util.concurrent.atomic.AtomicInteger;
17 import org.simantics.db.common.utils.Logger;
18 import org.simantics.db.exception.RuntimeDatabaseException;
19 import org.simantics.db.impl.query.CacheEntry;
20 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
21 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
23 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
25 private static final long serialVersionUID = 4724463372850048672L;
27 static final int WAIT_TIME = 60000;
29 public static final boolean PRINT = false;
31 final public AsyncBarrierImpl caller;
33 final public Runnable callback;
35 public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback) {
38 this.callback = callback;
39 if (BarrierTracing.BOOKKEEPING) {
40 BarrierTracing.trace(this, entry);
47 if(BarrierTracing.BOOKKEEPING) {
48 BarrierTracing.inc(this);
55 void inc(Object id, String info) {
58 System.err.println("inc barrier[" + get() + "] " + this);
59 StackTraceElement[] elems = new Exception().getStackTrace();
60 for(int i=0;i<4;i++) System.err.println(elems[i]);
63 if (incrementAndGet() == 1) {
65 if(BarrierTracing.BOOKKEEPING) {
66 caller.inc(this, "Child");
68 caller.inc(null, null);
78 System.err.println("dec barrier[" + get() + "] " + this);
79 StackTraceElement[] elems = new Exception().getStackTrace();
80 for(int i=0;i<3;i++) System.err.println(elems[i]);
83 int count = decrementAndGet();
85 if(BarrierTracing.BOOKKEEPING) {
86 BarrierTracing.dec(this, count);
94 Logger.defaultLogError(
95 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
107 public static String report(AsyncBarrierImpl barrier) {
108 CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
109 if(e != null) return e.toString();
110 else return "Barrier@" + System.identityHashCode(barrier);
113 public static void printReverse(AsyncBarrierImpl barrier, int indent) {
115 if (barrier.get() == 0)
117 for (int i = 0; i < indent; i++)
118 System.err.print(" ");
119 System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
120 // if (BOOKKEEPING) {
121 // Debugger debugger = debuggerMap.get(barrier);
122 // debugger.toErr(indent + 2);
125 Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
126 if (children != null) {
127 for (AsyncBarrierImpl child : children)
128 printReverse(child, indent + 2);
133 public void waitBarrier(Object request, ReadGraphImpl impl) {
141 boolean executed = impl.performPending();
142 if(executed) waitCount = 0;
145 if(waitCount > 100) Thread.yield();
146 if(waitCount > 1000) {
149 } catch (InterruptedException e) {
153 if(waitCount > WAIT_TIME) {
155 System.err.println("AsyncBarrierImpl.waitBarrier("
157 + ") is taking long to execute, so far "
158 + (waitCount / 1000) + " s.");
160 if (BarrierTracing.BOOKKEEPING) {
161 synchronized (BarrierTracing.reverseLookup) {
162 printReverse(this, 0);
164 BarrierTracing.printBAPS();
167 for(SessionTask t : impl.processor.freeScheduling) {
168 System.err.println("Pending task:" + t);
171 // if(Development.DEVELOPMENT) {
172 // impl.processor.threadLocks[0].lock();
173 // System.err.println("-queues=" + impl.processor.queues[0].size());
174 // impl.processor.threadLocks[0].unlock();
175 // System.err.println("-own=" + impl.processor.ownTasks[0].size());
176 // System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
177 // for(SessionTask task : impl.processor.ownSyncTasks[0]) {
178 // System.err.println("--" + task);
182 throw new RuntimeDatabaseException("Request timed out.");
193 public void restart() {
195 if(BarrierTracing.BOOKKEEPING) {
196 BarrierTracing.restart(this);
200 public void assertReady() {
203 throw new AssertionError("Barrier was not finished (pending="
207 public void report() {
208 // System.out.println("Barrier log:");
209 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
210 // System.out.println(entry.getKey() + " " + entry.getValue());
212 // System.out.println("SyncIntProcedure log:");
213 // for(Map.Entry<String, Integer> entry :
214 // SyncIntProcedure.counters.entrySet()) {
215 // System.out.println(entry.getKey() + " " + entry.getValue());
220 public String toString() {
222 // return "AsyncBarrierImpl@" + System.identityHashCode(this)
223 // + " - counter = " + get() + " - caller = " + caller;