/******************************************************************************* * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl.graph; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.query.CacheEntry; import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; import org.slf4j.LoggerFactory; public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class); private static final long serialVersionUID = 4724463372850048672L; static final int WAIT_TIME = 60000; public static final boolean PRINT = false; final AsyncBarrierImpl caller; private final Runnable callback; private final boolean needsToBlock; public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry, Runnable callback, boolean needsToBlock) { super(0); this.caller = caller; this.callback = callback; this.needsToBlock = needsToBlock; if (BarrierTracing.BOOKKEEPING) { BarrierTracing.trace(this, entry); } } public AsyncBarrier getBlockingBarrier() { if(needsToBlock) return this; if(caller == null) return null; else return caller.getBlockingBarrier(); } @Override public boolean isBlocking() { return needsToBlock; } @Override public void inc() { if(BarrierTracing.BOOKKEEPING) { BarrierTracing.inc(this); } else { inc(null, null); } } void inc(Object id, String info) { if(PRINT) { System.err.println("inc barrier[" + get() + "] " + this); StackTraceElement[] elems = new Exception().getStackTrace(); for(int i=0;i<4;i++) System.err.println(elems[i]); } if (incrementAndGet() == 1) { if (caller != null) { if(BarrierTracing.BOOKKEEPING) { caller.inc(this, "Child"); } else { caller.inc(null, null); } } } } @Override public void dec() { if(PRINT) { System.err.println("dec barrier[" + get() + "] " + this); StackTraceElement[] elems = new Exception().getStackTrace(); for(int i=0;i<3;i++) System.err.println(elems[i]); } int count = decrementAndGet(); if (count < 1) { if(BarrierTracing.BOOKKEEPING) { BarrierTracing.dec(this, count); } if (count == 0) { if (caller != null) { caller.dec(); } } if (count < 0) { LOGGER.error( "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", new Exception()); } assert (count >= 0); if(callback != null) callback.run(); } } public static String report(AsyncBarrierImpl barrier) { CacheEntry e = BarrierTracing.entryMap.get(barrier); if(e != null) return e.toString(); else return "Barrier@" + System.identityHashCode(barrier); } public static void printReverse(AsyncBarrierImpl barrier, int indent) { if (barrier.get() == 0) return; for (int i = 0; i < indent; i++) System.err.print(" "); System.err.println("[" + barrier.get() + " requests]: " + report(barrier)); Collection children = BarrierTracing.reverseLookup.get(barrier); if (children != null) { for (AsyncBarrierImpl child : children) printReverse(child, indent + 2); } } public void waitBarrier(Object request, ReadGraphImpl impl) { if (get() > 0) { long waitCount = 0; while (get() != 0) { boolean executed = impl.performPending(); if(executed) waitCount = 0; ++waitCount; if(waitCount > 100) Thread.yield(); if(waitCount > 1000) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } if(waitCount > WAIT_TIME) { System.err.println("AsyncBarrierImpl.waitBarrier(" + request + ") is taking long to execute, so far " + (waitCount / 1000) + " s."); if (BarrierTracing.BOOKKEEPING) { synchronized (BarrierTracing.reverseLookup) { printReverse(this, 0); } BarrierTracing.printBAPS(); } throw new RuntimeDatabaseException("Request timed out."); } } } } public void restart() { assertReady(); if(BarrierTracing.BOOKKEEPING) { BarrierTracing.restart(this); } } public void assertReady() { int current = get(); if (current != 0) throw new AssertionError("Barrier was not finished (pending=" + current + ")."); } public void report() { // System.out.println("Barrier log:"); // for(Map.Entry entry : sources.entrySet()) { // System.out.println(entry.getKey() + " " + entry.getValue()); // } // System.out.println("SyncIntProcedure log:"); // for(Map.Entry entry : // SyncIntProcedure.counters.entrySet()) { // System.out.println(entry.getKey() + " " + entry.getValue()); // } } @Override public String toString() { return report(this); // return "AsyncBarrierImpl@" + System.identityHashCode(this) // + " - counter = " + get() + " - caller = " + caller; } }