X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryCache.java;h=35ebdbc65ba4ce82e5e7e7dced301310c30f8b17;hp=7b591abb7fd80e062308ce3df88c86e872cfe27e;hb=90a52d58489b6c518b13dde238923125c8f63eec;hpb=9f0fd59be54719b1fe9322d8fd37e4950857308c diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 7b591abb7..35ebdbc65 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -19,28 +19,31 @@ import org.simantics.db.request.Read; public class QueryCache extends QueryCacheBase { + private static final boolean SINGLE = true; + public QueryCache(QuerySupport querySupport, int threads) { super(querySupport, threads); } - Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { + Objects getOrCreateObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { Objects existing = null; synchronized(objectsMap) { existing = (Objects)objectsMap.get(r1,r2); if(existing == null) { existing = new Objects(r1,r2); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); objectsMap.put(keyR2(r1,r2), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -53,38 +56,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { + if (SINGLE) { + Objects e = cache.peekObjects(r1,r2); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } Objects.computeForEach(graph, r1,r2, null, procedure); return; } - Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2); + Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - Objects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Objects.computeForEach(graph, r1,r2, entry, procedure_); } } - Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { + private Objects peekObjects(int r1, int r2) { + synchronized(objectsMap) { + return (Objects) objectsMap.get(r1,r2); + } + } + + Statements getOrCreateStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { Statements existing = null; synchronized(statementsMap) { existing = (Statements)statementsMap.get(r1,r2); if(existing == null) { existing = new Statements(r1,r2); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); statementsMap.put(keyR2(r1,r2), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -97,38 +116,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { + if (SINGLE) { + Statements e = cache.peekStatements(r1,r2); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } Statements.computeForEach(graph, r1,r2, null, procedure); return; } - Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2); + Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - Statements.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Statements.computeForEach(graph, r1,r2, entry, procedure_); } } - DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { + private Statements peekStatements(int r1, int r2) { + synchronized(statementsMap) { + return (Statements) statementsMap.get(r1,r2); + } + } + + DirectObjects getOrCreateDirectObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { DirectObjects existing = null; synchronized(directObjectsMap) { existing = (DirectObjects)directObjectsMap.get(r1,r2); if(existing == null) { existing = new DirectObjects(r1,r2); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); directObjectsMap.put(keyR2(r1,r2), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -141,38 +176,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerDirectObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) { + if (SINGLE) { + DirectObjects e = cache.peekDirectObjects(r1,r2); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } DirectObjects.computeForEach(graph, r1,r2, null, procedure); return; } - DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2); + DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); } } - RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException { + private DirectObjects peekDirectObjects(int r1, int r2) { + synchronized(directObjectsMap) { + return (DirectObjects) directObjectsMap.get(r1,r2); + } + } + + RelationInfoQuery getOrCreateRelationInfoQuery(ReadGraphImpl graph, int r) throws DatabaseException { RelationInfoQuery existing = null; synchronized(relationInfoQueryMap) { existing = (RelationInfoQuery)relationInfoQueryMap.get(r); if(existing == null) { existing = new RelationInfoQuery(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); relationInfoQueryMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -185,38 +236,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + RelationInfoQuery e = cache.peekRelationInfoQuery(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } RelationInfoQuery.computeForEach(graph, r, null, procedure); return; } - RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r); + RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - RelationInfoQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + RelationInfoQuery.computeForEach(graph, r, entry, procedure_); } } - URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException { + private RelationInfoQuery peekRelationInfoQuery(int r) { + synchronized(relationInfoQueryMap) { + return (RelationInfoQuery) relationInfoQueryMap.get(r); + } + } + + URIToResource getOrCreateURIToResource(ReadGraphImpl graph, String id) throws DatabaseException { URIToResource existing = null; synchronized(uRIToResourceMap) { existing = (URIToResource)uRIToResourceMap.get(id); if(existing == null) { existing = new URIToResource(id); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); uRIToResourceMap.put(keyID(id), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -229,38 +296,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerURIToResource(ReadGraphImpl graph, String id, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, id)) { + if (SINGLE) { + URIToResource e = cache.peekURIToResource(id); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } URIToResource.computeForEach(graph, id, null, procedure); return; } - URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id); + URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureURIToResource; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - URIToResource.computeForEach(graph, id, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + URIToResource.computeForEach(graph, id, entry, procedure_); + } + } + + private URIToResource peekURIToResource(String id) { + synchronized(uRIToResourceMap) { + return (URIToResource) uRIToResourceMap.get(id); } } - ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException { + ValueQuery getOrCreateValueQuery(ReadGraphImpl graph, int r) throws DatabaseException { ValueQuery existing = null; synchronized(valueQueryMap) { existing = (ValueQuery)valueQueryMap.get(r); if(existing == null) { existing = new ValueQuery(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); valueQueryMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -273,38 +356,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + ValueQuery e = cache.peekValueQuery(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } ValueQuery.computeForEach(graph, r, null, procedure); return; } - ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r); + ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureValueQuery; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - ValueQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + ValueQuery.computeForEach(graph, r, entry, procedure_); } } - OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException { + private ValueQuery peekValueQuery(int r) { + synchronized(valueQueryMap) { + return (ValueQuery) valueQueryMap.get(r); + } + } + + OrderedSet getOrCreateOrderedSet(ReadGraphImpl graph, int r) throws DatabaseException { OrderedSet existing = null; synchronized(orderedSetMap) { existing = (OrderedSet)orderedSetMap.get(r); if(existing == null) { existing = new OrderedSet(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); orderedSetMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -317,38 +416,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerOrderedSet(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + OrderedSet e = cache.peekOrderedSet(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } OrderedSet.computeForEach(graph, r, null, procedure); return; } - OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r); + OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - OrderedSet.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + OrderedSet.computeForEach(graph, r, entry, procedure_); + } + } + + private OrderedSet peekOrderedSet(int r) { + synchronized(orderedSetMap) { + return (OrderedSet) orderedSetMap.get(r); } } - PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException { + PrincipalTypes getOrCreatePrincipalTypes(ReadGraphImpl graph, int r) throws DatabaseException { PrincipalTypes existing = null; synchronized(principalTypesMap) { existing = (PrincipalTypes)principalTypesMap.get(r); if(existing == null) { existing = new PrincipalTypes(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); principalTypesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -361,38 +476,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerPrincipalTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + PrincipalTypes e = cache.peekPrincipalTypes(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } PrincipalTypes.computeForEach(graph, r, null, procedure); return; } - PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r); + PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - PrincipalTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + PrincipalTypes.computeForEach(graph, r, entry, procedure_); } } - DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException { + private PrincipalTypes peekPrincipalTypes(int r) { + synchronized(principalTypesMap) { + return (PrincipalTypes) principalTypesMap.get(r); + } + } + + DirectPredicates getOrCreateDirectPredicates(ReadGraphImpl graph, int r) throws DatabaseException { DirectPredicates existing = null; synchronized(directPredicatesMap) { existing = (DirectPredicates)directPredicatesMap.get(r); if(existing == null) { existing = new DirectPredicates(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); directPredicatesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -405,38 +536,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + DirectPredicates e = cache.peekDirectPredicates(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } DirectPredicates.computeForEach(graph, r, null, procedure); return; } - DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r); + DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - DirectPredicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + DirectPredicates.computeForEach(graph, r, entry, procedure_); } } - Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException { + private DirectPredicates peekDirectPredicates(int r) { + synchronized(directPredicatesMap) { + return (DirectPredicates) directPredicatesMap.get(r); + } + } + + Predicates getOrCreatePredicates(ReadGraphImpl graph, int r) throws DatabaseException { Predicates existing = null; synchronized(predicatesMap) { existing = (Predicates)predicatesMap.get(r); if(existing == null) { existing = new Predicates(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); predicatesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -449,160 +596,215 @@ public class QueryCache extends QueryCacheBase { public static void runnerPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + Predicates e = cache.peekPredicates(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } Predicates.computeForEach(graph, r, null, procedure); return; } - Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r); + Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedurePredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - Predicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Predicates.computeForEach(graph, r, entry, procedure_); } } - ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read r, boolean isSync) throws DatabaseException { + private Predicates peekPredicates(int r) { + synchronized(predicatesMap) { + return (Predicates) predicatesMap.get(r); + } + } + + private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read r, boolean needsToBlock) throws DatabaseException { ReadEntry existing = null; synchronized(readEntryMap) { existing = (ReadEntry)readEntryMap.get(r); if(existing == null) { existing = new ReadEntry(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); readEntryMap.put(id(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } if(existing.isPending()) { - if(isSync) waitPending(processor, existing); - else return null; + if(needsToBlock) + waitPending(graph, existing); + else { + return null; + } } return existing; } void remove(ReadEntry entry) { synchronized(readEntryMap) { - readEntryMap.remove(entry.request); + readEntryMap.remove(entry.id); } } - public static void runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { + public static Object runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { - ReadEntry.computeForEach(graph, r, null, procedure); - return; + if (SINGLE) { + ReadEntry e = cache.peekReadEntry(r); + if (e != null && e.isReady()) { + return e.performFromCache(graph, procedure); + } + } + return ReadEntry.computeForEach(graph, r, null, procedure, needsToBlock); } - ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync); + ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(false) { - @Override - public void run(int thread) { - try { - assert(!isSync); - runnerReadEntry(graph, r, parent, listener, procedure, isSync); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - } - }); - return; + graph.asyncBarrier.inc(); + graph.processor.scheduleNow(new SessionTask() { + @Override + public void run0(int thread) { + try { + runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock); + graph.asyncBarrier.dec(); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + }); + return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = entry.performFromCache(graph, procedure_); + graph.processor.listening.registerFirstKnown(listener, result); + return result; + } else { assert(entry.isPending()); - ReadEntry.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock); + graph.processor.listening.registerFirstKnown(listener, result); + return result; } } - AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead r, boolean isSync) throws DatabaseException { + private ReadEntry peekReadEntry(Read r) { + synchronized(readEntryMap) { + return (ReadEntry) readEntryMap.get(r); + } + } + + AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException { AsyncReadEntry existing = null; synchronized(asyncReadEntryMap) { existing = (AsyncReadEntry)asyncReadEntryMap.get(r); if(existing == null) { existing = new AsyncReadEntry(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); asyncReadEntryMap.put(id(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } if(existing.isPending()) { - if(isSync) waitPending(processor, existing); - else return null; + if(needsToBlock) + waitPending(graph, existing); + else { + existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) { + @Override + public void run0(int thread) { + try { + runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + }); + return null; + } } return existing; } void remove(AsyncReadEntry entry) { synchronized(asyncReadEntryMap) { - asyncReadEntryMap.remove(entry.request); + asyncReadEntryMap.remove(entry.id); } } - public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { + public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { - AsyncReadEntry.computeForEach(graph, r, null, procedure); - return; + if (SINGLE) { + AsyncReadEntry e = cache.peekAsyncReadEntry(r); + if (e != null && e.isReady()) { + return e.performFromCache(graph, procedure); + } + } + return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock); } - AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync); + AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(false) { - @Override - public void run(int thread) { - try { - assert(!isSync); - runnerAsyncReadEntry(graph, r, parent, listener, procedure, isSync); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - } - }); - return; + // Entry was pending and this request has been queued + return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = entry.performFromCache(graph, procedure_); + graph.processor.listening.registerFirstKnown(listener, result); + return result; + } else { assert(entry.isPending()); - AsyncReadEntry.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock); + graph.processor.listening.registerFirstKnown(listener, result); + return result; + } + } + + private AsyncReadEntry peekAsyncReadEntry(AsyncRead r) { + synchronized(asyncReadEntryMap) { + return (AsyncReadEntry) asyncReadEntryMap.get(r); } } - Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException { + Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException { Types existing = null; synchronized(typesMap) { existing = (Types)typesMap.get(r); if(existing == null) { existing = new Types(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); typesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -615,38 +817,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + Types e = cache.peekTypes(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } Types.computeForEach(graph, r, null, procedure); return; } - Types entry = (Types)cache.getOrCreateTypes(graph.processor, r); + Types entry = (Types)cache.getOrCreateTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - Types.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Types.computeForEach(graph, r, entry, procedure_); } } - ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException { + private Types peekTypes(int r) { + synchronized(typesMap) { + return (Types) typesMap.get(r); + } + } + + ChildMap getOrCreateChildMap(ReadGraphImpl graph, int r) throws DatabaseException { ChildMap existing = null; synchronized(childMapMap) { existing = (ChildMap)childMapMap.get(r); if(existing == null) { existing = new ChildMap(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); childMapMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -659,38 +877,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerChildMap(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure> procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + ChildMap e = cache.peekChildMap(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } ChildMap.computeForEach(graph, r, null, procedure); return; } - ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r); + ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r); InternalProcedure> procedure_ = procedure != null ? procedure : emptyProcedureChildMap; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - ChildMap.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + ChildMap.computeForEach(graph, r, entry, procedure_); } } - TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException { + private ChildMap peekChildMap(int r) { + synchronized(childMapMap) { + return (ChildMap) childMapMap.get(r); + } + } + + TypeHierarchy getOrCreateTypeHierarchy(ReadGraphImpl graph, int r) throws DatabaseException { TypeHierarchy existing = null; synchronized(typeHierarchyMap) { existing = (TypeHierarchy)typeHierarchyMap.get(r); if(existing == null) { existing = new TypeHierarchy(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); typeHierarchyMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -703,38 +937,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + TypeHierarchy e = cache.peekTypeHierarchy(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } TypeHierarchy.computeForEach(graph, r, null, procedure); return; } - TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r); + TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - TypeHierarchy.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + TypeHierarchy.computeForEach(graph, r, entry, procedure_); } } - SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException { + private TypeHierarchy peekTypeHierarchy(int r) { + synchronized(typeHierarchyMap) { + return (TypeHierarchy) typeHierarchyMap.get(r); + } + } + + SuperTypes getOrCreateSuperTypes(ReadGraphImpl graph, int r) throws DatabaseException { SuperTypes existing = null; synchronized(superTypesMap) { existing = (SuperTypes)superTypesMap.get(r); if(existing == null) { existing = new SuperTypes(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); superTypesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -747,38 +997,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + SuperTypes e = cache.peekSuperTypes(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } SuperTypes.computeForEach(graph, r, null, procedure); return; } - SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r); + SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - SuperTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + SuperTypes.computeForEach(graph, r, entry, procedure_); + } + } + + private SuperTypes peekSuperTypes(int r) { + synchronized(superTypesMap) { + return (SuperTypes) superTypesMap.get(r); } } - SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + SuperRelations getOrCreateSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { SuperRelations existing = null; synchronized(superRelationsMap) { existing = (SuperRelations)superRelationsMap.get(r); if(existing == null) { existing = new SuperRelations(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); superRelationsMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -791,38 +1057,54 @@ public class QueryCache extends QueryCacheBase { public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { + if (SINGLE) { + SuperRelations e = cache.peekSuperRelations(r); + if (e != null && e.isReady()) { + e.performFromCache(graph, procedure); + return; + } + } SuperRelations.computeForEach(graph, r, null, procedure); return; } - SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r); + SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - SuperRelations.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + SuperRelations.computeForEach(graph, r, entry, procedure_); } } - AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException { + private SuperRelations peekSuperRelations(int r) { + synchronized(superRelationsMap) { + return (SuperRelations) superRelationsMap.get(r); + } + } + + AssertedPredicates getOrCreateAssertedPredicates(ReadGraphImpl graph, int r) throws DatabaseException { AssertedPredicates existing = null; synchronized(assertedPredicatesMap) { existing = (AssertedPredicates)assertedPredicatesMap.get(r); if(existing == null) { existing = new AssertedPredicates(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); assertedPredicatesMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -834,35 +1116,44 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r); + AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); } } - AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { + private AssertedPredicates peekAssertedPredicates(int r) { + synchronized(assertedPredicatesMap) { + return (AssertedPredicates) assertedPredicatesMap.get(r); + } + } + + AssertedStatements getOrCreateAssertedStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { AssertedStatements existing = null; synchronized(assertedStatementsMap) { existing = (AssertedStatements)assertedStatementsMap.get(r1,r2); if(existing == null) { existing = new AssertedStatements(r1,r2); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); assertedStatementsMap.put(keyR2(r1,r2), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -874,35 +1165,44 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2); + AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); + } + } + + private AssertedStatements peekAssertedStatements(int r1, int r2) { + synchronized(assertedStatementsMap) { + return (AssertedStatements) assertedStatementsMap.get(r1,r2); } } - DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + DirectSuperRelations getOrCreateDirectSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { DirectSuperRelations existing = null; synchronized(directSuperRelationsMap) { existing = (DirectSuperRelations)directSuperRelationsMap.get(r); if(existing == null) { existing = new DirectSuperRelations(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); directSuperRelationsMap.put(keyR(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } @@ -914,134 +1214,169 @@ public class QueryCache extends QueryCacheBase { public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r); + DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); } } - MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead r) throws DatabaseException { + private DirectSuperRelations peekDirectSuperRelations(int r) { + synchronized(directSuperRelationsMap) { + return (DirectSuperRelations) directSuperRelationsMap.get(r); + } + } + + MultiReadEntry getOrCreateMultiReadEntry(ReadGraphImpl graph, MultiRead r) throws DatabaseException { MultiReadEntry existing = null; synchronized(multiReadEntryMap) { existing = (MultiReadEntry)multiReadEntryMap.get(r); if(existing == null) { existing = new MultiReadEntry(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); multiReadEntryMap.put(id(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } void remove(MultiReadEntry entry) { synchronized(multiReadEntryMap) { - multiReadEntryMap.remove(entry.request); + multiReadEntryMap.remove(entry.id); } } public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead r, CacheEntry parent, ListenerBase listener, final SyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r); + MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r); SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); } } - AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead r) throws DatabaseException { + private MultiReadEntry peekMultiReadEntry(MultiRead r) { + synchronized(multiReadEntryMap) { + return (MultiReadEntry) multiReadEntryMap.get(r); + } + } + + AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r) throws DatabaseException { AsyncMultiReadEntry existing = null; synchronized(asyncMultiReadEntryMap) { existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r); if(existing == null) { existing = new AsyncMultiReadEntry(r); - existing.clearResult(querySupport); - existing.setPending(); + existing.setPending(querySupport); asyncMultiReadEntryMap.put(id(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } void remove(AsyncMultiReadEntry entry) { synchronized(asyncMultiReadEntryMap) { - asyncMultiReadEntryMap.remove(entry.request); + asyncMultiReadEntryMap.remove(entry.id); } } public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r); + AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r); AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); + } + } + + private AsyncMultiReadEntry peekAsyncMultiReadEntry(AsyncMultiRead r) { + synchronized(asyncMultiReadEntryMap) { + return (AsyncMultiReadEntry) asyncMultiReadEntryMap.get(r); } } - ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead r) throws DatabaseException { + ExternalReadEntry getOrCreateExternalReadEntry(ReadGraphImpl graph, ExternalRead r) throws DatabaseException { ExternalReadEntry existing = null; synchronized(externalReadEntryMap) { existing = (ExternalReadEntry)externalReadEntryMap.get(r); if(existing == null) { - existing = new ExternalReadEntry(r); - existing.clearResult(querySupport); - existing.setPending(); + existing = new ExternalReadEntry(r, graph); + existing.setPending(querySupport); externalReadEntryMap.put(id(r), existing); size++; return existing; } if(existing.requiresComputation()) { - existing.setPending(); + existing.setPending(querySupport); return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) { + waitPending(graph, existing); + } return existing; } void remove(ExternalReadEntry entry) { synchronized(externalReadEntryMap) { - externalReadEntryMap.remove(entry.request); + externalReadEntryMap.remove(entry.id); } } public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r); + ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r); AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.compute(graph, procedure_); + } + } + + private ExternalReadEntry peekExternalReadEntry(ExternalRead r) { + synchronized(externalReadEntryMap) { + return (ExternalReadEntry) externalReadEntryMap.get(r); } }