X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryCache.java;h=1467015512e38f81e6639fa8077f1808ec37865d;hb=de8b6e194da6d56c410101d9c72763078b0e8717;hp=49e02dece32049884f7a3907e85f960f2bf27f64;hpb=3850fec72035293b9a4ede780d01aedc5fbc9056;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 49e02dece..146701551 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -2,9 +2,11 @@ package org.simantics.db.impl.query; import org.simantics.db.ObjectResourceIdMap; import org.simantics.db.RelationInfo; +import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.procedure.InternalProcedure; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncMultiProcedure; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.procedure.ListenerBase; @@ -14,15 +16,13 @@ import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.Read; -import gnu.trove.map.hash.TObjectIntHashMap; - public class QueryCache extends QueryCacheBase { public QueryCache(QuerySupport querySupport, int threads) { super(querySupport, threads); } - Objects getOrCreateObjects(int r1, int r2) throws DatabaseException { + Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { Objects existing = null; synchronized(objectsMap) { existing = (Objects)objectsMap.get(r1,r2); @@ -39,7 +39,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -49,23 +49,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { Objects.computeForEach(graph, r1,r2, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureObjects; - Objects entry = (Objects)cache.getOrCreateObjects(r1,r2); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - Objects.computeForEach(graph, r1,r2, entry, procedure); + assert(entry.isPending()); + Objects.computeForEach(graph, r1,r2, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - Statements getOrCreateStatements(int r1, int r2) throws DatabaseException { + Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { Statements existing = null; synchronized(statementsMap) { existing = (Statements)statementsMap.get(r1,r2); @@ -82,7 +83,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -92,23 +93,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, TripleIntProcedure procedure) throws DatabaseException { + public static void runnerStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { Statements.computeForEach(graph, r1,r2, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureStatements; - Statements entry = (Statements)cache.getOrCreateStatements(r1,r2); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2); + TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - Statements.computeForEach(graph, r1,r2, entry, procedure); + assert(entry.isPending()); + Statements.computeForEach(graph, r1,r2, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectObjects getOrCreateDirectObjects(int r1, int r2) throws DatabaseException { + DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { DirectObjects existing = null; synchronized(directObjectsMap) { existing = (DirectObjects)directObjectsMap.get(r1,r2); @@ -125,7 +127,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -135,23 +137,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerDirectObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerDirectObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { DirectObjects.computeForEach(graph, r1,r2, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureDirectObjects; - DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(r1,r2); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - DirectObjects.computeForEach(graph, r1,r2, entry, procedure); + assert(entry.isPending()); + DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - RelationInfoQuery getOrCreateRelationInfoQuery(int r) throws DatabaseException { + RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException { RelationInfoQuery existing = null; synchronized(relationInfoQueryMap) { existing = (RelationInfoQuery)relationInfoQueryMap.get(r); @@ -168,7 +171,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -178,23 +181,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { RelationInfoQuery.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureRelationInfoQuery; - RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - RelationInfoQuery.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + RelationInfoQuery.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - URIToResource getOrCreateURIToResource(String id) throws DatabaseException { + URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException { URIToResource existing = null; synchronized(uRIToResourceMap) { existing = (URIToResource)uRIToResourceMap.get(id); @@ -211,7 +215,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -221,23 +225,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerURIToResource(ReadGraphImpl graph, String id, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerURIToResource(ReadGraphImpl graph, String id, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, id)) { URIToResource.computeForEach(graph, id, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureURIToResource; - URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(id); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureURIToResource; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - URIToResource.computeForEach(graph, id, entry, procedure); + assert(entry.isPending()); + URIToResource.computeForEach(graph, id, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ValueQuery getOrCreateValueQuery(int r) throws DatabaseException { + ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException { ValueQuery existing = null; synchronized(valueQueryMap) { existing = (ValueQuery)valueQueryMap.get(r); @@ -254,7 +259,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -264,23 +269,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { ValueQuery.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureValueQuery; - ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureValueQuery; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - ValueQuery.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + ValueQuery.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - OrderedSet getOrCreateOrderedSet(int r) throws DatabaseException { + OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException { OrderedSet existing = null; synchronized(orderedSetMap) { existing = (OrderedSet)orderedSetMap.get(r); @@ -297,7 +303,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -307,23 +313,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerOrderedSet(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerOrderedSet(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { OrderedSet.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureOrderedSet; - OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - OrderedSet.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + OrderedSet.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - PrincipalTypes getOrCreatePrincipalTypes(int r) throws DatabaseException { + PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException { PrincipalTypes existing = null; synchronized(principalTypesMap) { existing = (PrincipalTypes)principalTypesMap.get(r); @@ -340,7 +347,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -350,23 +357,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerPrincipalTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerPrincipalTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { PrincipalTypes.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedurePrincipalTypes; - PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - PrincipalTypes.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + PrincipalTypes.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectPredicates getOrCreateDirectPredicates(int r) throws DatabaseException { + DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException { DirectPredicates existing = null; synchronized(directPredicatesMap) { existing = (DirectPredicates)directPredicatesMap.get(r); @@ -383,7 +391,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -393,23 +401,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { DirectPredicates.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureDirectPredicates; - DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - DirectPredicates.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + DirectPredicates.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - Predicates getOrCreatePredicates(int r) throws DatabaseException { + Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException { Predicates existing = null; synchronized(predicatesMap) { existing = (Predicates)predicatesMap.get(r); @@ -426,7 +435,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -436,23 +445,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { Predicates.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedurePredicates; - Predicates entry = (Predicates)cache.getOrCreatePredicates(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedurePredicates; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - Predicates.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + Predicates.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ReadEntry getOrCreateReadEntry(Read r) throws DatabaseException { + ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read r, boolean isSync) throws DatabaseException { ReadEntry existing = null; synchronized(readEntryMap) { existing = (ReadEntry)readEntryMap.get(r); @@ -469,7 +479,10 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) { + if(isSync) waitPending(processor, existing); + else return null; + } return existing; } @@ -479,23 +492,38 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { + public static void runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { ReadEntry.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureReadEntry; - ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync); + if(entry == null) { + graph.processor.schedule(new SessionTask(false) { + @Override + public void run(int thread) { + try { + assert(!isSync); + runnerReadEntry(graph, r, parent, listener, procedure, isSync); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + }); + return; + } + AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - ReadEntry.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + ReadEntry.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AsyncReadEntry getOrCreateAsyncReadEntry(AsyncRead r) throws DatabaseException { + AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead r, boolean isSync) throws DatabaseException { AsyncReadEntry existing = null; synchronized(asyncReadEntryMap) { existing = (AsyncReadEntry)asyncReadEntryMap.get(r); @@ -512,7 +540,10 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) { + if(isSync) waitPending(processor, existing); + else return null; + } return existing; } @@ -522,23 +553,38 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { + public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { AsyncReadEntry.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureAsyncReadEntry; - AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync); + if(entry == null) { + graph.processor.schedule(new SessionTask(false) { + @Override + public void run(int thread) { + try { + assert(!isSync); + runnerAsyncReadEntry(graph, r, parent, listener, procedure, isSync); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + }); + return; + } + AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - AsyncReadEntry.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + AsyncReadEntry.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - Types getOrCreateTypes(int r) throws DatabaseException { + Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException { Types existing = null; synchronized(typesMap) { existing = (Types)typesMap.get(r); @@ -555,7 +601,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -565,23 +611,24 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { Types.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureTypes; - Types entry = (Types)cache.getOrCreateTypes(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + Types entry = (Types)cache.getOrCreateTypes(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypes; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - Types.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + Types.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ChildMap getOrCreateChildMap(int r) throws DatabaseException { + ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException { ChildMap existing = null; synchronized(childMapMap) { existing = (ChildMap)childMapMap.get(r); @@ -598,7 +645,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -608,31 +655,32 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerChildMap(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure> procedure) throws DatabaseException { + public static void runnerChildMap(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure> procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { ChildMap.computeForEach(graph, r, null, procedure); return; } - if(procedure == null) procedure = emptyProcedureChildMap; - ChildMap entry = (ChildMap)cache.getOrCreateChildMap(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r); + InternalProcedure> procedure_ = procedure != null ? procedure : emptyProcedureChildMap; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - ChildMap.computeForEach(graph, r, entry, procedure); + assert(entry.isPending()); + ChildMap.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AssertedStatements getOrCreateAssertedStatements(int r1, int r2) throws DatabaseException { - AssertedStatements existing = null; - synchronized(assertedStatementsMap) { - existing = (AssertedStatements)assertedStatementsMap.get(r1,r2); + TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException { + TypeHierarchy existing = null; + synchronized(typeHierarchyMap) { + existing = (TypeHierarchy)typeHierarchyMap.get(r); if(existing == null) { - existing = new AssertedStatements(r1,r2); + existing = new TypeHierarchy(r); existing.clearResult(querySupport); existing.setPending(); - assertedStatementsMap.put(keyR2(r1,r2), existing); + typeHierarchyMap.put(keyR(r), existing); size++; return existing; } @@ -641,37 +689,42 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(AssertedStatements entry) { - synchronized(assertedStatementsMap) { - assertedStatementsMap.remove(entry.id); + void remove(TypeHierarchy entry) { + synchronized(typeHierarchyMap) { + typeHierarchyMap.remove(entry.id); } } - public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, TripleIntProcedure procedure) throws DatabaseException { + public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureAssertedStatements; - AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(r1,r2); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + TypeHierarchy.computeForEach(graph, r, null, procedure); + return; + } + TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + TypeHierarchy.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AssertedPredicates getOrCreateAssertedPredicates(int r) throws DatabaseException { - AssertedPredicates existing = null; - synchronized(assertedPredicatesMap) { - existing = (AssertedPredicates)assertedPredicatesMap.get(r); + SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException { + SuperTypes existing = null; + synchronized(superTypesMap) { + existing = (SuperTypes)superTypesMap.get(r); if(existing == null) { - existing = new AssertedPredicates(r); + existing = new SuperTypes(r); existing.clearResult(querySupport); existing.setPending(); - assertedPredicatesMap.put(keyR(r), existing); + superTypesMap.put(keyR(r), existing); size++; return existing; } @@ -680,37 +733,42 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(AssertedPredicates entry) { - synchronized(assertedPredicatesMap) { - assertedPredicatesMap.remove(entry.id); + void remove(SuperTypes entry) { + synchronized(superTypesMap) { + superTypesMap.remove(entry.id); } } - public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureAssertedPredicates; - AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + SuperTypes.computeForEach(graph, r, null, procedure); + return; + } + SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + SuperTypes.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectSuperRelations getOrCreateDirectSuperRelations(int r) throws DatabaseException { - DirectSuperRelations existing = null; - synchronized(directSuperRelationsMap) { - existing = (DirectSuperRelations)directSuperRelationsMap.get(r); + SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + SuperRelations existing = null; + synchronized(superRelationsMap) { + existing = (SuperRelations)superRelationsMap.get(r); if(existing == null) { - existing = new DirectSuperRelations(r); + existing = new SuperRelations(r); existing.clearResult(querySupport); existing.setPending(); - directSuperRelationsMap.put(keyR(r), existing); + superRelationsMap.put(keyR(r), existing); size++; return existing; } @@ -719,37 +777,42 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(DirectSuperRelations entry) { - synchronized(directSuperRelationsMap) { - directSuperRelationsMap.remove(entry.id); + void remove(SuperRelations entry) { + synchronized(superRelationsMap) { + superRelationsMap.remove(entry.id); } } - public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, IntProcedure procedure) throws DatabaseException { + public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureDirectSuperRelations; - DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + SuperRelations.computeForEach(graph, r, null, procedure); + return; + } + SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r); + InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + SuperRelations.computeForEach(graph, r, entry, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - SuperTypes getOrCreateSuperTypes(int r) throws DatabaseException { - SuperTypes existing = null; - synchronized(superTypesMap) { - existing = (SuperTypes)superTypesMap.get(r); + AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException { + AssertedPredicates existing = null; + synchronized(assertedPredicatesMap) { + existing = (AssertedPredicates)assertedPredicatesMap.get(r); if(existing == null) { - existing = new SuperTypes(r); + existing = new AssertedPredicates(r); existing.clearResult(querySupport); existing.setPending(); - superTypesMap.put(keyR(r), existing); + assertedPredicatesMap.put(keyR(r), existing); size++; return existing; } @@ -758,37 +821,38 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(SuperTypes entry) { - synchronized(superTypesMap) { - superTypesMap.remove(entry.id); + void remove(AssertedPredicates entry) { + synchronized(assertedPredicatesMap) { + assertedPredicatesMap.remove(entry.id); } } - public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureSuperTypes; - SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - TypeHierarchy getOrCreateTypeHierarchy(int r) throws DatabaseException { - TypeHierarchy existing = null; - synchronized(typeHierarchyMap) { - existing = (TypeHierarchy)typeHierarchyMap.get(r); + AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { + AssertedStatements existing = null; + synchronized(assertedStatementsMap) { + existing = (AssertedStatements)assertedStatementsMap.get(r1,r2); if(existing == null) { - existing = new TypeHierarchy(r); + existing = new AssertedStatements(r1,r2); existing.clearResult(querySupport); existing.setPending(); - typeHierarchyMap.put(keyR(r), existing); + assertedStatementsMap.put(keyR2(r1,r2), existing); size++; return existing; } @@ -797,37 +861,38 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(TypeHierarchy entry) { - synchronized(typeHierarchyMap) { - typeHierarchyMap.remove(entry.id); + void remove(AssertedStatements entry) { + synchronized(assertedStatementsMap) { + assertedStatementsMap.remove(entry.id); } } - public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureTypeHierarchy; - TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2); + TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - SuperRelations getOrCreateSuperRelations(int r) throws DatabaseException { - SuperRelations existing = null; - synchronized(superRelationsMap) { - existing = (SuperRelations)superRelationsMap.get(r); + DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + DirectSuperRelations existing = null; + synchronized(directSuperRelationsMap) { + existing = (DirectSuperRelations)directSuperRelationsMap.get(r); if(existing == null) { - existing = new SuperRelations(r); + existing = new DirectSuperRelations(r); existing.clearResult(querySupport); existing.setPending(); - superRelationsMap.put(keyR(r), existing); + directSuperRelationsMap.put(keyR(r), existing); size++; return existing; } @@ -836,29 +901,30 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } - void remove(SuperRelations entry) { - synchronized(superRelationsMap) { - superRelationsMap.remove(entry.id); + void remove(DirectSuperRelations entry) { + synchronized(directSuperRelationsMap) { + directSuperRelationsMap.remove(entry.id); } } - public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { + public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureSuperRelations; - SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r); + IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - MultiReadEntry getOrCreateMultiReadEntry(MultiRead r) throws DatabaseException { + MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead r) throws DatabaseException { MultiReadEntry existing = null; synchronized(multiReadEntryMap) { existing = (MultiReadEntry)multiReadEntryMap.get(r); @@ -875,7 +941,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -885,19 +951,20 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead r, CacheEntry parent, ListenerBase listener, AsyncMultiProcedure procedure) throws DatabaseException { + public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureMultiReadEntry; - MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r); + AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(AsyncMultiRead r) throws DatabaseException { + AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead r) throws DatabaseException { AsyncMultiReadEntry existing = null; synchronized(asyncMultiReadEntryMap) { existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r); @@ -914,7 +981,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -924,19 +991,20 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r, CacheEntry parent, ListenerBase listener, AsyncMultiProcedure procedure) throws DatabaseException { + public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureAsyncMultiReadEntry; - AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r); + AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ExternalReadEntry getOrCreateExternalReadEntry(ExternalRead r) throws DatabaseException { + ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead r) throws DatabaseException { ExternalReadEntry existing = null; synchronized(externalReadEntryMap) { existing = (ExternalReadEntry)externalReadEntryMap.get(r); @@ -953,7 +1021,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -963,14 +1031,15 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { + public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - if(procedure == null) procedure = emptyProcedureExternalReadEntry; - ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(r); - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure, false); - if(entry.isReady()) entry.performFromCache(graph, procedure); + ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r); + AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry; + ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); + if(entry.isReady()) entry.performFromCache(graph, procedure_); else { - entry.compute(graph, procedure); + assert(entry.isPending()); + entry.compute(graph, procedure_); if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } }