/******************************************************************************* * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl.graph; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { private static final long serialVersionUID = 4724463372850048672L; static final HashMap> reverseLookup = new HashMap>(); static final HashMap debuggerMap = new HashMap(); static final HashMap restartMap = new HashMap(); static final int WAIT_TIME = 600; public static final boolean BOOKKEEPING = false; public static final boolean PRINT = false; static final boolean RESTART_GUARD = false; final private AsyncBarrierImpl caller; //private final Semaphore sema = new Semaphore(0); public AsyncBarrierImpl(AsyncBarrierImpl caller) { super(0); if (BOOKKEEPING) { synchronized (debuggerMap) { debuggerMap.put(this, new Debugger()); } synchronized (reverseLookup) { Collection barriers = reverseLookup .get(caller); if (barriers == null) { barriers = new ArrayList(); reverseLookup.put(caller, barriers); } barriers.add(this); } } this.caller = caller; } public class Debugger { public HashMap> infos = new HashMap>(); public synchronized void inc(Object id, String info) { if (id == null) return; ArrayList exist = infos.get(id); if (exist == null) { exist = new ArrayList(); infos.put(id, exist); } else { // System.err.println("Appending " + id + " += " + info); } exist.add(info); // String exist = infos.put(id, info); // if(exist != null) System.err.println("replacing " + exist + // " => " + info + " for " + id); } public synchronized void dec(Object id) { if (id == null) return; ArrayList exist = infos.get(id); if (exist == null) { System.err.println("No data for " + id); } else { exist.remove(0); if (exist.isEmpty()) infos.remove(id); } } @Override public synchronized String toString() { StringBuilder b = new StringBuilder(); for (ArrayList ss : infos.values()) { for (String s : ss) b.append("info " + s + "\r\n"); } return b.toString(); } public synchronized void toErr(int indent) { char[] spaces = new char[indent]; Arrays.fill(spaces, ' '); for (ArrayList ss : infos.values()) { for (String s : ss) { if (!s.startsWith("#")) continue; StringBuilder b = new StringBuilder(); b.append(spaces); b.append(s); System.err.println(b.toString()); } } } } public void inc() { if (BOOKKEEPING) inc(new Object(), new Exception().getStackTrace()[1].toString()); else inc(null, null); } public void inc(String debug) { if (BOOKKEEPING) inc(new Object(), new Exception().getStackTrace()[1].toString()); else inc(null, null); } public void inc(Object id, String info) { // if (PRINT) { // if (get() < 5) // new Exception("inc " + get() + " " + this).printStackTrace(); // } if (BOOKKEEPING) { Debugger debugger = debuggerMap.get(this); if (debugger != null) debugger.inc(id, info); // StackTraceElement[] tr = new Exception().getStackTrace(); // if(tr.length == 4) // debugger.inc(new String[] { debug, tr[2].toString(), // tr[3].toString() }); // else if(tr.length == 5) // debugger.inc(new String[] { debug, tr[2].toString(), // tr[3].toString(), tr[4].toString() }); // else if(tr.length == 6) // debugger.inc(new String[] { debug, tr[2].toString(), // tr[3].toString(), tr[4].toString(), tr[5].toString() }); // else // debugger.inc(new String[] { debug, tr[2].toString(), // tr[3].toString(), tr[4].toString(), tr[5].toString(), // tr[6].toString() }); } // new Exception().printStackTrace(); if(PRINT) { System.err.println("inc barrier[" + get() + "] " + this); StackTraceElement[] elems = new Exception().getStackTrace(); for(int i=0;i<4;i++) System.err.println(elems[i]); } if (incrementAndGet() == 1) { if (caller != null) { if (BOOKKEEPING) caller.inc(this, "Child"); else caller.inc(null, null); } } } public void dec() { if(PRINT) { System.err.println("dec barrier[" + get() + "] " + this); StackTraceElement[] elems = new Exception().getStackTrace(); for(int i=0;i<3;i++) System.err.println(elems[i]); } int count = decrementAndGet(); if (count < 1) { if (count == 0) { if (caller != null) caller.dec(this); // sema.release(); } if (count < 0) { Logger.defaultLogError( "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", new Exception()); } assert (count >= 0); } } public void dec(Object id) { if (PRINT) { if (get() < 5) new Exception("dec" + get() + " " + this).printStackTrace(); } if (BOOKKEEPING) { Debugger debugger = debuggerMap.get(this); if (debugger != null) debugger.dec(id); // StackTraceElement[] tr = new Exception().getStackTrace(); // if(tr.length == 3) // debugger.dec(new String[] { debug, tr[2].toString() }); // else if(tr.length == 4) // debugger.dec(new String[] { debug, tr[2].toString(), // tr[3].toString() }); // else // debugger.dec(new String[] { debug, tr[2].toString(), // tr[3].toString(), tr[4].toString() }); } // System.err.println("barrier " + this); // StackTraceElement[] elems = new Exception().getStackTrace(); // for(int i=0;i<3;i++) System.err.println(elems[i]); // new Exception().printStackTrace(); int count = decrementAndGet(); if (count < 1) { if (count == 0) { if (caller != null) caller.dec(this); if (RESTART_GUARD) restartMap.put(this, true); // sema.release(); // if(DEBUGGER) { // debuggerMap.remove(this); // } // if(REVERSE_LOOKUP) { // reverseLookup.remove(this); // } } if (count < 0) { Logger.defaultLogError( "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", new Exception()); // String message = ; // System.out.println(message); // if (DEBUGGER) { // JOptionPane.showMessageDialog(null, message); // System.out.println(debugger); // } // sema.release(); System.exit(-1); } assert (count >= 0); } } private static void printReverse(AsyncBarrierImpl barrier, int indent) { if (barrier.get() == 0) return; for (int i = 0; i < indent; i++) System.err.print(" "); System.err.println("[" + barrier.get() + " requests]: " + barrier); if (BOOKKEEPING) { Debugger debugger = debuggerMap.get(barrier); debugger.toErr(indent + 2); } Collection children = reverseLookup.get(barrier); if (children != null) { for (AsyncBarrierImpl child : children) printReverse(child, indent + 2); } } public void waitBarrier(Object request, ReadGraphImpl impl) { if (get() > 0) { long waitCount = 0; while (get() != 0) { boolean executed = impl.performPending(); if(executed) waitCount = 0; ++waitCount; if(waitCount > 100) Thread.yield(); if(waitCount > 1000) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } if(waitCount > WAIT_TIME*1000) { System.err.println("AsyncBarrierImpl.waitBarrier(" + request + ") is taking long to execute, so far " + (waitCount / 1000) + " s."); if (BOOKKEEPING) { synchronized (reverseLookup) { printReverse(this, 0); } } // if(Development.DEVELOPMENT) { // impl.processor.threadLocks[0].lock(); // System.err.println("-queues=" + impl.processor.queues[0].size()); // impl.processor.threadLocks[0].unlock(); // System.err.println("-own=" + impl.processor.ownTasks[0].size()); // System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size()); // for(SessionTask task : impl.processor.ownSyncTasks[0]) { // System.err.println("--" + task); // } // } throw new RuntimeDatabaseException("Request timed out."); //waitCount = 0; } } } } public void restart() { assertReady(); // log.clear(); // sema.drainPermits(); if (RESTART_GUARD) restartMap.remove(this); if (BOOKKEEPING) debuggerMap.put(this, new Debugger()); } public void assertReady() { int current = get(); if (current != 0) throw new AssertionError("Barrier was not finished (pending=" + current + ")."); } public void report() { // System.out.println("Barrier log:"); // for(Map.Entry entry : sources.entrySet()) { // System.out.println(entry.getKey() + " " + entry.getValue()); // } // System.out.println("SyncIntProcedure log:"); // for(Map.Entry entry : // SyncIntProcedure.counters.entrySet()) { // System.out.println(entry.getKey() + " " + entry.getValue()); // } } @Override public String toString() { return "AsyncBarrierImpl@" + System.identityHashCode(this) + " - counter = " + get() + " - caller = " + caller; } }