line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature.keyName + ");");
}
line(content, " " + signature.procedure + " procedure_ = procedure != null ? procedure : emptyProcedure" + clazz + ";");
- line(content, " ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);");
- line(content, " if(entry.isReady()) " + (genReturn ? "return " : "") + "entry.performFromCache(graph, procedure_);");
+ line(content, " if(entry.isReady()) {");
+ line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+ if(genReturn) {
+ line(content, " Object result = entry.performFromCache(graph, procedure_);");
+ line(content, " graph.processor.listening.registerFirstKnown(listener, result);");
+ line(content, " return result;");
+ } else {
+ line(content, " entry.performFromCache(graph, procedure_);");
+ }
+ line(content, " }");
line(content, " else {");
line(content, " assert(entry.isPending());");
- if(shortcut) line(content, " " + (genReturn ? "Object result = " : "") + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
- else line(content, " entry.compute(graph, procedure_);");
- line(content, " if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());");
- if(genReturn) line(content, " return result;");
+ if(genReturn) {
+ line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+ if(shortcut) line(content, " Object result = " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
+ else line(content, " entry.compute(graph, procedure_);");
+ line(content, " graph.processor.listening.registerFirstKnown(listener, result);");
+ line(content, " return result;");
+ } else {
+ line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+ if(shortcut) line(content, " " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
+ else line(content, " entry.compute(graph, procedure_);");
+ }
line(content, " }");
line(content, "}");
line(content, "");
content.append("public class QueryCache extends QueryCacheBase {\n");
content.append("\n");
- line(content, "private static final boolean SINGLE = false;");
+ line(content, "private static final boolean SINGLE = true;");
content.append("\n");
line(content,"public QueryCache(QuerySupport querySupport, int threads) {");
}
Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Objects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Statements.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
URIToResource.computeForEach(graph, id, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ValueQuery.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
OrderedSet.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
PrincipalTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
DirectPredicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Predicates.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ Object result = entry.performFromCache(graph, procedure_);
+ graph.processor.listening.registerFirstKnown(listener, result);
+ return result;
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+ graph.processor.listening.registerFirstKnown(listener, result);
return result;
}
}
}
Types entry = (Types)cache.getOrCreateTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
Types.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
ChildMap.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
TypeHierarchy.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperTypes.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
}
SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
SuperRelations.computeForEach(graph, r, entry, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
QueryCache cache = graph.processor.cache;
ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
- ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
- if(entry.isReady()) entry.performFromCache(graph, procedure_);
+ if(entry.isReady()) {
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+ entry.performFromCache(graph, procedure_);
+ }
else {
assert(entry.isPending());
+ graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
entry.compute(graph, procedure_);
- if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
}
}
protected final StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadEntryMap;
protected final StableHashMap<ExternalRead, ExternalReadEntry> externalReadEntryMap;
- final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;
-
public final QuerySupport querySupport;
public QueryCacheBase(QuerySupport querySupport, int threads) {
asyncMultiReadEntryMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>();
multiReadEntryMap = new StableHashMap<MultiRead, MultiReadEntry>();
externalReadEntryMap = new StableHashMap<ExternalRead, ExternalReadEntry>();
- listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
}
-// public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncRead<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-//
-// AsyncReadEntry<T> entry = (AsyncReadEntry<T>)entry_;
-// AsyncProcedure<T> procedure = (AsyncProcedure<T>)procedure_;
-//
-// ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
-//
-// try {
-//
-// query.perform(queryGraph, new AsyncProcedure<T>() {
-//
-// @Override
-// public void execute(AsyncReadGraph returnGraph, T result) {
-// ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
-// entry.addOrSet(parentGraph, result);
-// try {
-// procedure.execute(parentGraph, result);
-// } catch (Throwable t) {
-// t.printStackTrace();
-// }
-//// parentBarrier.dec(query);
-// }
-//
-// @Override
-// public void exception(AsyncReadGraph returnGraph, Throwable t) {
-// ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
-//// AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
-// entry.except(parentGraph, t);
-// try {
-// procedure.exception(parentGraph, t);
-// } catch (Throwable t2) {
-// t2.printStackTrace();
-// }
-//// parentBarrier.dec(query);
-// }
-//
-// @Override
-// public String toString() {
-// return procedure.toString();
-// }
-//
-// });
-//
-// } catch (Throwable t) {
-//
-// entry.except(t);
-// try {
-// procedure.exception(parentGraph, t);
-// } catch (Throwable t2) {
-// t2.printStackTrace();
-// }
-//// parentBarrier.dec(query);
-//
-// }
-//
-// return null;
-//
-// }
-
-// public <T> Object performQuery(ReadGraphImpl parentGraph, final Read<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-//
-// ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
-//
-// ReadEntry entry = (ReadEntry)entry_;
-//
-// try {
-//
-// T result = (T)query.perform(queryGraph);
-// entry.addOrSet(queryGraph, result);
-//
-// return (T)entry.get(parentGraph, procedure_);
-//
-// } catch (Throwable t) {
-//
-// entry.except(t);
-// return (T)entry.get(parentGraph, procedure_);
-//
-// }
-//
-// }
-
-
public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
}
- public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph)) {
- synchronized(child) {
- child.addParent(parent);
- }
- }
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.err.println(child + " -> " + parent);
- }
- }
- }
-
- if (listener != null) {
- return registerListener(child, listener, procedure);
- } else {
- return null;
- }
-
- }
-
- public synchronized ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
- protected void primeListenerEntry(final ListenerEntry entry, final Object result) {
- entry.setLastKnown(result);
- }
-
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = listeners.get(entry);
- if (list == null) {
- list = new ArrayList<ListenerEntry>(1);
- listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed()) return null;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- return result;
-
- }
-
-
public Collection<CacheEntry> getRootList() {
ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
}
}
- if(queryProcessor.hasListenerAfterDisposing(entry)) {
+ if(queryProcessor.listening.hasListenerAfterDisposing(entry)) {
if((status & CacheEntry.HAS_BEEN_BOUND) == 0) {
queryProcessor.boundQueries++;
entry.setGCStatusFlag(CacheEntry.HAS_BEEN_BOUND, true);
}
private boolean tryCollect(CacheEntry entry) {
- if (!queryProcessor.hasListenerAfterDisposing(entry))
+ if (!queryProcessor.listening.hasListenerAfterDisposing(entry))
if(entry.shouldBeCollected()) {
queryProcessor.removeQuery(entry);
support.remove();
return false;
}
- if (this.queryProcessor.hasListenerAfterDisposing(entry)) {
+ if (this.queryProcessor.listening.hasListenerAfterDisposing(entry)) {
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYCOLLECTOR, Bindings.BOOLEAN)) {
System.err.println("GC: listened entry " + entry + " was not collected. Entry=" + entry);
--- /dev/null
+package org.simantics.db.impl.query;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.debug.ListenerReport;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.utils.Development;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.set.hash.THashSet;
+
+public class QueryListening {
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+
+ final private QueryProcessor processor;
+ private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+ private boolean firingListeners = false;
+ final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+ private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
+
+ QueryListening(QueryProcessor processor) {
+ this.processor = processor;
+ new DependencyManagementThread(processor, tasks).start();
+ }
+
+ public void sync() {
+ Semaphore s = new Semaphore(0);
+ tasks.add(() -> {
+ s.release();
+ });
+ try {
+ s.acquire();
+ } catch (Throwable t) {
+ LOGGER.error("Error while waiting for query dependency management", t);
+ }
+ }
+
+ static class DependencyManagementThread extends Thread {
+
+ final private QueryProcessor processor;
+ final BlockingQueue<Runnable> tasks;
+
+ DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
+ setName("Query Dependency Manager");
+ this.processor = processor;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public void run() {
+ while(processor.isAlive()) {
+ try {
+ Runnable r = tasks.take();
+ r.run();
+ } catch (Throwable t) {
+ // Spurious problems?
+ LOGGER.error("Error while waiting for query dependency management tasks", t);
+ }
+ }
+ }
+
+ }
+
+ public boolean hasScheduledUpdates() {
+ return !scheduledListeners.isEmpty();
+ }
+
+ void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+
+ tasks.offer(() -> {
+
+ if (parent != null && !inferred) {
+ try {
+ if(!child.isImmutable(graph))
+ child.addParent(parent);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
+ }
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ if (listener != null)
+ registerListener(child, listener, procedure);
+
+ });
+
+ }
+
+ void registerFirstKnown(ListenerBase base, Object result) {
+
+ tasks.offer(() -> {
+
+ ListenerEntry entry = addedEntries.get(base);
+ if(entry != null) entry.setLastKnown(result);
+
+ });
+
+ }
+
+ public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
+
+ assert (entry != null);
+
+ if (base.isDisposed())
+ return null;
+
+ return addListener(entry, base, procedure);
+
+ }
+
+ /*
+ * Registers a listener and returns an entry iff the entry was added
+ */
+ private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed()) return null;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ addedEntries.put(base, result);
+
+ return result;
+
+ }
+
+ void scheduleListener(ListenerEntry entry) {
+ assert (entry != null);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Scheduled " + entry.procedure);
+ }
+ }
+ scheduledListeners.add(entry);
+ }
+
+ private void removeListener(ListenerEntry entry) {
+ assert (entry != null);
+ ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+ if(list == null) return;
+ boolean success = list.remove(entry);
+ assert (success);
+ if (list.isEmpty())
+ listeners.remove(entry.entry);
+ }
+
+ boolean hasListener(CacheEntry entry) {
+ if(listeners.get(entry) != null) return true;
+ return false;
+ }
+
+ boolean hasListenerAfterDisposing(CacheEntry entry) {
+ if(listeners.get(entry) != null) {
+ ArrayList<ListenerEntry> entries = listeners.get(entry);
+ ArrayList<ListenerEntry> list = null;
+ for (ListenerEntry e : entries) {
+ if (e.base.isDisposed()) {
+ if(list == null) list = new ArrayList<ListenerEntry>();
+ list.add(e);
+ }
+ }
+ if(list != null) {
+ for (ListenerEntry e : list) {
+ entries.remove(e);
+ }
+ }
+ if (entries.isEmpty()) {
+ listeners.remove(entry);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+ hasListenerAfterDisposing(entry);
+ if(listeners.get(entry) != null)
+ return listeners.get(entry);
+ else
+ return Collections.emptyList();
+ }
+
+ void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+ if(!workarea.containsKey(entry)) {
+
+ HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+ for(ListenerEntry e : getListenerEntries(entry))
+ ls.add(e.base);
+
+ workarea.put(entry, ls);
+
+ for(CacheEntry parent : entry.getParents(processor)) {
+ processListenerReport(parent, workarea);
+ ls.addAll(workarea.get(parent));
+ }
+
+ }
+
+ }
+
+ public synchronized ListenerReport getListenerReport() throws IOException {
+
+ class ListenerReportImpl implements ListenerReport {
+
+ Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+ @Override
+ public void print(PrintStream b) {
+ Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+ for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+ for(ListenerBase l : e.getValue()) {
+ Integer i = hist.get(l);
+ hist.put(l, i != null ? i-1 : -1);
+ }
+ }
+
+ for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+ b.print("" + -p.second + " " + p.first + "\n");
+ }
+
+ b.flush();
+ }
+
+ }
+
+ ListenerReportImpl result = new ListenerReportImpl();
+
+ Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+ for(CacheEntryBase entry : all) {
+ hasListenerAfterDisposing(entry);
+ }
+ for(CacheEntryBase entry : all) {
+ processListenerReport(entry, result.workarea);
+ }
+
+ return result;
+
+ }
+
+ public synchronized String reportListeners(File file) throws IOException {
+
+ if (!processor.isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+ ListenerReport report = getListenerReport();
+ report.print(b);
+
+ return "Done reporting listeners.";
+
+ }
+
+ public void fireListeners(WriteGraphImpl graph) {
+
+ assert (!processor.updating);
+ assert (!processor.cache.collecting);
+ assert (!firingListeners);
+
+ firingListeners = true;
+
+ try {
+
+ // Performing may cause further events to be scheduled.
+ while (!scheduledListeners.isEmpty()) {
+
+ // Clone current events to make new entries possible during
+ // firing.
+ THashSet<ListenerEntry> entries = scheduledListeners;
+ scheduledListeners = new THashSet<ListenerEntry>();
+
+ ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+ for (ListenerEntry listenerEntry : entries) {
+
+ if (pruneListener(listenerEntry)) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Pruned " + listenerEntry.procedure);
+ }
+ }
+ continue;
+ }
+
+ final CacheEntry entry = listenerEntry.entry;
+ assert (entry != null);
+
+ Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+ if (newValue != ListenerEntry.NOT_CHANGED) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+ }
+ }
+ schedule.add(listenerEntry);
+ listenerEntry.setLastKnown(entry.getResult());
+ }
+
+ }
+
+ for(ListenerEntry listenerEntry : schedule) {
+ final CacheEntry entry = listenerEntry.entry;
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure);
+ }
+ }
+ try {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+ }
+ }
+ entry.performFromCache(graph, listenerEntry.procedure);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ }
+
+ } finally {
+ firingListeners = false;
+ }
+
+ }
+
+ void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+ Iterable<CacheEntry> oldParents = entry.getParents(processor);
+ for (CacheEntry parent : oldParents) {
+ if(!parent.isDiscarded())
+ todo.push(new UpdateEntry(entry, parent, indent + 2));
+ }
+
+ }
+
+ private boolean pruneListener(ListenerEntry entry) {
+ if (entry.base.isDisposed()) {
+ removeListener(entry);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
import org.simantics.db.VirtualGraph;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
import org.simantics.db.common.utils.Logger;
-import org.simantics.db.debug.ListenerReport;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
import org.simantics.db.exception.NoInverseException;
import org.simantics.db.impl.graph.BarrierTracing;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.ReadGraphSupport;
-import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.impl.procedure.IntProcedureAdapter;
import org.simantics.db.impl.procedure.InternalProcedure;
import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
AtomicInteger sleepers = new AtomicInteger(0);
- private boolean updating = false;
+ boolean updating = false;
- private boolean firingListeners = false;
-
final public QueryCache cache;
final public QuerySupport querySupport;
final public Session session;
final public ResourceSupport resourceSupport;
-
- private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+
+ final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
}
final public void releaseWrite(ReadGraphImpl graph) {
- performDirtyUpdates(graph);
+ propagateChangesInQueryCache(graph);
modificationCounter++;
}
boolean isBound(ExternalReadEntry<?> entry) {
if(entry.hasParents()) return true;
- else if(hasListener(entry)) return true;
+ else if(listening.hasListener(entry)) return true;
else return false;
}
- synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- if (listener != null) {
- return registerListener(child, listener, procedure);
- } else {
- return null;
- }
-
- }
-
-
static class Dummy implements InternalProcedure<Object>, IntProcedure {
@Override
}
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
- private void primeListenerEntry(final ListenerEntry entry, final Object result) {
- entry.setLastKnown(result);
- }
-
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = cache.listeners.get(entry);
- if (list == null) {
- list = new ArrayList<ListenerEntry>(1);
- cache.listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed()) return null;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- return result;
-
- }
-
- private void scheduleListener(ListenerEntry entry) {
- assert (entry != null);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Scheduled " + entry.procedure);
- }
- }
- scheduledListeners.add(entry);
- }
-
- private void removeListener(ListenerEntry entry) {
- assert (entry != null);
- ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
- if(list == null) return;
- boolean success = list.remove(entry);
- assert (success);
- if (list.isEmpty())
- cache.listeners.remove(entry.entry);
- }
-
- private boolean hasListener(CacheEntry entry) {
- if(cache.listeners.get(entry) != null) return true;
- return false;
- }
-
- boolean hasListenerAfterDisposing(CacheEntry entry) {
- if(cache.listeners.get(entry) != null) {
- ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
- ArrayList<ListenerEntry> list = null;
- for (ListenerEntry e : entries) {
- if (e.base.isDisposed()) {
- if(list == null) list = new ArrayList<ListenerEntry>();
- list.add(e);
- }
- }
- if(list != null) {
- for (ListenerEntry e : list) {
- entries.remove(e);
- }
- }
- if (entries.isEmpty()) {
- cache.listeners.remove(entry);
- return false;
- }
- return true;
- }
- return false;
- }
-
- List<ListenerEntry> getListenerEntries(CacheEntry entry) {
- hasListenerAfterDisposing(entry);
- if(cache.listeners.get(entry) != null)
- return cache.listeners.get(entry);
- else
- return Collections.emptyList();
- }
-
- void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
- if(!workarea.containsKey(entry)) {
-
- HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
- for(ListenerEntry e : getListenerEntries(entry))
- ls.add(e.base);
-
- workarea.put(entry, ls);
-
- for(CacheEntry parent : entry.getParents(this)) {
- processListenerReport(parent, workarea);
- ls.addAll(workarea.get(parent));
- }
-
- }
-
- }
-
- public synchronized ListenerReport getListenerReport() throws IOException {
-
- class ListenerReportImpl implements ListenerReport {
-
- Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
- @Override
- public void print(PrintStream b) {
- Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
- for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
- for(ListenerBase l : e.getValue()) {
- Integer i = hist.get(l);
- hist.put(l, i != null ? i-1 : -1);
- }
- }
-
- for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
- b.print("" + -p.second + " " + p.first + "\n");
- }
-
- b.flush();
- }
-
- }
-
- ListenerReportImpl result = new ListenerReportImpl();
-
- Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
- for(CacheEntryBase entry : all) {
- hasListenerAfterDisposing(entry);
- }
- for(CacheEntryBase entry : all) {
- processListenerReport(entry, result.workarea);
- }
-
- return result;
-
- }
-
- public synchronized String reportListeners(File file) throws IOException {
-
- if (!isAlive())
- return "Disposed!";
-
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
- ListenerReport report = getListenerReport();
- report.print(b);
-
- return "Done reporting listeners.";
-
- }
void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
int listeners = 0;
for(CacheEntry entry : workarea.keySet()) {
- boolean listener = hasListenerAfterDisposing(entry);
+ boolean listener = listening.hasListenerAfterDisposing(entry);
boolean hasParents = entry.getParents(this).iterator().hasNext();
if(listener) {
// Bound
b.print("// Entry parent listing\n");
for(CacheEntry entry : workarea.keySet()) {
int status = flagMap.get(entry);
- boolean hasListener = hasListenerAfterDisposing(entry);
+ boolean hasListener = listening.hasListenerAfterDisposing(entry);
b.print("Q " + entry.toString());
if(hasListener) {
b.print(" (L" + status + ")");
}
- class UpdateEntry {
-
- public CacheEntry caller;
-
- public CacheEntry entry;
-
- public int indent;
-
- public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
- this.caller = caller;
- this.entry = entry;
- this.indent = indent;
- }
-
- };
-
boolean removeQuery(CacheEntry entry) {
// This entry has been removed before. No need to do anything here.
Query query = entry.getQuery();
int type = query.type();
- boolean hasListener = hasListener(entry);
+ boolean hasListener = listening.hasListener(entry);
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- if(hasListener(entry)) {
+ if(listening.hasListener(entry)) {
System.err.println(" (L)");
} else {
System.err.println("");
// System.err.println(" => FOO " + type);
if (hasListener) {
- ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
+ ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
if(entries != null) {
for (ListenerEntry le : entries) {
- scheduleListener(le);
+ listening.scheduleListener(le);
}
}
}
// If invalid, update parents
if (type == RequestFlags.INVALIDATE) {
- updateParents(e.indent, entry, todo);
+ listening.updateParents(e.indent, entry, todo);
}
return hasListener;
}
- private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
-
- Iterable<CacheEntry> oldParents = entry.getParents(this);
- for (CacheEntry parent : oldParents) {
-// System.err.println("updateParents " + entry + " => " + parent);
- if(!parent.isDiscarded())
- todo.push(new UpdateEntry(entry, parent, indent + 2));
- }
-
- }
-
- private boolean pruneListener(ListenerEntry entry) {
- if (entry.base.isDisposed()) {
- removeListener(entry);
- return true;
- } else {
- return false;
- }
- }
-
/**
* @param av1 an array (guaranteed)
* @param av2 any object
- final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
+ final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
try {
}
- public boolean hasScheduledUpdates() {
- return !scheduledListeners.isEmpty();
- }
-
- public void performScheduledUpdates(WriteGraphImpl graph) {
-
- assert (!updating);
- assert (!cache.collecting);
- assert (!firingListeners);
-
- firingListeners = true;
-
- try {
-
- // Performing may cause further events to be scheduled.
- while (!scheduledListeners.isEmpty()) {
-
-// graph.restart();
-// graph.state.barrier.inc();
-
- // Clone current events to make new entries possible during
- // firing.
- THashSet<ListenerEntry> entries = scheduledListeners;
- scheduledListeners = new THashSet<ListenerEntry>();
-
- ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
-
- for (ListenerEntry listenerEntry : entries) {
-
- if (pruneListener(listenerEntry)) {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("Pruned " + listenerEntry.procedure);
- }
- }
- continue;
- }
-
- final CacheEntry entry = listenerEntry.entry;
- assert (entry != null);
-
- Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
-
- if (newValue != ListenerEntry.NOT_CHANGED) {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
- }
- }
- schedule.add(listenerEntry);
- listenerEntry.setLastKnown(entry.getResult());
- }
-
- }
-
- for(ListenerEntry listenerEntry : schedule) {
- final CacheEntry entry = listenerEntry.entry;
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Firing " + listenerEntry.procedure);
- }
- }
- try {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
- }
- }
- entry.performFromCache(graph, listenerEntry.procedure);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
-
-// graph.state.barrier.dec();
-// graph.waitAsync(null);
-// graph.state.barrier.assertReady();
-
- }
-
- } finally {
- firingListeners = false;
- }
-
- }
/**
*
Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
if (newValue != ListenerEntry.NOT_CHANGED)
- updateParents(0, immediate, todo);
+ listening.updateParents(0, immediate, todo);
} else {
Object newValue = compareTo(graph, immediate, oldValue);
if (newValue != ListenerEntry.NOT_CHANGED) {
- updateParents(0, immediate, todo);
+ listening.updateParents(0, immediate, todo);
} else {
// If not changed, keep the old value
immediate.setResult(oldValue);
}
- public void performDirtyUpdates(final ReadGraphImpl graph) {
+ public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
+
+ // Make sure that listening has performed its work
+ listening.sync();
cache.dirty = false;
lastInvalidate = 0;
@Override
boolean isImmutable(ReadGraphImpl graph) throws DatabaseException {
- return graph.processor.isImmutable(id);
+ return graph.processor.isImmutable(id);
}
}
--- /dev/null
+package org.simantics.db.impl.query;
+
+class UpdateEntry {
+
+ public CacheEntry caller;
+
+ public CacheEntry entry;
+
+ public int indent;
+
+ public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
+ this.caller = caller;
+ this.entry = entry;
+ this.indent = indent;
+ }
+
+}
\ No newline at end of file
}
- @Override
- public boolean isImmutable(ReadGraph graph) throws DatabaseException {
- return false;
- }
+ @Override
+ public boolean isImmutable(ReadGraph graph) {
+ return false;
+ }
@Override
public int getType() {
th = new TaskHelper("Refresh");
session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
try {
- session.getQueryProvider2().performDirtyUpdates(writer);
+ session.getQueryProvider2().propagateChangesInQueryCache(writer);
session.fireMetadataListeners(writer, cs);
- session.getQueryProvider2().performScheduledUpdates(writer);
+ session.getQueryProvider2().listening.fireListeners(writer);
session.fireReactionsToSynchronize(cs);
session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
session.printDiagnostics();
@Override
public Object apply(WriteGraph graph, String args) {
try {
- return getSession(graph).queryProvider2.getListenerReport();
+ return getSession(graph).queryProvider2.listening.getListenerReport();
} catch (IOException e) {
Logger.defaultLogError(e);
return e.getMessage();
@Override
public String apply(WriteGraph graph, File file, String args) {
try {
- return getSession(graph).queryProvider2.reportListeners(file);
+ return getSession(graph).queryProvider2.listening.reportListeners(file);
} catch (IOException e) {
Logger.defaultLogError(e);
return e.getMessage();
while(dirtyPrimitives) {
dirtyPrimitives = false;
- getQueryProvider2().performDirtyUpdates(writer);
- getQueryProvider2().performScheduledUpdates(writer);
+ getQueryProvider2().propagateChangesInQueryCache(writer);
+ getQueryProvider2().listening.fireListeners(writer);
}
fireMetadataListeners(writer, clientChanges);
void stopReadTransaction() throws DatabaseException {
lock.lock();
try {
- assert (!queryProvider.hasScheduledUpdates());
+ assert (!queryProvider.listening.hasScheduledUpdates());
assert (readCount == 1);
session.writeSupport.gc();
transactionToken.stopReadTransaction();
throw new InternalException("Cancel failed. This should never happen.");
// System.out.println("session cs: " + session.clientChanges);
// System.out.println("reverse cs: " + cs);
- queryProvider.performDirtyUpdates(graph);
- queryProvider.performScheduledUpdates(graph);
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
// This will send and accept the reverse change set.
transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
} else {
- queryProvider.performDirtyUpdates(graph);
- queryProvider.performScheduledUpdates(graph);
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
}
}
session.writeSupport.clearMetadata();
// start = System.nanoTime();
- queryProvider.performScheduledUpdates(graph);
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
// duration = System.nanoTime() - start;
// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
// Process updates as long as pending primitives exist
while (session.dirtyPrimitives) {
session.dirtyPrimitives = false;
- queryProvider.performDirtyUpdates(graph);
- queryProvider.performScheduledUpdates(graph);
+ queryProvider.propagateChangesInQueryCache(graph);
+ queryProvider.listening.fireListeners(graph);
}
if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
um.setTypeAndRange(false, ope.getId(), ope.getCSId());
writer.addMetadata(um);
}
- session.getQueryProvider2().performDirtyUpdates(writer);
+ session.getQueryProvider2().propagateChangesInQueryCache(writer);
session.fireMetadataListeners(writer, cs);
- session.getQueryProvider2().performScheduledUpdates(writer);
+ session.getQueryProvider2().listening.fireListeners(writer);
session.fireReactionsToSynchronize(cs);
session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
session.printDiagnostics();
// graph.state.barrier.dec();
// graph.waitAsync(request);
- queryProcessor.performDirtyUpdates(graph);
+ queryProcessor.propagateChangesInQueryCache(graph);
// Do not fire metadata listeners for virtual requests
if(graph.getProvider() == null) {
// graph.state.barrier.dec();
// graph.waitAsync(request);
- queryProcessor.performDirtyUpdates(graph);
+ queryProcessor.propagateChangesInQueryCache(graph);
// Do not fire metadata listeners for virtual requests
if(graph.getProvider() == null) {
ReadGraphImpl impl = (ReadGraphImpl)graph;
- queryProcessor.performDirtyUpdates(impl);
+ queryProcessor.propagateChangesInQueryCache(impl);
// Do not fire metadata listeners for virtual requests
if(graph.getProvider() == null) {