]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
Multiple readers in db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryCache.java
index 48d5646c27eaa3aaeee3ac636f78e165d968e7c9..35ebdbc65ba4ce82e5e7e7dced301310c30f8b17 100644 (file)
@@ -19,9 +19,8 @@ import org.simantics.db.request.Read;
 
 public class QueryCache extends QueryCacheBase {
 
-    // Using QueryChaching breaks Diagram Editor (and probably something else). 
-       private static final boolean SINGLE = false;
-    
+    private static final boolean SINGLE = true;
+
     public QueryCache(QuerySupport querySupport, int threads) {
         super(querySupport, threads);
     }
@@ -32,18 +31,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (Objects)objectsMap.get(r1,r2);
             if(existing == null) {
                 existing = new Objects(r1,r2);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 objectsMap.put(keyR2(r1,r2), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -68,12 +68,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Objects.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -89,18 +91,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (Statements)statementsMap.get(r1,r2);
             if(existing == null) {
                 existing = new Statements(r1,r2);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 statementsMap.put(keyR2(r1,r2), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -125,12 +128,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Statements.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -146,18 +151,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (DirectObjects)directObjectsMap.get(r1,r2);
             if(existing == null) {
                 existing = new DirectObjects(r1,r2);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 directObjectsMap.put(keyR2(r1,r2), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -182,12 +188,14 @@ public class QueryCache extends QueryCacheBase {
         }
         DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -203,18 +211,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (RelationInfoQuery)relationInfoQueryMap.get(r);
             if(existing == null) {
                 existing = new RelationInfoQuery(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 relationInfoQueryMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -239,12 +248,14 @@ public class QueryCache extends QueryCacheBase {
         }
         RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
         InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -260,18 +271,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (URIToResource)uRIToResourceMap.get(id);
             if(existing == null) {
                 existing = new URIToResource(id);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 uRIToResourceMap.put(keyID(id), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -296,12 +308,14 @@ public class QueryCache extends QueryCacheBase {
         }
         URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
         InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           URIToResource.computeForEach(graph, id, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -317,18 +331,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (ValueQuery)valueQueryMap.get(r);
             if(existing == null) {
                 existing = new ValueQuery(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 valueQueryMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -353,12 +368,14 @@ public class QueryCache extends QueryCacheBase {
         }
         ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
         InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           ValueQuery.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -374,18 +391,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (OrderedSet)orderedSetMap.get(r);
             if(existing == null) {
                 existing = new OrderedSet(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 orderedSetMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -410,12 +428,14 @@ public class QueryCache extends QueryCacheBase {
         }
         OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           OrderedSet.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -431,18 +451,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (PrincipalTypes)principalTypesMap.get(r);
             if(existing == null) {
                 existing = new PrincipalTypes(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 principalTypesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -467,12 +488,14 @@ public class QueryCache extends QueryCacheBase {
         }
         PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           PrincipalTypes.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -488,18 +511,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (DirectPredicates)directPredicatesMap.get(r);
             if(existing == null) {
                 existing = new DirectPredicates(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 directPredicatesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -524,12 +548,14 @@ public class QueryCache extends QueryCacheBase {
         }
         DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           DirectPredicates.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -545,18 +571,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (Predicates)predicatesMap.get(r);
             if(existing == null) {
                 existing = new Predicates(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 predicatesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -581,12 +608,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Predicates.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -596,38 +625,39 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
+    private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
         ReadEntry existing = null;
         synchronized(readEntryMap) {
             existing = (ReadEntry)readEntryMap.get(r);
             if(existing == null) {
                 existing = new ReadEntry(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 readEntryMap.put(id(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
         if(existing.isPending()) {
-            if(needsToBlock) waitPending(graph, existing);
-            else return null;
+          if(needsToBlock)
+            waitPending(graph, existing);
+          else {
+            return null;
+          }
         }
         return existing;
     }
-
     
     void remove(ReadEntry entry) {
         synchronized(readEntryMap) {
-            readEntryMap.remove(entry.request);
+            readEntryMap.remove(entry.id);
         }
     }
     
-    public static Object runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
+    public static Object runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
         if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
             if (SINGLE) {
@@ -636,33 +666,39 @@ public class QueryCache extends QueryCacheBase {
                     return e.performFromCache(graph, procedure);
                 }
             }
-            return ReadEntry.computeForEach(graph, r, null, procedure);
+            return ReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
         }
         ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-            graph.processor.schedule(new SessionTask(graph) {
-                @Override
-                public void run0(int thread) {
-                    try {
-                        runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
-                    } catch (DatabaseException e) {
-                        Logger.defaultLogError(e);
-                    }
-                }
-            });
-            return null;
+          graph.asyncBarrier.inc();  
+          graph.processor.scheduleNow(new SessionTask() {
+            @Override
+            public void run0(int thread) {
+              try {
+                runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+                graph.asyncBarrier.dec();  
+              } catch (DatabaseException e) {
+                Logger.defaultLogError(e);
+              }
+            }
+          });
+          return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          Object result = entry.performFromCache(graph, procedure_);
+          graph.processor.listening.registerFirstKnown(listener, result);
+          return result;
+        }
         else {
           assert(entry.isPending());
-          Object result = ReadEntry.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
+          graph.processor.listening.registerFirstKnown(listener, result);
           return result;
         }
     }
-
     
     private ReadEntry peekReadEntry(Read<?> r) {
         synchronized(readEntryMap) {
@@ -670,38 +706,49 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
         AsyncReadEntry existing = null;
         synchronized(asyncReadEntryMap) {
             existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
             if(existing == null) {
                 existing = new AsyncReadEntry(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 asyncReadEntryMap.put(id(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
         if(existing.isPending()) {
-            if(needsToBlock) waitPending(graph, existing);
-            else return null;
+            if(needsToBlock)
+                waitPending(graph, existing);
+            else {
+                existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+                    @Override
+                    public void run0(int thread) {
+                        try {
+                            runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
+                    }
+                });
+                return null;
+            }
         }
         return existing;
     }
-
     
     void remove(AsyncReadEntry entry) {
         synchronized(asyncReadEntryMap) {
-            asyncReadEntryMap.remove(entry.request);
+            asyncReadEntryMap.remove(entry.id);
         }
     }
     
-    public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
+    public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
         if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
             if (SINGLE) {
@@ -712,31 +759,26 @@ public class QueryCache extends QueryCacheBase {
             }
             return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
         }
-        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
         if(entry == null) {
-            graph.processor.schedule(new SessionTask(graph) {
-                @Override
-                public void run0(int thread) {
-                    try {
-                        runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
-                    } catch (DatabaseException e) {
-                        Logger.defaultLogError(e);
-                    }
-                }
-            });
+            // Entry was pending and this request has been queued  
             return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          Object result = entry.performFromCache(graph, procedure_);
+          graph.processor.listening.registerFirstKnown(listener, result);
+          return result;
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          graph.processor.listening.registerFirstKnown(listener, result);
           return result;
         }
     }
-
     
     private AsyncReadEntry peekAsyncReadEntry(AsyncRead<?> r) {
         synchronized(asyncReadEntryMap) {
@@ -750,18 +792,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (Types)typesMap.get(r);
             if(existing == null) {
                 existing = new Types(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 typesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -786,12 +829,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Types entry = (Types)cache.getOrCreateTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Types.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -807,18 +852,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (ChildMap)childMapMap.get(r);
             if(existing == null) {
                 existing = new ChildMap(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 childMapMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -843,12 +889,14 @@ public class QueryCache extends QueryCacheBase {
         }
         ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
         InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           ChildMap.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -864,18 +912,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (TypeHierarchy)typeHierarchyMap.get(r);
             if(existing == null) {
                 existing = new TypeHierarchy(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 typeHierarchyMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -900,12 +949,14 @@ public class QueryCache extends QueryCacheBase {
         }
         TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           TypeHierarchy.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -921,18 +972,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (SuperTypes)superTypesMap.get(r);
             if(existing == null) {
                 existing = new SuperTypes(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 superTypesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -957,12 +1009,14 @@ public class QueryCache extends QueryCacheBase {
         }
         SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           SuperTypes.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -978,18 +1032,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (SuperRelations)superRelationsMap.get(r);
             if(existing == null) {
                 existing = new SuperRelations(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 superRelationsMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -1014,12 +1069,14 @@ public class QueryCache extends QueryCacheBase {
         }
         SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           SuperRelations.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1035,18 +1092,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (AssertedPredicates)assertedPredicatesMap.get(r);
             if(existing == null) {
                 existing = new AssertedPredicates(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 assertedPredicatesMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -1060,12 +1118,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1081,18 +1141,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (AssertedStatements)assertedStatementsMap.get(r1,r2);
             if(existing == null) {
                 existing = new AssertedStatements(r1,r2);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 assertedStatementsMap.put(keyR2(r1,r2), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -1106,12 +1167,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1127,18 +1190,19 @@ public class QueryCache extends QueryCacheBase {
             existing = (DirectSuperRelations)directSuperRelationsMap.get(r);
             if(existing == null) {
                 existing = new DirectSuperRelations(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 directSuperRelationsMap.put(keyR(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
@@ -1152,12 +1216,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1173,24 +1239,25 @@ public class QueryCache extends QueryCacheBase {
             existing = (MultiReadEntry)multiReadEntryMap.get(r);
             if(existing == null) {
                 existing = new MultiReadEntry(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 multiReadEntryMap.put(id(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
     void remove(MultiReadEntry entry) {
         synchronized(multiReadEntryMap) {
-            multiReadEntryMap.remove(entry.request);
+            multiReadEntryMap.remove(entry.id);
         }
     }
     
@@ -1198,12 +1265,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
         SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1219,24 +1288,25 @@ public class QueryCache extends QueryCacheBase {
             existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r);
             if(existing == null) {
                 existing = new AsyncMultiReadEntry(r);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 asyncMultiReadEntryMap.put(id(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
     
     void remove(AsyncMultiReadEntry entry) {
         synchronized(asyncMultiReadEntryMap) {
-            asyncMultiReadEntryMap.remove(entry.request);
+            asyncMultiReadEntryMap.remove(entry.id);
         }
     }
     
@@ -1244,12 +1314,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
         AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1265,25 +1337,25 @@ public class QueryCache extends QueryCacheBase {
             existing = (ExternalReadEntry)externalReadEntryMap.get(r);
             if(existing == null) {
                 existing = new ExternalReadEntry(r, graph);
-                existing.clearResult(querySupport);
-                existing.setPending();
+                existing.setPending(querySupport);
                 externalReadEntryMap.put(id(r), existing);
                 size++;
                 return existing;
             }
             if(existing.requiresComputation()) {
-                existing.setPending();
+                existing.setPending(querySupport);
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) {
+          waitPending(graph, existing);
+        }
         return existing;
     }
-
     
     void remove(ExternalReadEntry entry) {
         synchronized(externalReadEntryMap) {
-            externalReadEntryMap.remove(entry.request);
+            externalReadEntryMap.remove(entry.id);
         }
     }
     
@@ -1291,12 +1363,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }