-final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
-
- private static final long serialVersionUID = 4724463372850048672L;
-
- static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<>();
- public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<>();
- static final HashMap<AsyncBarrierImpl, CacheEntry<?>> entryMap = new HashMap<>();
- static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<>();
-
- static final int WAIT_TIME = 600;
-
- public static final boolean BOOKKEEPING = false;
- public static final boolean PRINT = false;
- static final boolean RESTART_GUARD = true;
-
- final public AsyncBarrierImpl caller;
-
- //private final Semaphore sema = new Semaphore(0);
-
- public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
- super(0);
- if (BOOKKEEPING) {
- synchronized (entryMap) {
- entryMap.put(this, entry);
- }
- synchronized (debuggerMap) {
- debuggerMap.put(this, new Debugger());
- }
- synchronized (reverseLookup) {
- Collection<AsyncBarrierImpl> barriers = reverseLookup
- .get(caller);
- if (barriers == null) {
- barriers = new ArrayList<AsyncBarrierImpl>();
- reverseLookup.put(caller, barriers);
- }
- barriers.add(this);
- }
- }
- this.caller = caller;
- }
-
- public class Debugger {
-
- public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
-
- public synchronized void inc(AsyncBarrierImpl id, String info) {
- if (id == null)
- return;
- String exist = infos.get(id);
- if (exist != null)
- throw new IllegalStateException("Already existing info " + id + " " + info);
- infos.put(id, exist);
- }
-
- public synchronized void dec(AsyncBarrierImpl id) {
- if (id == null)
- return;
- String exist = infos.get(id);
- if (exist == null) {
- System.err.println("No data for " + id);
- } else {
- infos.remove(id);
- }
- }
-
- @Override
- public synchronized String toString() {
- StringBuilder b = new StringBuilder();
- for (String s : infos.values()) {
- b.append("info " + s + "\r\n");
- }
- return b.toString();
- }
-
- public boolean isEmpty() {
- return infos.isEmpty();
- }
-
- }
-
- public void inc() {
-
- if (BOOKKEEPING)
- inc(this, new Exception().getStackTrace()[2].toString());
- else
- inc(null, null);
-
- if (RESTART_GUARD)
- if(restartMap.containsKey(this))
- throw new IllegalStateException("Unplanned restart");
-
- }
-
- private void inc(Object id, String info) {
-
- // if (PRINT) {
- // if (get() < 5)
- // new Exception("inc " + get() + " " + this).printStackTrace();
- // }
-
- if (BOOKKEEPING) {
-// Debugger debugger = debuggerMap.get(this);
-// if (debugger != null)
-// debugger.inc(id, info);
- }
-
- if(PRINT) {
-
- System.err.println("inc barrier[" + get() + "] " + this);
- StackTraceElement[] elems = new Exception().getStackTrace();
- for(int i=0;i<4;i++) System.err.println(elems[i]);
-
- }
-
- if (incrementAndGet() == 1) {
- if (caller != null) {
- if (BOOKKEEPING)
- caller.inc(this, "Child");
- else
- caller.inc(null, null);
- }
- }
-
- }
-
- public void dec() {
-
- if(PRINT) {
- System.err.println("dec barrier[" + get() + "] " + this);
- StackTraceElement[] elems = new Exception().getStackTrace();
- for(int i=0;i<3;i++) System.err.println(elems[i]);
- }
-
- int count = decrementAndGet();
- if (count < 1) {
- if (count == 0) {
- if (caller != null)
- caller.dec(this);
- // sema.release();
- }
- if (count < 0) {
- Logger.defaultLogError(
- "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
- new Exception());
- }
- assert (count >= 0);
- }
-
- }
-
- public void dec(Object id) {
-
- if (PRINT) {
- if (get() < 5)
- new Exception("dec" + get() + " " + this).printStackTrace();
- }
-
- if (BOOKKEEPING) {
-// Debugger debugger = debuggerMap.get(this);
-// if (debugger != null) {
-// debugger.dec(id);
-// if(debugger.isEmpty())
-// debuggerMap.remove(this);
-// }
- }
-
- int count = decrementAndGet();
- if (count < 1) {
- if (count == 0) {
- debuggerMap.remove(this);
- if (caller != null)
- caller.dec(this);
- if (RESTART_GUARD)
- restartMap.put(this, true);
- }
- if (count < 0) {
- Logger.defaultLogError(
- "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
- new Exception());
- System.exit(-1);
- }
- assert (count >= 0);
- }
- }
-
- public static String report(AsyncBarrierImpl barrier) {
- CacheEntry<?> e = entryMap.get(barrier);
- if(e != null) return e.toString();
- else return "Barrier@" + System.identityHashCode(barrier);
- }
-
- public static void printReverse(AsyncBarrierImpl barrier, int indent) {
-
- if (barrier.get() == 0)
- return;
- for (int i = 0; i < indent; i++)
- System.err.print(" ");
- System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
-// if (BOOKKEEPING) {
-// Debugger debugger = debuggerMap.get(barrier);
-// debugger.toErr(indent + 2);
-// }
-
- Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
- if (children != null) {
- for (AsyncBarrierImpl child : children)
- printReverse(child, indent + 2);
- }
-
- }
-
- public void waitBarrier(Object request, ReadGraphImpl impl) {
-
- if (get() > 0) {
-
- long waitCount = 0;
-
- while (get() != 0) {
-
- boolean executed = impl.performPending();
- if(executed) waitCount = 0;
-
- ++waitCount;
- if(waitCount > 100) Thread.yield();
- if(waitCount > 1000) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if(waitCount > WAIT_TIME*1000) {
-
- System.err.println("AsyncBarrierImpl.waitBarrier("
- + request
- + ") is taking long to execute, so far "
- + (waitCount / 1000) + " s.");
-
- if (BOOKKEEPING) {
-
- synchronized (reverseLookup) {
- printReverse(this, 0);
- }
-
- }
-
-// if(Development.DEVELOPMENT) {
-// impl.processor.threadLocks[0].lock();
-// System.err.println("-queues=" + impl.processor.queues[0].size());
-// impl.processor.threadLocks[0].unlock();
-// System.err.println("-own=" + impl.processor.ownTasks[0].size());
-// System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
-// for(SessionTask task : impl.processor.ownSyncTasks[0]) {
-// System.err.println("--" + task);
-// }
-// }
-
- throw new RuntimeDatabaseException("Request timed out.");
- //waitCount = 0;
-
- }
-
- }
-
- }
-
- }
-
- public void restart() {
- assertReady();
- // log.clear();
- // sema.drainPermits();
- if (RESTART_GUARD)
- restartMap.remove(this);
- if (BOOKKEEPING)
- debuggerMap.put(this, new Debugger());
- }
-
- public void assertReady() {
- int current = get();
- if (current != 0)
- throw new AssertionError("Barrier was not finished (pending="
- + current + ").");
- }
-
- public void report() {
- // System.out.println("Barrier log:");
- // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
- // System.out.println(entry.getKey() + " " + entry.getValue());
- // }
- // System.out.println("SyncIntProcedure log:");
- // for(Map.Entry<String, Integer> entry :
- // SyncIntProcedure.counters.entrySet()) {
- // System.out.println(entry.getKey() + " " + entry.getValue());
- // }
- }
-
- @Override
- public String toString() {
- return report(this);
-// return "AsyncBarrierImpl@" + System.identityHashCode(this)
-// + " - counter = " + get() + " - caller = " + caller;
- }