]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
index 232e5cbd7e358b4cfb7171c5e4c1e62f71fa3ab9..f1e5b5cce10cc4758207a257c2f6cfeb4f0a3372 100644 (file)
  *******************************************************************************/
 package org.simantics.db.impl.graph;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.slf4j.LoggerFactory;
+
+public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class);
+
+    private static final long serialVersionUID = 4724463372850048672L;
+
+    static final int WAIT_TIME = 60000;
+
+    public static final boolean PRINT = false;
+
+    final AsyncBarrierImpl caller;
+
+    private final Runnable callback;
+
+    private final boolean needsToBlock;
+
+    public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback, boolean needsToBlock) {
+        super(0);
+        this.caller = caller;
+        this.callback = callback;
+        this.needsToBlock = needsToBlock;
+        if (BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.trace(this, entry);
+        }
+    }
+
+    public AsyncBarrier getBlockingBarrier() {
+        if(needsToBlock)
+            return this;
+        if(caller == null)
+            return null;
+        else return caller.getBlockingBarrier();
+    }
+
+    @Override
+    public boolean isBlocking() {
+        return needsToBlock;
+    }
+
+    @Override
+    public void inc() {
+
+        if(BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.inc(this);
+        } else {
+            inc(null, null);
+        }
+
+    }
+
+    void inc(Object id, String info) {
+
+        if(PRINT) {
+            System.err.println("inc barrier[" + get() + "] " + this);
+            StackTraceElement[] elems = new Exception().getStackTrace();
+            for(int i=0;i<4;i++) System.err.println(elems[i]);
+        }
+
+        if (incrementAndGet() == 1) {
+            if (caller != null) {
+                if(BarrierTracing.BOOKKEEPING) {
+                    caller.inc(this, "Child");
+                } else {
+                    caller.inc(null, null);
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void dec() {
+
+        if(PRINT) {
+            System.err.println("dec barrier[" + get() + "] " + this);
+            StackTraceElement[] elems = new Exception().getStackTrace();
+            for(int i=0;i<3;i++) System.err.println(elems[i]);
+        }
+
+        int count = decrementAndGet();
+        if (count < 1) {
+            if(BarrierTracing.BOOKKEEPING) {
+                BarrierTracing.dec(this, count);
+            }
+            if (count == 0) {
+                if (caller != null) {
+                    caller.dec();
+                }
+            }
+            if (count < 0) {
+                LOGGER.error(
+                        "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
+                        new Exception());
+            }
+            assert (count >= 0);
+
+            if(callback != null)
+                callback.run();
+
+        }
+
+    }
+
+    public static String report(AsyncBarrierImpl barrier) {
+        CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
+        if(e != null) return e.toString();
+        else return "Barrier@" + System.identityHashCode(barrier);
+    }
+
+    public static void printReverse(AsyncBarrierImpl barrier, int indent) {
+
+        if (barrier.get() == 0)
+            return;
+        for (int i = 0; i < indent; i++)
+            System.err.print(" ");
+        System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
+
+        Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
+        if (children != null) {
+            for (AsyncBarrierImpl child : children)
+                printReverse(child, indent + 2);
+        }
+        
+    }
+
+    public void waitBarrier(Object request, ReadGraphImpl impl) {
+
+        if (get() > 0) {
+
+            long waitCount = 0;
+
+            while (get() != 0) {
+
+                boolean executed = impl.performPending();
+                if(executed) waitCount = 0;
+
+                ++waitCount;
+                if(waitCount > 100) Thread.yield();
+                if(waitCount > 1000) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                if(waitCount > WAIT_TIME) {
+
+                    System.err.println("AsyncBarrierImpl.waitBarrier("
+                            + request
+                            + ") is taking long to execute, so far "
+                            + (waitCount / 1000) + " s.");
+
+                    if (BarrierTracing.BOOKKEEPING) {
+                        synchronized (BarrierTracing.reverseLookup) {
+                            printReverse(this, 0);
+                        }
+                        BarrierTracing.printBAPS();
+                    }
+
+                    throw new RuntimeDatabaseException("Request timed out.");
+
+                }
+
+            }
+
+        }
+
+    }
+
+    public void restart() {
+        assertReady();
+        if(BarrierTracing.BOOKKEEPING) {
+            BarrierTracing.restart(this);
+        }
+    }
+
+    public void assertReady() {
+        int current = get();
+        if (current != 0)
+            throw new AssertionError("Barrier was not finished (pending="
+                    + current + ").");
+    }
+
+    public void report() {
+        // System.out.println("Barrier log:");
+        // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
+        // System.out.println(entry.getKey() + " " + entry.getValue());
+        // }
+        // System.out.println("SyncIntProcedure log:");
+        // for(Map.Entry<String, Integer> entry :
+        // SyncIntProcedure.counters.entrySet()) {
+        // System.out.println(entry.getKey() + " " + entry.getValue());
+        // }
+    }
+
+    @Override
+    public String toString() {
+        return report(this);
+        //             return "AsyncBarrierImpl@" + System.identityHashCode(this)
+        //                             + " - counter = " + get() + " - caller = " + caller;
+    }
 
-final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
-
-       private static final long serialVersionUID = 4724463372850048672L;
-
-       static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>>();
-       public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
-       static final HashMap<AsyncBarrierImpl, CacheEntry> entryMap = new HashMap<AsyncBarrierImpl, CacheEntry>();
-       static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<AsyncBarrierImpl, Boolean>();
-
-       static final int WAIT_TIME = 600;
-
-       public static final boolean BOOKKEEPING = true;
-       public static final boolean PRINT = false;
-       static final boolean RESTART_GUARD = true;
-
-       final public AsyncBarrierImpl caller;
-
-       //private final Semaphore sema = new Semaphore(0);
-
-       public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) {
-               super(0);
-               if (BOOKKEEPING) {
-                       synchronized (entryMap) {
-                               entryMap.put(this, entry);
-                       }
-                       synchronized (debuggerMap) {
-                               debuggerMap.put(this, new Debugger());
-                       }
-                       synchronized (reverseLookup) {
-                               Collection<AsyncBarrierImpl> barriers = reverseLookup
-                                               .get(caller);
-                               if (barriers == null) {
-                                       barriers = new ArrayList<AsyncBarrierImpl>();
-                                       reverseLookup.put(caller, barriers);
-                               }
-                               barriers.add(this);
-                       }
-               }
-               this.caller = caller;
-       }
-
-       public class Debugger {
-               
-               public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
-
-               public synchronized void inc(AsyncBarrierImpl id, String info) {
-                       if (id == null)
-                               return;
-                       String exist = infos.get(id);
-                       if (exist != null)
-                               throw new IllegalStateException("Already existing info " + id + " " + info);
-                       infos.put(id, exist);
-               }
-
-               public synchronized void dec(AsyncBarrierImpl id) {
-                       if (id == null)
-                               return;
-                       String exist = infos.get(id);
-                       if (exist == null) {
-                               System.err.println("No data for " + id);
-                       } else {
-                               infos.remove(id);
-                       }
-               }
-
-               @Override
-               public synchronized String toString() {
-                       StringBuilder b = new StringBuilder();
-                       for (String s : infos.values()) {
-                               b.append("info " + s + "\r\n");
-                       }
-                       return b.toString();
-               }
-               
-               public boolean isEmpty() {
-                       return infos.isEmpty();
-               }
-
-       }
-
-       public void inc() {
-
-               if (BOOKKEEPING)
-                       inc(this, new Exception().getStackTrace()[2].toString());
-               else
-                       inc(null, null);
-               
-               if (RESTART_GUARD)
-                       if(restartMap.containsKey(this))
-                               throw new IllegalStateException("Unplanned restart");
-
-       }
-
-       private void inc(Object id, String info) {
-
-               //              if (PRINT) {
-               //                      if (get() < 5)
-               //                              new Exception("inc " + get() + " " + this).printStackTrace();
-               //              }
-
-               if (BOOKKEEPING) {
-//                     Debugger debugger = debuggerMap.get(this);
-//                     if (debugger != null)
-//                             debugger.inc(id, info);
-               }
-
-               if(PRINT) {
-
-                       System.err.println("inc barrier[" + get() + "] " + this);
-                       StackTraceElement[] elems = new Exception().getStackTrace();
-                       for(int i=0;i<4;i++) System.err.println(elems[i]);
-
-               }
-
-               if (incrementAndGet() == 1) {
-                       if (caller != null) {
-                               if (BOOKKEEPING)
-                                       caller.inc(this, "Child");
-                               else
-                                       caller.inc(null, null);
-                       }
-               }
-
-       }
-
-       public void dec() {
-
-               if(PRINT) {
-                       System.err.println("dec barrier[" + get() + "] " + this);
-                       StackTraceElement[] elems = new Exception().getStackTrace();
-                       for(int i=0;i<3;i++) System.err.println(elems[i]);
-               }
-
-               int count = decrementAndGet();
-               if (count < 1) {
-                       if (count == 0) {
-                               if (caller != null)
-                                       caller.dec(this);
-                               //                              sema.release();
-                       }
-                       if (count < 0) {
-                               Logger.defaultLogError(
-                                               "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
-                                               new Exception());
-                       }
-                       assert (count >= 0);
-               }
-
-       }
-
-       public void dec(Object id) {
-
-               if (PRINT) {
-                       if (get() < 5)
-                               new Exception("dec" + get() + " " + this).printStackTrace();
-               }
-
-               if (BOOKKEEPING) {
-//                     Debugger debugger = debuggerMap.get(this);
-//                     if (debugger != null) {
-//                             debugger.dec(id);
-//                             if(debugger.isEmpty())
-//                                     debuggerMap.remove(this);
-//                     }
-               }
-
-               int count = decrementAndGet();
-               if (count < 1) {
-                       if (count == 0) {
-                               debuggerMap.remove(this);
-                               if (caller != null)
-                                       caller.dec(this);
-                               if (RESTART_GUARD)
-                                       restartMap.put(this, true);
-                       }
-                       if (count < 0) {
-                               Logger.defaultLogError(
-                                               "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
-                                               new Exception());
-                               System.exit(-1);
-                       }
-                       assert (count >= 0);
-               }
-       }
-
-       public static String report(AsyncBarrierImpl barrier) {
-               CacheEntry e = entryMap.get(barrier);
-               if(e != null) return e.toString();
-               else return "Barrier@" + System.identityHashCode(barrier);
-       }
-       
-       public static void printReverse(AsyncBarrierImpl barrier, int indent) {
-
-               if (barrier.get() == 0)
-                       return;
-               for (int i = 0; i < indent; i++)
-                       System.err.print(" ");
-               System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
-//             if (BOOKKEEPING) {
-//                     Debugger debugger = debuggerMap.get(barrier);
-//                     debugger.toErr(indent + 2);
-//             }
-
-               Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
-               if (children != null) {
-                       for (AsyncBarrierImpl child : children)
-                               printReverse(child, indent + 2);
-               }
-
-       }
-
-       public void waitBarrier(Object request, ReadGraphImpl impl) {
-
-               if (get() > 0) {
-
-                       long waitCount = 0;
-
-                       while (get() != 0) {
-
-                               boolean executed = impl.performPending();
-                               if(executed) waitCount = 0;
-                               
-                               ++waitCount;
-                               if(waitCount > 100) Thread.yield();
-                               if(waitCount > 1000) {
-                                       try {
-                                               Thread.sleep(1);
-                                       } catch (InterruptedException e) {
-                                               e.printStackTrace();
-                                       }
-                               }
-                               if(waitCount > WAIT_TIME*1000) {
-
-                                       System.err.println("AsyncBarrierImpl.waitBarrier("
-                                                       + request
-                                                       + ") is taking long to execute, so far "
-                                                       + (waitCount / 1000) + " s.");
-
-                                       if (BOOKKEEPING) {
-
-                                               synchronized (reverseLookup) {
-                                                       printReverse(this, 0);
-                                               }
-
-                                       }
-
-//                                     if(Development.DEVELOPMENT) {
-//                                             impl.processor.threadLocks[0].lock();
-//                                             System.err.println("-queues=" + impl.processor.queues[0].size());
-//                                             impl.processor.threadLocks[0].unlock();
-//                                             System.err.println("-own=" + impl.processor.ownTasks[0].size());
-//                                             System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
-//                                             for(SessionTask task : impl.processor.ownSyncTasks[0]) {
-//                                                     System.err.println("--" + task);
-//                                             }
-//                                     }
-
-                                       throw new RuntimeDatabaseException("Request timed out.");
-                                       //waitCount = 0;
-
-                               }
-
-                       }
-
-               }
-
-       }
-
-       public void restart() {
-               assertReady();
-               // log.clear();
-               //              sema.drainPermits();
-               if (RESTART_GUARD)
-                       restartMap.remove(this);
-               if (BOOKKEEPING)
-                       debuggerMap.put(this, new Debugger());
-       }
-
-       public void assertReady() {
-               int current = get();
-               if (current != 0)
-                       throw new AssertionError("Barrier was not finished (pending="
-                                       + current + ").");
-       }
-
-       public void report() {
-               // System.out.println("Barrier log:");
-               // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
-               // System.out.println(entry.getKey() + " " + entry.getValue());
-               // }
-               // System.out.println("SyncIntProcedure log:");
-               // for(Map.Entry<String, Integer> entry :
-               // SyncIntProcedure.counters.entrySet()) {
-               // System.out.println(entry.getKey() + " " + entry.getValue());
-               // }
-       }
-
-       @Override
-       public String toString() {
-               return report(this);
-//             return "AsyncBarrierImpl@" + System.identityHashCode(this)
-//                             + " - counter = " + get() + " - caller = " + caller;
-       }
 
 }