X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fgraph%2FAsyncBarrierImpl.java;h=f1e5b5cce10cc4758207a257c2f6cfeb4f0a3372;hb=HEAD;hp=232e5cbd7e358b4cfb7171c5e4c1e62f71fa3ab9;hpb=782843a248c4f70d28e8146098f9a77977cc6b3c;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java index 232e5cbd7..f1e5b5cce 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java @@ -11,319 +11,215 @@ *******************************************************************************/ package org.simantics.db.impl.graph; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.query.CacheEntry; import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; +import org.slf4j.LoggerFactory; + +public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class); + + private static final long serialVersionUID = 4724463372850048672L; + + static final int WAIT_TIME = 60000; + + public static final boolean PRINT = false; + + final AsyncBarrierImpl caller; + + private final Runnable callback; + + private final boolean needsToBlock; + + public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry, Runnable callback, boolean needsToBlock) { + super(0); + this.caller = caller; + this.callback = callback; + this.needsToBlock = needsToBlock; + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.trace(this, entry); + } + } + + public AsyncBarrier getBlockingBarrier() { + if(needsToBlock) + return this; + if(caller == null) + return null; + else return caller.getBlockingBarrier(); + } + + @Override + public boolean isBlocking() { + return needsToBlock; + } + + @Override + public void inc() { + + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.inc(this); + } else { + inc(null, null); + } + + } + + void inc(Object id, String info) { + + if(PRINT) { + System.err.println("inc barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<4;i++) System.err.println(elems[i]); + } + + if (incrementAndGet() == 1) { + if (caller != null) { + if(BarrierTracing.BOOKKEEPING) { + caller.inc(this, "Child"); + } else { + caller.inc(null, null); + } + } + } + + } + + @Override + public void dec() { + + if(PRINT) { + System.err.println("dec barrier[" + get() + "] " + this); + StackTraceElement[] elems = new Exception().getStackTrace(); + for(int i=0;i<3;i++) System.err.println(elems[i]); + } + + int count = decrementAndGet(); + if (count < 1) { + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.dec(this, count); + } + if (count == 0) { + if (caller != null) { + caller.dec(); + } + } + if (count < 0) { + LOGGER.error( + "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", + new Exception()); + } + assert (count >= 0); + + if(callback != null) + callback.run(); + + } + + } + + public static String report(AsyncBarrierImpl barrier) { + CacheEntry e = BarrierTracing.entryMap.get(barrier); + if(e != null) return e.toString(); + else return "Barrier@" + System.identityHashCode(barrier); + } + + public static void printReverse(AsyncBarrierImpl barrier, int indent) { + + if (barrier.get() == 0) + return; + for (int i = 0; i < indent; i++) + System.err.print(" "); + System.err.println("[" + barrier.get() + " requests]: " + report(barrier)); + + Collection children = BarrierTracing.reverseLookup.get(barrier); + if (children != null) { + for (AsyncBarrierImpl child : children) + printReverse(child, indent + 2); + } + + } + + public void waitBarrier(Object request, ReadGraphImpl impl) { + + if (get() > 0) { + + long waitCount = 0; + + while (get() != 0) { + + boolean executed = impl.performPending(); + if(executed) waitCount = 0; + + ++waitCount; + if(waitCount > 100) Thread.yield(); + if(waitCount > 1000) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if(waitCount > WAIT_TIME) { + + System.err.println("AsyncBarrierImpl.waitBarrier(" + + request + + ") is taking long to execute, so far " + + (waitCount / 1000) + " s."); + + if (BarrierTracing.BOOKKEEPING) { + synchronized (BarrierTracing.reverseLookup) { + printReverse(this, 0); + } + BarrierTracing.printBAPS(); + } + + throw new RuntimeDatabaseException("Request timed out."); + + } + + } + + } + + } + + public void restart() { + assertReady(); + if(BarrierTracing.BOOKKEEPING) { + BarrierTracing.restart(this); + } + } + + public void assertReady() { + int current = get(); + if (current != 0) + throw new AssertionError("Barrier was not finished (pending=" + + current + ")."); + } + + public void report() { + // System.out.println("Barrier log:"); + // for(Map.Entry entry : sources.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + // System.out.println("SyncIntProcedure log:"); + // for(Map.Entry entry : + // SyncIntProcedure.counters.entrySet()) { + // System.out.println(entry.getKey() + " " + entry.getValue()); + // } + } + + @Override + public String toString() { + return report(this); + // return "AsyncBarrierImpl@" + System.identityHashCode(this) + // + " - counter = " + get() + " - caller = " + caller; + } -final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { - - private static final long serialVersionUID = 4724463372850048672L; - - static final HashMap> reverseLookup = new HashMap>(); - public static final HashMap debuggerMap = new HashMap(); - static final HashMap entryMap = new HashMap(); - static final HashMap restartMap = new HashMap(); - - static final int WAIT_TIME = 600; - - public static final boolean BOOKKEEPING = true; - public static final boolean PRINT = false; - static final boolean RESTART_GUARD = true; - - final public AsyncBarrierImpl caller; - - //private final Semaphore sema = new Semaphore(0); - - public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) { - super(0); - if (BOOKKEEPING) { - synchronized (entryMap) { - entryMap.put(this, entry); - } - synchronized (debuggerMap) { - debuggerMap.put(this, new Debugger()); - } - synchronized (reverseLookup) { - Collection barriers = reverseLookup - .get(caller); - if (barriers == null) { - barriers = new ArrayList(); - reverseLookup.put(caller, barriers); - } - barriers.add(this); - } - } - this.caller = caller; - } - - public class Debugger { - - public Map infos = new HashMap<>(); - - public synchronized void inc(AsyncBarrierImpl id, String info) { - if (id == null) - return; - String exist = infos.get(id); - if (exist != null) - throw new IllegalStateException("Already existing info " + id + " " + info); - infos.put(id, exist); - } - - public synchronized void dec(AsyncBarrierImpl id) { - if (id == null) - return; - String exist = infos.get(id); - if (exist == null) { - System.err.println("No data for " + id); - } else { - infos.remove(id); - } - } - - @Override - public synchronized String toString() { - StringBuilder b = new StringBuilder(); - for (String s : infos.values()) { - b.append("info " + s + "\r\n"); - } - return b.toString(); - } - - public boolean isEmpty() { - return infos.isEmpty(); - } - - } - - public void inc() { - - if (BOOKKEEPING) - inc(this, new Exception().getStackTrace()[2].toString()); - else - inc(null, null); - - if (RESTART_GUARD) - if(restartMap.containsKey(this)) - throw new IllegalStateException("Unplanned restart"); - - } - - private void inc(Object id, String info) { - - // if (PRINT) { - // if (get() < 5) - // new Exception("inc " + get() + " " + this).printStackTrace(); - // } - - if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(this); -// if (debugger != null) -// debugger.inc(id, info); - } - - if(PRINT) { - - System.err.println("inc barrier[" + get() + "] " + this); - StackTraceElement[] elems = new Exception().getStackTrace(); - for(int i=0;i<4;i++) System.err.println(elems[i]); - - } - - if (incrementAndGet() == 1) { - if (caller != null) { - if (BOOKKEEPING) - caller.inc(this, "Child"); - else - caller.inc(null, null); - } - } - - } - - public void dec() { - - if(PRINT) { - System.err.println("dec barrier[" + get() + "] " + this); - StackTraceElement[] elems = new Exception().getStackTrace(); - for(int i=0;i<3;i++) System.err.println(elems[i]); - } - - int count = decrementAndGet(); - if (count < 1) { - if (count == 0) { - if (caller != null) - caller.dec(this); - // sema.release(); - } - if (count < 0) { - Logger.defaultLogError( - "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", - new Exception()); - } - assert (count >= 0); - } - - } - - public void dec(Object id) { - - if (PRINT) { - if (get() < 5) - new Exception("dec" + get() + " " + this).printStackTrace(); - } - - if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(this); -// if (debugger != null) { -// debugger.dec(id); -// if(debugger.isEmpty()) -// debuggerMap.remove(this); -// } - } - - int count = decrementAndGet(); - if (count < 1) { - if (count == 0) { - debuggerMap.remove(this); - if (caller != null) - caller.dec(this); - if (RESTART_GUARD) - restartMap.put(this, true); - } - if (count < 0) { - Logger.defaultLogError( - "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.", - new Exception()); - System.exit(-1); - } - assert (count >= 0); - } - } - - public static String report(AsyncBarrierImpl barrier) { - CacheEntry e = entryMap.get(barrier); - if(e != null) return e.toString(); - else return "Barrier@" + System.identityHashCode(barrier); - } - - public static void printReverse(AsyncBarrierImpl barrier, int indent) { - - if (barrier.get() == 0) - return; - for (int i = 0; i < indent; i++) - System.err.print(" "); - System.err.println("[" + barrier.get() + " requests]: " + report(barrier)); -// if (BOOKKEEPING) { -// Debugger debugger = debuggerMap.get(barrier); -// debugger.toErr(indent + 2); -// } - - Collection children = reverseLookup.get(barrier); - if (children != null) { - for (AsyncBarrierImpl child : children) - printReverse(child, indent + 2); - } - - } - - public void waitBarrier(Object request, ReadGraphImpl impl) { - - if (get() > 0) { - - long waitCount = 0; - - while (get() != 0) { - - boolean executed = impl.performPending(); - if(executed) waitCount = 0; - - ++waitCount; - if(waitCount > 100) Thread.yield(); - if(waitCount > 1000) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if(waitCount > WAIT_TIME*1000) { - - System.err.println("AsyncBarrierImpl.waitBarrier(" - + request - + ") is taking long to execute, so far " - + (waitCount / 1000) + " s."); - - if (BOOKKEEPING) { - - synchronized (reverseLookup) { - printReverse(this, 0); - } - - } - -// if(Development.DEVELOPMENT) { -// impl.processor.threadLocks[0].lock(); -// System.err.println("-queues=" + impl.processor.queues[0].size()); -// impl.processor.threadLocks[0].unlock(); -// System.err.println("-own=" + impl.processor.ownTasks[0].size()); -// System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size()); -// for(SessionTask task : impl.processor.ownSyncTasks[0]) { -// System.err.println("--" + task); -// } -// } - - throw new RuntimeDatabaseException("Request timed out."); - //waitCount = 0; - - } - - } - - } - - } - - public void restart() { - assertReady(); - // log.clear(); - // sema.drainPermits(); - if (RESTART_GUARD) - restartMap.remove(this); - if (BOOKKEEPING) - debuggerMap.put(this, new Debugger()); - } - - public void assertReady() { - int current = get(); - if (current != 0) - throw new AssertionError("Barrier was not finished (pending=" - + current + ")."); - } - - public void report() { - // System.out.println("Barrier log:"); - // for(Map.Entry entry : sources.entrySet()) { - // System.out.println(entry.getKey() + " " + entry.getValue()); - // } - // System.out.println("SyncIntProcedure log:"); - // for(Map.Entry entry : - // SyncIntProcedure.counters.entrySet()) { - // System.out.println(entry.getKey() + " " + entry.getValue()); - // } - } - - @Override - public String toString() { - return report(this); -// return "AsyncBarrierImpl@" + System.identityHashCode(this) -// + " - counter = " + get() + " - caller = " + caller; - } }