}
Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Objects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Statements.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
URIToResource.computeForEach(graph, id, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ValueQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
OrderedSet.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
PrincipalTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectPredicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Predicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
}
- ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
+ private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
ReadEntry existing = null;
synchronized(readEntryMap) {
existing = (ReadEntry)readEntryMap.get(r);
}
ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
+ graph.asyncBarrier.inc();
+ graph.processor.scheduleNow(new SessionTask() {
@Override
public void run0(int thread) {
try {
runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ graph.asyncBarrier.dec();
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
}
}
- AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+ AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
AsyncReadEntry existing = null;
synchronized(asyncReadEntryMap) {
existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
}
}
if(existing.isPending()) {
- if(needsToBlock)
- waitPending(graph, existing);
- else {
- return null;
- }
+ if(needsToBlock)
+ waitPending(graph, existing);
+ else {
+ existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+ @Override
+ public void run0(int thread) {
+ try {
+ runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return null;
+ }
}
return existing;
}
}
return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
}
- AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+ AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
- @Override
- public void run0(int thread) {
- try {
- runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
- });
- return null;
+ // Entry was pending and this request has been queued
+ return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
}
Types entry = (Types)cache.getOrCreateTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Types.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ChildMap.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
TypeHierarchy.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperRelations.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}