Fixes based on feedback 08/2008/2
authorAntti Villberg <antti.villberg@semantum.fi>
Thu, 16 Aug 2018 07:30:24 +0000 (10:30 +0300)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Fri, 17 Aug 2018 08:40:49 +0000 (08:40 +0000)
gitlab #5

Change-Id: I39ecea8bca9bc518ec746bf3e306aca09e9f9237

bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java

index ad9f1643829fe506ac57125edaf2bc724a249d05..2d3e2804c6ebf4d630984892195c6fe2e2e4b0fa 100644 (file)
@@ -14,6 +14,7 @@ package org.simantics.db.impl;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.AsyncBarrierImpl;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.procedure.AsyncProcedure;
 
@@ -22,50 +23,53 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
     private static final Object NO_RESULT = new Object();
 
     private final Object key;
-    private final ReadGraphImpl graph;
+    private final AsyncBarrierImpl barrier;
+    private final ReadGraphImpl procedureGraph;
     private final AsyncProcedure<Result> procedure;
 
     private Object result = NO_RESULT;
     private Throwable exception = null;
 
-    public BlockingAsyncProcedure(ReadGraphImpl graph, AsyncProcedure<Result> procedure, Object key) {
+    public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure<Result> procedure, Object key) {
         this.procedure = procedure;
         this.key = key;
-        this.graph = graph;
-        this.graph.asyncBarrier.inc();
+        this.barrier = barrier;
+        this.barrier.inc();
+        this.procedureGraph = procedureGraph;
     }
 
     @Override
-    public void execute(AsyncReadGraph graph, Result result) {
+    public void execute(AsyncReadGraph graph_, Result result) {
         this.result = result;
-        this.graph.asyncBarrier.dec();
         try {
-            if(procedure != null) procedure.execute(graph, result);
+            if(procedure != null) procedure.execute(procedureGraph, result);
         } catch (Throwable throwable) {
             Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable);
+        } finally {
+            barrier.dec();
         }
     }
 
     @Override
-    public void exception(AsyncReadGraph graph, Throwable t) {
+    public void exception(AsyncReadGraph graph_, Throwable t) {
         this.exception = t;
         try {
-            if(procedure != null) procedure.exception(graph, t);
+            if(procedure != null) procedure.exception(procedureGraph, t);
         } catch (Throwable throwable) {
             Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable);
         } finally {
-            this.graph.asyncBarrier.dec();
+            barrier.dec();
         }
     }
 
     public void waitBarrier() {
-        graph.asyncBarrier.waitBarrier(key, graph);
+        barrier.waitBarrier(key, procedureGraph);
     }
 
     @SuppressWarnings("unchecked")
     public Result get() throws DatabaseException {
 
-        graph.asyncBarrier.waitBarrier(key, graph);
+        barrier.waitBarrier(key, procedureGraph);
 
         if(exception != null) {
             if(exception instanceof DatabaseException) throw (DatabaseException)exception;
@@ -77,7 +81,7 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
     }
 
     public boolean isDone() {
-        return graph.asyncBarrier.get() == 0;
+        return barrier.get() == 0;
     }
 
     @SuppressWarnings("unchecked")
index 229c3d6a9e965b5b14167896334c2e83b1b2508a..232e5cbd7e358b4cfb7171c5e4c1e62f71fa3ab9 100644 (file)
 package org.simantics.db.impl.graph;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
 
 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
@@ -26,22 +27,26 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
        private static final long serialVersionUID = 4724463372850048672L;
 
        static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>>();
-       static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
+       public static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
+       static final HashMap<AsyncBarrierImpl, CacheEntry> entryMap = new HashMap<AsyncBarrierImpl, CacheEntry>();
        static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<AsyncBarrierImpl, Boolean>();
 
        static final int WAIT_TIME = 600;
 
-       public static final boolean BOOKKEEPING = false;
+       public static final boolean BOOKKEEPING = true;
        public static final boolean PRINT = false;
-       static final boolean RESTART_GUARD = false;
+       static final boolean RESTART_GUARD = true;
 
-       final private AsyncBarrierImpl caller;
+       final public AsyncBarrierImpl caller;
 
        //private final Semaphore sema = new Semaphore(0);
 
-       public AsyncBarrierImpl(AsyncBarrierImpl caller) {
+       public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) {
                super(0);
                if (BOOKKEEPING) {
+                       synchronized (entryMap) {
+                               entryMap.put(this, entry);
+                       }
                        synchronized (debuggerMap) {
                                debuggerMap.put(this, new Debugger());
                        }
@@ -59,82 +64,58 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
        }
 
        public class Debugger {
-               public HashMap<Object, ArrayList<String>> infos = new HashMap<Object, ArrayList<String>>();
+               
+               public Map<AsyncBarrierImpl, String> infos = new HashMap<>();
 
-               public synchronized void inc(Object id, String info) {
+               public synchronized void inc(AsyncBarrierImpl id, String info) {
                        if (id == null)
                                return;
-                       ArrayList<String> exist = infos.get(id);
-                       if (exist == null) {
-                               exist = new ArrayList<String>();
-                               infos.put(id, exist);
-                       } else {
-                               // System.err.println("Appending " + id + " += " + info);
-                       }
-                       exist.add(info);
-                       // String exist = infos.put(id, info);
-                       // if(exist != null) System.err.println("replacing " + exist +
-                       // " => " + info + " for " + id);
+                       String exist = infos.get(id);
+                       if (exist != null)
+                               throw new IllegalStateException("Already existing info " + id + " " + info);
+                       infos.put(id, exist);
                }
 
-               public synchronized void dec(Object id) {
+               public synchronized void dec(AsyncBarrierImpl id) {
                        if (id == null)
                                return;
-                       ArrayList<String> exist = infos.get(id);
+                       String exist = infos.get(id);
                        if (exist == null) {
                                System.err.println("No data for " + id);
                        } else {
-                               exist.remove(0);
-                               if (exist.isEmpty())
-                                       infos.remove(id);
+                               infos.remove(id);
                        }
                }
 
                @Override
                public synchronized String toString() {
                        StringBuilder b = new StringBuilder();
-                       for (ArrayList<String> ss : infos.values()) {
-                               for (String s : ss)
-                                       b.append("info " + s + "\r\n");
+                       for (String s : infos.values()) {
+                               b.append("info " + s + "\r\n");
                        }
                        return b.toString();
                }
-
-               public synchronized void toErr(int indent) {
-                       char[] spaces = new char[indent];
-                       Arrays.fill(spaces, ' ');
-                       for (ArrayList<String> ss : infos.values()) {
-                               for (String s : ss) {
-                                       if (!s.startsWith("#"))
-                                               continue;
-                                       StringBuilder b = new StringBuilder();
-                                       b.append(spaces);
-                                       b.append(s);
-                                       System.err.println(b.toString());
-                               }
-                       }
+               
+               public boolean isEmpty() {
+                       return infos.isEmpty();
                }
-       }
-
-       public void inc() {
-
-               if (BOOKKEEPING)
-                       inc(new Object(), new Exception().getStackTrace()[1].toString());
-               else
-                       inc(null, null);
 
        }
 
-       public void inc(String debug) {
+       public void inc() {
 
                if (BOOKKEEPING)
-                       inc(new Object(), new Exception().getStackTrace()[1].toString());
+                       inc(this, new Exception().getStackTrace()[2].toString());
                else
                        inc(null, null);
+               
+               if (RESTART_GUARD)
+                       if(restartMap.containsKey(this))
+                               throw new IllegalStateException("Unplanned restart");
 
        }
 
-       public void inc(Object id, String info) {
+       private void inc(Object id, String info) {
 
                //              if (PRINT) {
                //                      if (get() < 5)
@@ -142,27 +123,11 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
                //              }
 
                if (BOOKKEEPING) {
-                       Debugger debugger = debuggerMap.get(this);
-                       if (debugger != null)
-                               debugger.inc(id, info);
-                       // StackTraceElement[] tr = new Exception().getStackTrace();
-                       // if(tr.length == 4)
-                       // debugger.inc(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString() });
-                       // else if(tr.length == 5)
-                       // debugger.inc(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString(), tr[4].toString() });
-                       // else if(tr.length == 6)
-                       // debugger.inc(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString(), tr[4].toString(), tr[5].toString() });
-                       // else
-                       // debugger.inc(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString(), tr[4].toString(), tr[5].toString(),
-                       // tr[6].toString() });
+//                     Debugger debugger = debuggerMap.get(this);
+//                     if (debugger != null)
+//                             debugger.inc(id, info);
                }
 
-               //              new Exception().printStackTrace();
-
                if(PRINT) {
 
                        System.err.println("inc barrier[" + get() + "] " + this);
@@ -215,68 +180,50 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
                }
 
                if (BOOKKEEPING) {
-                       Debugger debugger = debuggerMap.get(this);
-                       if (debugger != null)
-                               debugger.dec(id);
-                       // StackTraceElement[] tr = new Exception().getStackTrace();
-                       // if(tr.length == 3)
-                       // debugger.dec(new String[] { debug, tr[2].toString() });
-                       // else if(tr.length == 4)
-                       // debugger.dec(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString() });
-                       // else
-                       // debugger.dec(new String[] { debug, tr[2].toString(),
-                       // tr[3].toString(), tr[4].toString() });
+//                     Debugger debugger = debuggerMap.get(this);
+//                     if (debugger != null) {
+//                             debugger.dec(id);
+//                             if(debugger.isEmpty())
+//                                     debuggerMap.remove(this);
+//                     }
                }
 
-               //              System.err.println("barrier " + this);
-               //              StackTraceElement[] elems = new Exception().getStackTrace();
-               //              for(int i=0;i<3;i++) System.err.println(elems[i]);
-               //              new Exception().printStackTrace();
-
                int count = decrementAndGet();
                if (count < 1) {
                        if (count == 0) {
+                               debuggerMap.remove(this);
                                if (caller != null)
                                        caller.dec(this);
                                if (RESTART_GUARD)
                                        restartMap.put(this, true);
-                               //                              sema.release();
-                               // if(DEBUGGER) {
-                               // debuggerMap.remove(this);
-                               // }
-                               // if(REVERSE_LOOKUP) {
-                               // reverseLookup.remove(this);
-                               // }
                        }
                        if (count < 0) {
                                Logger.defaultLogError(
                                                "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
                                                new Exception());
-                               // String message = ;
-                               // System.out.println(message);
-                               // if (DEBUGGER) {
-                               // JOptionPane.showMessageDialog(null, message);
-                               // System.out.println(debugger);
-                               // }
-                               // sema.release();
                                System.exit(-1);
                        }
                        assert (count >= 0);
                }
        }
 
-       private static void printReverse(AsyncBarrierImpl barrier, int indent) {
+       public static String report(AsyncBarrierImpl barrier) {
+               CacheEntry e = entryMap.get(barrier);
+               if(e != null) return e.toString();
+               else return "Barrier@" + System.identityHashCode(barrier);
+       }
+       
+       public static void printReverse(AsyncBarrierImpl barrier, int indent) {
 
                if (barrier.get() == 0)
                        return;
                for (int i = 0; i < indent; i++)
                        System.err.print(" ");
-               System.err.println("[" + barrier.get() + " requests]: " + barrier);
-               if (BOOKKEEPING) {
-                       Debugger debugger = debuggerMap.get(barrier);
-                       debugger.toErr(indent + 2);
-               }
+               System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
+//             if (BOOKKEEPING) {
+//                     Debugger debugger = debuggerMap.get(barrier);
+//                     debugger.toErr(indent + 2);
+//             }
 
                Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
                if (children != null) {
@@ -374,8 +321,9 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie
 
        @Override
        public String toString() {
-               return "AsyncBarrierImpl@" + System.identityHashCode(this)
-                               + " - counter = " + get() + " - caller = " + caller;
+               return report(this);
+//             return "AsyncBarrierImpl@" + System.identityHashCode(this)
+//                             + " - counter = " + get() + " - caller = " + caller;
        }
 
 }
index c95cd93d94c836e4455a98fbed12b97176ffb47d..932e68c8feffdcc0d48f5494c864e465a58ee53e 100644 (file)
@@ -2048,21 +2048,6 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
        }
 
-       final private <T> void syncRequest(final AsyncRead<T> request, final AsyncReadProcedure<T> procedure) throws DatabaseException {
-
-               assert (request != null);
-
-               ListenerBase listener = getListenerBase(procedure);
-               assert(listener == null);
-
-               BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
-
-               QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
-
-               ap.get();
-
-       }
-
        @Override
        public <T> T syncRequest(AsyncRead<T> request,
                        final SyncProcedure<T> procedure) throws DatabaseException {
@@ -5644,7 +5629,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                this.parentGraph = parentGraph;
                this.parent = parent;
                this.processor = support;
-               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null);
+               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
        }
 
        ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
index e6d42750d37b9caf62196bd50c043b802093d7e6..f409b40c8c330069023ede4686c6363ba76ae948 100644 (file)
@@ -13,7 +13,6 @@ package org.simantics.db.impl.query;
 
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.exception.DatabaseException;
-import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.AsyncBarrierImpl;
@@ -72,7 +71,7 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
                 try {
 
-                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph, new AsyncProcedure<T>() {
+                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
 
                         @Override
                         public void execute(AsyncReadGraph graph, T result) {
@@ -154,10 +153,11 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
         ReadGraphImpl queryGraph = graph.withParent(entry);
         
-        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, null, request);
+        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
         
         class AsyncTask extends SessionTask {
 
+               int counter = 0;
             T result;
             DatabaseException exception;
             
@@ -181,9 +181,21 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
                         exception = dbe;
                     } finally {
                         if (entry != null)
-                            entry.performFromCache(queryGraph, procedure_);
+                            entry.performFromCache(graph, procedure_);
                     }
                 } else {
+                       if(counter++ > 10000) {
+                               AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
+                               AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
+                               while(caller != null) {
+                                       System.err.println("called by " + AsyncBarrierImpl.report(caller));
+                                       caller = caller.caller;
+                               }
+                               for(AsyncBarrierImpl ab : AsyncBarrierImpl.debuggerMap.keySet()) {
+                               AsyncBarrierImpl.printReverse(ab, 2);
+                               }
+                               throw new IllegalStateException("Eternal loop in queries.");
+                       }
                     graph.processor.schedule(this);            
                 }
             }
index d61049744a3cbdbdb09c85040833ffc0e8329d91..661076e8604bacd47504ed86607df88bca54eee8 100644 (file)
@@ -17,14 +17,17 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
 import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.RequestFlags;
 
-final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
 
     final LinkedList<T> items = new LinkedList<T>();
 
     protected ExternalRead<T> request;
+    protected ReadGraphImpl graph;
+    protected boolean registered = false;
 
     @Override
     int makeHash() {
@@ -44,6 +47,7 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
     public void discard() {
         request.unregistered();
         request = null;
+        graph = null;
         super.discard();
     }
 
@@ -56,49 +60,15 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
         result = REQUIRES_COMPUTATION;
     }
     
-    public ExternalReadEntry(ExternalRead<T> request) {
+    public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
         assert request != null;
         this.request = request;
-    }
-    
-    final public void queue(T item) {
-       synchronized(items) {
-               items.addLast(item);
-               // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
-               // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
-       }
-    }
-    
-    final public void addOrSet(QueryProcessor processor, Object item) {
-
-        try {
-        
-            assert(isPending());
-
-            //ArrayList<Procedure<T>> p = null;
-
-            synchronized(this) {
-
-                setResult(item);
-                setReady();
-//                p = procs;
-//                procs = null;
-
-            }
-
-//            if(p != null)
-//                for(Procedure proc : p) {
-//                    proc.execute((T)item);
-//                }
-
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
-        
+        this.graph = graph;
     }
     
     @Override
     public void except(Throwable t) {
+       
         if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
         if(statusOrException != DISCARDED) {
             statusOrException = EXCEPTED;
@@ -106,14 +76,23 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
         } else {
             result = t;
         }
+        
         assert(isExcepted());
+        
     }
     
     @Override
     public void setResult(Object result) {
+
         super.setResult(result);
         assert(!(result instanceof Throwable));
         assert(!isExcepted());
+        
+    }
+    
+    @Override
+    public void setReady() {
+       super.setReady();
     }
 
     @Override
@@ -126,7 +105,6 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
 
                                synchronized(items) {
 
-
                                        // Update
                                        if(!items.isEmpty()) {
                                            setReady();
@@ -192,7 +170,59 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
     }
 
     public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
-        return graph.processor.cache.performQuery(graph, request, this, procedure);
+       
+               try {
+                       
+                       ReadGraphImpl queryGraph = graph.withParent(this);
+
+                       if(!registered) {
+                               request.register(graph, this);
+                               registered = true;
+                       }
+                       
+                       queryGraph.asyncBarrier.waitBarrier(request, graph);
+
+               } catch (Throwable t) {
+
+                       except(t);
+
+               }
+
+               performFromCache(graph, procedure);
+               
+               return getResult();
+
     }
 
+       @Override
+       public void execute(T result) {
+
+       if(this.result == REQUIRES_COMPUTATION) {
+               
+                       setResult(result);
+                       setReady();
+
+       } else {
+
+               synchronized(items) {
+                       items.addLast(result);
+                               graph.processor.updatePrimitive(request);
+                       // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+                       // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+               }
+               
+       }
+               
+       }
+
+       @Override
+       public void exception(Throwable t) {
+               except(t);
+       }
+
+       @Override
+       public boolean isDisposed() {
+               return registered && (isDiscarded() || !graph.processor.isBound(this));
+       }
+    
 }
index 86f759362bdd3a5ec216949d467ffd3a8b8749c3..57c1c68c76df0ca930a4655d40e83e18b1ddc9db 100644 (file)
@@ -1008,7 +1008,7 @@ public class QueryCache extends QueryCacheBase {
         synchronized(externalReadEntryMap) {
             existing = (ExternalReadEntry)externalReadEntryMap.get(r);
             if(existing == null) {
-                existing = new ExternalReadEntry(r);
+                existing = new ExternalReadEntry(r, graph);
                 existing.clearResult(querySupport);
                 existing.setPending();
                 externalReadEntryMap.put(id(r), existing);
index d7a235fc813d4f302e719bb66647cd2cef972d91..6abaf6b6518e7b9239dad4830aa9acadd65492c5 100644 (file)
@@ -189,71 +189,6 @@ public class QueryCacheBase {
 //             
 //     }
 
-       public <T> Object performQuery(ReadGraphImpl parentGraph, final ExternalRead<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-
-               ExternalReadEntry entry = (ExternalReadEntry)entry_;
-               AsyncProcedure<T> procedure = (AsyncProcedure<T>)procedure_;
-               
-               try {
-
-                       query.register(parentGraph, new Listener<T>() {
-
-                               AtomicBoolean used = new AtomicBoolean(false);
-
-                               @Override
-                               public void execute(T result) {
-
-                                       // Just for safety
-                                       if(entry.isDiscarded()) return;
-
-                                       if(used.compareAndSet(false, true)) {
-                                               //entry.setPending();
-                                               entry.addOrSet(parentGraph.processor, result);
-                                               procedure.execute(parentGraph, result);
-                                       } else {
-                                               entry.queue(result);
-                                               parentGraph.processor.updatePrimitive(query);
-                                       }
-
-                               }
-
-                               @Override
-                               public void exception(Throwable t) {
-
-                                       entry.except(t);
-
-                                       if(used.compareAndSet(false, true)) {
-                                               procedure.exception(parentGraph, t);
-                                       } else {
-//                                             entry.queue(result);
-                                               parentGraph.processor.updatePrimitive(query);
-                                       }
-
-                               }
-
-                               @Override
-                               public String toString() {
-                                       return procedure.toString();
-                               }
-
-                               @Override
-                               public boolean isDisposed() {
-                                       return entry.isDiscarded() || !parentGraph.processor.isBound(entry);
-                               }
-
-                       });
-
-                       return entry.getResult();
-
-               } catch (Throwable t) {
-
-                       entry.except(t);
-                       procedure.exception(parentGraph, t);
-                       return entry.getResult();
-
-               }
-
-       }
 
        public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
 
index 8a634d1cfb05504a7ffa4d86db2ed12c308aa2e1..a08011fed1172615a2f937cb9de08bc099f3f1de 100644 (file)
@@ -1637,7 +1637,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
 //                                procedure, "request");
 
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
 
                        try {