public class QueryCache extends QueryCacheBase {
+ private static final boolean SINGLE = true;
+
public QueryCache(QuerySupport querySupport, int threads) {
super(querySupport, threads);
}
existing = (Objects)objectsMap.get(r1,r2);
if(existing == null) {
existing = new Objects(r1,r2);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
objectsMap.put(keyR2(r1,r2), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) {
+ if (SINGLE) {
+ Objects e = cache.peekObjects(r1,r2);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
Objects.computeForEach(graph, r1,r2, null, procedure);
return;
}
Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Objects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private Objects peekObjects(int r1, int r2) {
+ synchronized(objectsMap) {
+ return (Objects) objectsMap.get(r1,r2);
}
}
existing = (Statements)statementsMap.get(r1,r2);
if(existing == null) {
existing = new Statements(r1,r2);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
statementsMap.put(keyR2(r1,r2), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) {
+ if (SINGLE) {
+ Statements e = cache.peekStatements(r1,r2);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
Statements.computeForEach(graph, r1,r2, null, procedure);
return;
}
Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Statements.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private Statements peekStatements(int r1, int r2) {
+ synchronized(statementsMap) {
+ return (Statements) statementsMap.get(r1,r2);
}
}
existing = (DirectObjects)directObjectsMap.get(r1,r2);
if(existing == null) {
existing = new DirectObjects(r1,r2);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
directObjectsMap.put(keyR2(r1,r2), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerDirectObjects(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r1,r2)) {
+ if (SINGLE) {
+ DirectObjects e = cache.peekDirectObjects(r1,r2);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
DirectObjects.computeForEach(graph, r1,r2, null, procedure);
return;
}
DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private DirectObjects peekDirectObjects(int r1, int r2) {
+ synchronized(directObjectsMap) {
+ return (DirectObjects) directObjectsMap.get(r1,r2);
}
}
existing = (RelationInfoQuery)relationInfoQueryMap.get(r);
if(existing == null) {
existing = new RelationInfoQuery(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
relationInfoQueryMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<RelationInfo> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ RelationInfoQuery e = cache.peekRelationInfoQuery(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
RelationInfoQuery.computeForEach(graph, r, null, procedure);
return;
}
RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private RelationInfoQuery peekRelationInfoQuery(int r) {
+ synchronized(relationInfoQueryMap) {
+ return (RelationInfoQuery) relationInfoQueryMap.get(r);
}
}
existing = (URIToResource)uRIToResourceMap.get(id);
if(existing == null) {
existing = new URIToResource(id);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
uRIToResourceMap.put(keyID(id), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerURIToResource(ReadGraphImpl graph, String id, CacheEntry parent, ListenerBase listener, final InternalProcedure<Integer> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, id)) {
+ if (SINGLE) {
+ URIToResource e = cache.peekURIToResource(id);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
URIToResource.computeForEach(graph, id, null, procedure);
return;
}
URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
URIToResource.computeForEach(graph, id, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private URIToResource peekURIToResource(String id) {
+ synchronized(uRIToResourceMap) {
+ return (URIToResource) uRIToResourceMap.get(id);
}
}
existing = (ValueQuery)valueQueryMap.get(r);
if(existing == null) {
existing = new ValueQuery(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
valueQueryMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<byte[]> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ ValueQuery e = cache.peekValueQuery(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
ValueQuery.computeForEach(graph, r, null, procedure);
return;
}
ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ValueQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private ValueQuery peekValueQuery(int r) {
+ synchronized(valueQueryMap) {
+ return (ValueQuery) valueQueryMap.get(r);
}
}
existing = (OrderedSet)orderedSetMap.get(r);
if(existing == null) {
existing = new OrderedSet(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
orderedSetMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerOrderedSet(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ OrderedSet e = cache.peekOrderedSet(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
OrderedSet.computeForEach(graph, r, null, procedure);
return;
}
OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
OrderedSet.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private OrderedSet peekOrderedSet(int r) {
+ synchronized(orderedSetMap) {
+ return (OrderedSet) orderedSetMap.get(r);
}
}
existing = (PrincipalTypes)principalTypesMap.get(r);
if(existing == null) {
existing = new PrincipalTypes(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
principalTypesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerPrincipalTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ PrincipalTypes e = cache.peekPrincipalTypes(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
PrincipalTypes.computeForEach(graph, r, null, procedure);
return;
}
PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
PrincipalTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private PrincipalTypes peekPrincipalTypes(int r) {
+ synchronized(principalTypesMap) {
+ return (PrincipalTypes) principalTypesMap.get(r);
}
}
existing = (DirectPredicates)directPredicatesMap.get(r);
if(existing == null) {
existing = new DirectPredicates(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
directPredicatesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ DirectPredicates e = cache.peekDirectPredicates(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
DirectPredicates.computeForEach(graph, r, null, procedure);
return;
}
DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectPredicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private DirectPredicates peekDirectPredicates(int r) {
+ synchronized(directPredicatesMap) {
+ return (DirectPredicates) directPredicatesMap.get(r);
}
}
existing = (Predicates)predicatesMap.get(r);
if(existing == null) {
existing = new Predicates(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
predicatesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ Predicates e = cache.peekPredicates(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
Predicates.computeForEach(graph, r, null, procedure);
return;
}
Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Predicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
- ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
+ private Predicates peekPredicates(int r) {
+ synchronized(predicatesMap) {
+ return (Predicates) predicatesMap.get(r);
+ }
+ }
+
+ private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
ReadEntry existing = null;
synchronized(readEntryMap) {
existing = (ReadEntry)readEntryMap.get(r);
if(existing == null) {
existing = new ReadEntry(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
readEntryMap.put(id(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
if(existing.isPending()) {
- if(needsToBlock) waitPending(graph, existing);
- else return null;
+ if(needsToBlock)
+ waitPending(graph, existing);
+ else {
+ return null;
+ }
}
return existing;
}
void remove(ReadEntry entry) {
synchronized(readEntryMap) {
- readEntryMap.remove(entry.request);
+ readEntryMap.remove(entry.id);
}
}
- public static Object runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
+ public static Object runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
- return ReadEntry.computeForEach(graph, r, null, procedure);
+ if (SINGLE) {
+ ReadEntry e = cache.peekReadEntry(r);
+ if (e != null && e.isReady()) {
+ return e.performFromCache(graph, procedure);
+ }
+ }
+ return ReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
}
ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
- @Override
- public void run(int thread) {
- try {
- runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
- });
- return null;
+ graph.asyncBarrier.inc();
+ graph.processor.scheduleNow(new SessionTask() {
+ @Override
+ public void run0(int thread) {
+ try {
+ runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ graph.asyncBarrier.dec();
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
- Object result = ReadEntry.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
- AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+ private ReadEntry peekReadEntry(Read<?> r) {
+ synchronized(readEntryMap) {
+ return (ReadEntry) readEntryMap.get(r);
+ }
+ }
+
+ AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
AsyncReadEntry existing = null;
synchronized(asyncReadEntryMap) {
existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
if(existing == null) {
existing = new AsyncReadEntry(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
asyncReadEntryMap.put(id(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
if(existing.isPending()) {
- if(needsToBlock) waitPending(graph, existing);
- else return null;
+ if(needsToBlock)
+ waitPending(graph, existing);
+ else {
+ existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+ @Override
+ public void run0(int thread) {
+ try {
+ runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return null;
+ }
}
return existing;
}
void remove(AsyncReadEntry entry) {
synchronized(asyncReadEntryMap) {
- asyncReadEntryMap.remove(entry.request);
+ asyncReadEntryMap.remove(entry.id);
}
}
- public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
+ public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ AsyncReadEntry e = cache.peekAsyncReadEntry(r);
+ if (e != null && e.isReady()) {
+ return e.performFromCache(graph, procedure);
+ }
+ }
return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
}
- AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+ AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
- @Override
- public void run(int thread) {
- try {
- runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
- });
+ // Entry was pending and this request has been queued
return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
+ private AsyncReadEntry peekAsyncReadEntry(AsyncRead<?> r) {
+ synchronized(asyncReadEntryMap) {
+ return (AsyncReadEntry) asyncReadEntryMap.get(r);
+ }
+ }
+
Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException {
Types existing = null;
synchronized(typesMap) {
existing = (Types)typesMap.get(r);
if(existing == null) {
existing = new Types(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
typesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ Types e = cache.peekTypes(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
Types.computeForEach(graph, r, null, procedure);
return;
}
Types entry = (Types)cache.getOrCreateTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Types.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private Types peekTypes(int r) {
+ synchronized(typesMap) {
+ return (Types) typesMap.get(r);
}
}
existing = (ChildMap)childMapMap.get(r);
if(existing == null) {
existing = new ChildMap(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
childMapMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerChildMap(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<ObjectResourceIdMap<String>> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ ChildMap e = cache.peekChildMap(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
ChildMap.computeForEach(graph, r, null, procedure);
return;
}
ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ChildMap.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private ChildMap peekChildMap(int r) {
+ synchronized(childMapMap) {
+ return (ChildMap) childMapMap.get(r);
}
}
existing = (TypeHierarchy)typeHierarchyMap.get(r);
if(existing == null) {
existing = new TypeHierarchy(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
typeHierarchyMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ TypeHierarchy e = cache.peekTypeHierarchy(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
TypeHierarchy.computeForEach(graph, r, null, procedure);
return;
}
TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
TypeHierarchy.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private TypeHierarchy peekTypeHierarchy(int r) {
+ synchronized(typeHierarchyMap) {
+ return (TypeHierarchy) typeHierarchyMap.get(r);
}
}
existing = (SuperTypes)superTypesMap.get(r);
if(existing == null) {
existing = new SuperTypes(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
superTypesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ SuperTypes e = cache.peekSuperTypes(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
SuperTypes.computeForEach(graph, r, null, procedure);
return;
}
SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private SuperTypes peekSuperTypes(int r) {
+ synchronized(superTypesMap) {
+ return (SuperTypes) superTypesMap.get(r);
}
}
existing = (SuperRelations)superRelationsMap.get(r);
if(existing == null) {
existing = new SuperRelations(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
superRelationsMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
QueryCache cache = graph.processor.cache;
if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
+ if (SINGLE) {
+ SuperRelations e = cache.peekSuperRelations(r);
+ if (e != null && e.isReady()) {
+ e.performFromCache(graph, procedure);
+ return;
+ }
+ }
SuperRelations.computeForEach(graph, r, null, procedure);
return;
}
SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperRelations.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private SuperRelations peekSuperRelations(int r) {
+ synchronized(superRelationsMap) {
+ return (SuperRelations) superRelationsMap.get(r);
}
}
existing = (AssertedPredicates)assertedPredicatesMap.get(r);
if(existing == null) {
existing = new AssertedPredicates(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
assertedPredicatesMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
QueryCache cache = graph.processor.cache;
AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private AssertedPredicates peekAssertedPredicates(int r) {
+ synchronized(assertedPredicatesMap) {
+ return (AssertedPredicates) assertedPredicatesMap.get(r);
}
}
existing = (AssertedStatements)assertedStatementsMap.get(r1,r2);
if(existing == null) {
existing = new AssertedStatements(r1,r2);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
assertedStatementsMap.put(keyR2(r1,r2), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
QueryCache cache = graph.processor.cache;
AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private AssertedStatements peekAssertedStatements(int r1, int r2) {
+ synchronized(assertedStatementsMap) {
+ return (AssertedStatements) assertedStatementsMap.get(r1,r2);
}
}
existing = (DirectSuperRelations)directSuperRelationsMap.get(r);
if(existing == null) {
existing = new DirectSuperRelations(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
directSuperRelationsMap.put(keyR(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
QueryCache cache = graph.processor.cache;
DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private DirectSuperRelations peekDirectSuperRelations(int r) {
+ synchronized(directSuperRelationsMap) {
+ return (DirectSuperRelations) directSuperRelationsMap.get(r);
}
}
existing = (MultiReadEntry)multiReadEntryMap.get(r);
if(existing == null) {
existing = new MultiReadEntry(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
multiReadEntryMap.put(id(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
void remove(MultiReadEntry entry) {
synchronized(multiReadEntryMap) {
- multiReadEntryMap.remove(entry.request);
+ multiReadEntryMap.remove(entry.id);
}
}
QueryCache cache = graph.processor.cache;
MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private MultiReadEntry peekMultiReadEntry(MultiRead<?> r) {
+ synchronized(multiReadEntryMap) {
+ return (MultiReadEntry) multiReadEntryMap.get(r);
}
}
existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r);
if(existing == null) {
existing = new AsyncMultiReadEntry(r);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
asyncMultiReadEntryMap.put(id(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
void remove(AsyncMultiReadEntry entry) {
synchronized(asyncMultiReadEntryMap) {
- asyncMultiReadEntryMap.remove(entry.request);
+ asyncMultiReadEntryMap.remove(entry.id);
}
}
QueryCache cache = graph.processor.cache;
AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private AsyncMultiReadEntry peekAsyncMultiReadEntry(AsyncMultiRead<?> r) {
+ synchronized(asyncMultiReadEntryMap) {
+ return (AsyncMultiReadEntry) asyncMultiReadEntryMap.get(r);
}
}
existing = (ExternalReadEntry)externalReadEntryMap.get(r);
if(existing == null) {
existing = new ExternalReadEntry(r, graph);
- existing.clearResult(querySupport);
- existing.setPending();
+ existing.setPending(querySupport);
externalReadEntryMap.put(id(r), existing);
size++;
return existing;
}
if(existing.requiresComputation()) {
- existing.setPending();
+ existing.setPending(querySupport);
return existing;
}
}
- if(existing.isPending()) waitPending(graph, existing);
+ if(existing.isPending()) {
+ waitPending(graph, existing);
+ }
return existing;
}
void remove(ExternalReadEntry entry) {
synchronized(externalReadEntryMap) {
- externalReadEntryMap.remove(entry.request);
+ externalReadEntryMap.remove(entry.id);
}
}
QueryCache cache = graph.processor.cache;
ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ }
+ }
+
+ private ExternalReadEntry peekExternalReadEntry(ExternalRead<?> r) {
+ synchronized(externalReadEntryMap) {
+ return (ExternalReadEntry) externalReadEntryMap.get(r);
}
}