X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fgraph%2FAsyncBarrierImpl.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fgraph%2FAsyncBarrierImpl.java;h=c50dfa0949aacff0675c97754bef446d4cddcd10;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java new file mode 100644 index 000000000..c50dfa094 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java @@ -0,0 +1,383 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.impl.graph; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; +import org.simantics.utils.Development; + +final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { + + private static final long serialVersionUID = 4724463372850048672L; + + static final HashMap> reverseLookup = new HashMap>(); + static final HashMap debuggerMap = new HashMap(); + static final HashMap restartMap = new HashMap(); + + static final int WAIT_TIME = 600; + + public static final boolean BOOKKEEPING = false; + public static final boolean PRINT = false; + static final boolean RESTART_GUARD = false; + + final private AsyncBarrierImpl caller; + + public AsyncBarrierImpl(AsyncBarrierImpl caller) { + super(0); + if (BOOKKEEPING) { + synchronized (debuggerMap) { + debuggerMap.put(this, new Debugger()); + } + synchronized (reverseLookup) { + Collection barriers = reverseLookup + .get(caller); + if (barriers == null) { + barriers = new ArrayList(); + reverseLookup.put(caller, barriers); + } + barriers.add(this); + } + } + this.caller = caller; + } + + public class Debugger { + public HashMap> infos = new HashMap>(); + + public synchronized void inc(Object id, String info) { + if (id == null) + return; + ArrayList exist = infos.get(id); + if (exist == null) { + exist = new ArrayList(); + infos.put(id, exist); + } else { + // System.err.println("Appending " + id + " += " + info); + } + exist.add(info); + // String exist = infos.put(id, info); + // if(exist != null) System.err.println("replacing " + exist + + // " => " + info + " for " + id); + } + + public synchronized void dec(Object id) { + if (id == null) + return; + ArrayList exist = infos.get(id); + if (exist == null) { + System.err.println("No data for " + id); + } else { + exist.remove(0); + if (exist.isEmpty()) + infos.remove(id); + } + } + + @Override + public synchronized String toString() { + StringBuilder b = new StringBuilder(); + for (ArrayList ss : infos.values()) { + for (String s : ss) + b.append("info " + s + "\r\n"); + } + return b.toString(); + } + + public synchronized void toErr(int indent) { + char[] spaces = new char[indent]; + Arrays.fill(spaces, ' '); + for (ArrayList ss : infos.values()) { + for (String s : ss) { + if (!s.startsWith("#")) + continue; + StringBuilder b = new StringBuilder(); + b.append(spaces); + b.append(s); + System.err.println(b.toString()); + } + } + } + } + + public void inc() { + + if (BOOKKEEPING) + inc(new Object(), new Exception().getStackTrace()[1].toString()); + else + inc(null, null); + + } + + public void inc(String debug) { + + if (BOOKKEEPING) + inc(new Object(), new Exception().getStackTrace()[1].toString()); + else + inc(null, null); + + } + + public void inc(Object id, String info) { + + // if (PRINT) { + // if (get() < 5) + // new Exception("inc " + get() + " " + this).printStackTrace(); + // } + + if (BOOKKEEPING) { + Debugger debugger = debuggerMap.get(this); + if (debugger != null) + debugger.inc(id, info); + // StackTraceElement[] tr = new Exception().getStackTrace(); + // if(tr.length == 4) + // debugger.inc(new String[] { debug, tr[2].toString(), + // tr[3].toString() }); + // else if(tr.length == 5) + // debugger.inc(new String[] { debug, tr[2].toString(), + // tr[3].toString(), tr[4].toString() }); + // else if(tr.length == 6) + // debugger.inc(new String[] { debug, tr[2].toString(), + // tr[3].toString(), tr[4].toString(), tr[5].toString() }); + // else + // debugger.inc(new String[] { debug, tr[2].toString(), + // tr[3].toString(), tr[4].toString(), tr[5].toString(), + // tr[6].toString() }); + } + + // new Exception().printStackTrace(); + + if(PRINT) { + + System.err.println("inc barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<4;i++) System.err.println(elems[i]); + + } + + if (incrementAndGet() == 1) { + if (caller != null) { + if (BOOKKEEPING) + caller.inc(this, "Child"); + else + caller.inc(null, null); + } + } + + } + + public void dec() { + + if(PRINT) { + System.err.println("dec barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<3;i++) System.err.println(elems[i]); + } + + int count = decrementAndGet(); + if (count < 1) { + if (count == 0) { + if (caller != null) + caller.dec(this); + // sema.release(); + } + if (count < 0) { + Logger.defaultLogError( + "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", + new Exception()); + } + assert (count >= 0); + } + + } + + public void dec(Object id) { + + if (PRINT) { + if (get() < 5) + new Exception("dec" + get() + " " + this).printStackTrace(); + } + + if (BOOKKEEPING) { + Debugger debugger = debuggerMap.get(this); + if (debugger != null) + debugger.dec(id); + // StackTraceElement[] tr = new Exception().getStackTrace(); + // if(tr.length == 3) + // debugger.dec(new String[] { debug, tr[2].toString() }); + // else if(tr.length == 4) + // debugger.dec(new String[] { debug, tr[2].toString(), + // tr[3].toString() }); + // else + // debugger.dec(new String[] { debug, tr[2].toString(), + // tr[3].toString(), tr[4].toString() }); + } + + // System.err.println("barrier " + this); + // StackTraceElement[] elems = new Exception().getStackTrace(); + // for(int i=0;i<3;i++) System.err.println(elems[i]); + // new Exception().printStackTrace(); + + int count = decrementAndGet(); + if (count < 1) { + if (count == 0) { + if (caller != null) + caller.dec(this); + if (RESTART_GUARD) + restartMap.put(this, true); + // sema.release(); + // if(DEBUGGER) { + // debuggerMap.remove(this); + // } + // if(REVERSE_LOOKUP) { + // reverseLookup.remove(this); + // } + } + if (count < 0) { + Logger.defaultLogError( + "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", + new Exception()); + // String message = ; + // System.out.println(message); + // if (DEBUGGER) { + // JOptionPane.showMessageDialog(null, message); + // System.out.println(debugger); + // } + // sema.release(); + System.exit(-1); + } + assert (count >= 0); + } + } + + private static void printReverse(AsyncBarrierImpl barrier, int indent) { + + if (barrier.get() == 0) + return; + for (int i = 0; i < indent; i++) + System.err.print(" "); + System.err.println("[" + barrier.get() + " requests]: " + barrier); + if (BOOKKEEPING) { + Debugger debugger = debuggerMap.get(barrier); + debugger.toErr(indent + 2); + } + + Collection children = reverseLookup.get(barrier); + if (children != null) { + for (AsyncBarrierImpl child : children) + printReverse(child, indent + 2); + } + + } + + public void waitBarrier(Object request, ReadGraphImpl impl) { + + if (get() > 0) { + + long waitCount = 0; + + while (get() != 0) { + + boolean executed = impl.processor.resume(impl); + if(executed) waitCount = 0; + + ++waitCount; + if(waitCount > 100) Thread.yield(); + if(waitCount > 1000) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if(waitCount > WAIT_TIME*1000) { + + System.err.println("AsyncBarrierImpl.waitBarrier(" + + request + + ") is taking long to execute, so far " + + (waitCount / 1000) + " s."); + + if (BOOKKEEPING) { + + synchronized (reverseLookup) { + printReverse(this, 0); + } + + } + + if(Development.DEVELOPMENT) { + + impl.processor.threadLocks[0].lock(); + System.err.println("-queues=" + impl.processor.queues[0].size()); + impl.processor.threadLocks[0].unlock(); + System.err.println("-own=" + impl.processor.ownTasks[0].size()); + System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size()); + for(SessionTask task : impl.processor.ownSyncTasks[0]) { + System.err.println("--" + task); + } + + } + + throw new RuntimeDatabaseException("Request timed out."); + //waitCount = 0; + + } + + } + + } + + } + + public void restart() { + assertReady(); + // log.clear(); + // sema.drainPermits(); + if (RESTART_GUARD) + restartMap.remove(this); + if (BOOKKEEPING) + debuggerMap.put(this, new Debugger()); + } + + public void assertReady() { + int current = get(); + if (current != 0) + throw new AssertionError("Barrier was not finished (pending=" + + current + ")."); + } + + public void report() { + // System.out.println("Barrier log:"); + // for(Map.Entry entry : sources.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + // System.out.println("SyncIntProcedure log:"); + // for(Map.Entry entry : + // SyncIntProcedure.counters.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + } + + @Override + public String toString() { + return "AsyncBarrierImpl@" + System.identityHashCode(this) + + " - counter = " + get() + " - caller = " + caller; + } + +}