1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.graph;
14 import java.util.Collection;
15 import java.util.concurrent.atomic.AtomicInteger;
17 import org.simantics.db.exception.RuntimeDatabaseException;
18 import org.simantics.db.impl.query.CacheEntry;
19 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
20 import org.slf4j.LoggerFactory;
22 public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
24 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class);
26 private static final long serialVersionUID = 4724463372850048672L;
28 static final int WAIT_TIME = 60000;
30 public static final boolean PRINT = BarrierTracing.BOOKKEEPING && true;
32 final AsyncBarrierImpl caller;
34 private final Runnable callback;
36 private final boolean needsToBlock;
38 public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
41 this.callback = callback;
42 this.needsToBlock = needsToBlock;
43 if (BarrierTracing.BOOKKEEPING) {
44 BarrierTracing.trace(this, entry);
48 public AsyncBarrier getBlockingBarrier() {
53 else return caller.getBlockingBarrier();
57 public boolean isBlocking() {
64 if(BarrierTracing.BOOKKEEPING) {
65 BarrierTracing.inc(this);
72 void inc(Object id, String info) {
75 LOGGER.warn("inc barrier[" + get() + "] " + this);
76 StackTraceElement[] elems = new Exception().getStackTrace();
77 for(int i=0;i<4;i++) LOGGER.warn("{}", elems[i]);
80 if (incrementAndGet() == 1) {
82 if(BarrierTracing.BOOKKEEPING) {
83 caller.inc(this, "Child");
85 caller.inc(null, null);
96 LOGGER.warn("dec barrier[" + get() + "] " + this);
97 StackTraceElement[] elems = new Exception().getStackTrace();
98 for(int i=0;i<3;i++) LOGGER.warn("{}", elems[i]);
101 int count = decrementAndGet();
103 if(BarrierTracing.BOOKKEEPING) {
104 BarrierTracing.dec(this, count);
107 if (caller != null) {
113 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
125 public static String report(AsyncBarrierImpl barrier) {
126 CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
127 if(e != null) return e.toString();
128 else return "[" + barrier.get() + " requests]: Barrier@" + System.identityHashCode(barrier);
131 public static void printReverse(AsyncBarrierImpl barrier, int indent) {
133 if (barrier.get() == 0)
135 for (int i = 0; i < indent; i++)
136 System.err.print(" ");
137 System.err.println(report(barrier));
139 Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
140 if (children != null) {
141 for (AsyncBarrierImpl child : children)
142 printReverse(child, indent + 2);
147 public void waitBarrier(Object request, ReadGraphImpl impl) {
155 boolean executed = impl.performPending();
156 if(executed) waitCount = 0;
159 if(waitCount > 100) Thread.yield();
160 if(waitCount > 1000) {
163 } catch (InterruptedException e) {
167 if(waitCount > WAIT_TIME) {
169 LOGGER.warn("AsyncBarrierImpl.waitBarrier("
171 + ") is taking long to execute, so far "
172 + (waitCount / 1000) + " s. this.get() = " + get());
174 if (BarrierTracing.BOOKKEEPING) {
175 synchronized (BarrierTracing.reverseLookup) {
176 printReverse(this, 0);
178 BarrierTracing.printBAPS();
181 throw new RuntimeDatabaseException("Request timed out.");
191 public void restart() {
193 if(BarrierTracing.BOOKKEEPING) {
194 BarrierTracing.restart(this);
198 public void assertReady() {
201 throw new AssertionError("Barrier was not finished (pending="
205 public void report() {
206 // System.out.println("Barrier log:");
207 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
208 // System.out.println(entry.getKey() + " " + entry.getValue());
210 // System.out.println("SyncIntProcedure log:");
211 // for(Map.Entry<String, Integer> entry :
212 // SyncIntProcedure.counters.entrySet()) {
213 // System.out.println(entry.getKey() + " " + entry.getValue());
218 public String toString() {
220 // return "AsyncBarrierImpl@" + System.identityHashCode(this)
221 // + " - counter = " + get() + " - caller = " + caller;