From 0db1af75cd14f99f16c42c920562708abf37be93 Mon Sep 17 00:00:00 2001 From: Antti Villberg Date: Thu, 12 Sep 2019 14:15:24 +0300 Subject: [PATCH] Separate query dependency management from QueryProcessor gitlab #379 Change-Id: I23afbaef397f23b3645d47ac71906736b1021044 --- .../org/simantics/db/impl/query/CodeGen.java | 29 +- .../simantics/db/impl/query/QueryCache.java | 190 +++++--- .../db/impl/query/QueryCacheBase.java | 161 ------- .../db/impl/query/QueryCollectorImpl.java | 4 +- .../db/impl/query/QueryCollectorImpl2.java | 2 +- .../db/impl/query/QueryListening.java | 404 ++++++++++++++++++ .../db/impl/query/QueryProcessor.java | 362 +--------------- .../simantics/db/impl/query/UnaryQuery.java | 2 +- .../simantics/db/impl/query/UpdateEntry.java | 17 + .../layer0/request/ResourceURIToVariable.java | 8 +- .../procore/internal/ClusterTable.java | 4 +- .../procore/internal/DebugSupportImpl.java | 4 +- .../procore/internal/SessionImplSocket.java | 4 +- .../vtt/simantics/procore/internal/State.java | 17 +- .../procore/internal/UndoRedoSupportImpl.java | 4 +- .../procore/internal/WriteSupportImpl.java | 6 +- 16 files changed, 611 insertions(+), 607 deletions(-) create mode 100644 bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java create mode 100644 bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java index 66ee865a6..feac07ea2 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java @@ -94,14 +94,29 @@ public class CodeGen { line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature.keyName + ");"); } line(content, " " + signature.procedure + " procedure_ = procedure != null ? procedure : emptyProcedure" + clazz + ";"); - line(content, " ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);"); - line(content, " if(entry.isReady()) " + (genReturn ? "return " : "") + "entry.performFromCache(graph, procedure_);"); + line(content, " if(entry.isReady()) {"); + line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);"); + if(genReturn) { + line(content, " Object result = entry.performFromCache(graph, procedure_);"); + line(content, " graph.processor.listening.registerFirstKnown(listener, result);"); + line(content, " return result;"); + } else { + line(content, " entry.performFromCache(graph, procedure_);"); + } + line(content, " }"); line(content, " else {"); line(content, " assert(entry.isPending());"); - if(shortcut) line(content, " " + (genReturn ? "Object result = " : "") + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");"); - else line(content, " entry.compute(graph, procedure_);"); - line(content, " if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());"); - if(genReturn) line(content, " return result;"); + if(genReturn) { + line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);"); + if(shortcut) line(content, " Object result = " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");"); + else line(content, " entry.compute(graph, procedure_);"); + line(content, " graph.processor.listening.registerFirstKnown(listener, result);"); + line(content, " return result;"); + } else { + line(content, " graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);"); + if(shortcut) line(content, " " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");"); + else line(content, " entry.compute(graph, procedure_);"); + } line(content, " }"); line(content, "}"); line(content, ""); @@ -203,7 +218,7 @@ public class CodeGen { content.append("public class QueryCache extends QueryCacheBase {\n"); content.append("\n"); - line(content, "private static final boolean SINGLE = false;"); + line(content, "private static final boolean SINGLE = true;"); content.append("\n"); line(content,"public QueryCache(QuerySupport querySupport, int threads) {"); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index de942b90f..5c18d0510 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -68,12 +68,14 @@ public class QueryCache extends QueryCacheBase { } Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Objects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -126,12 +128,14 @@ public class QueryCache extends QueryCacheBase { } Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Statements.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -184,12 +188,14 @@ public class QueryCache extends QueryCacheBase { } DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -242,12 +248,14 @@ public class QueryCache extends QueryCacheBase { } RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); RelationInfoQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -300,12 +308,14 @@ public class QueryCache extends QueryCacheBase { } URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureURIToResource; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); URIToResource.computeForEach(graph, id, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -358,12 +368,14 @@ public class QueryCache extends QueryCacheBase { } ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureValueQuery; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); ValueQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -416,12 +428,14 @@ public class QueryCache extends QueryCacheBase { } OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); OrderedSet.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -474,12 +488,14 @@ public class QueryCache extends QueryCacheBase { } PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); PrincipalTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -532,12 +548,14 @@ public class QueryCache extends QueryCacheBase { } DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); DirectPredicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -590,12 +608,14 @@ public class QueryCache extends QueryCacheBase { } Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedurePredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Predicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -663,12 +683,17 @@ public class QueryCache extends QueryCacheBase { return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) return entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = entry.performFromCache(graph, procedure_); + graph.processor.listening.registerFirstKnown(listener, result); + return result; + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerFirstKnown(listener, result); return result; } } @@ -737,12 +762,17 @@ public class QueryCache extends QueryCacheBase { return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) return entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + Object result = entry.performFromCache(graph, procedure_); + graph.processor.listening.registerFirstKnown(listener, result); + return result; + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + graph.processor.listening.registerFirstKnown(listener, result); return result; } } @@ -796,12 +826,14 @@ public class QueryCache extends QueryCacheBase { } Types entry = (Types)cache.getOrCreateTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); Types.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -854,12 +886,14 @@ public class QueryCache extends QueryCacheBase { } ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r); InternalProcedure> procedure_ = procedure != null ? procedure : emptyProcedureChildMap; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); ChildMap.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -912,12 +946,14 @@ public class QueryCache extends QueryCacheBase { } TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); TypeHierarchy.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -970,12 +1006,14 @@ public class QueryCache extends QueryCacheBase { } SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); SuperTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1028,12 +1066,14 @@ public class QueryCache extends QueryCacheBase { } SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); SuperRelations.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1075,12 +1115,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1122,12 +1164,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1169,12 +1213,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1216,12 +1262,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r); SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1263,12 +1311,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r); AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } @@ -1310,12 +1360,14 @@ public class QueryCache extends QueryCacheBase { QueryCache cache = graph.processor.cache; ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r); AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry; - ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) { + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); + entry.performFromCache(graph, procedure_); + } else { assert(entry.isPending()); + graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false); entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index 446e9c641..54efda839 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -70,8 +70,6 @@ public class QueryCacheBase { protected final StableHashMap asyncMultiReadEntryMap; protected final StableHashMap externalReadEntryMap; - final THashMap> listeners; - public final QuerySupport querySupport; public QueryCacheBase(QuerySupport querySupport, int threads) { @@ -104,91 +102,8 @@ public class QueryCacheBase { asyncMultiReadEntryMap = new StableHashMap(); multiReadEntryMap = new StableHashMap(); externalReadEntryMap = new StableHashMap(); - listeners = new THashMap>(10, 0.75f); } -// public Object performQuery(ReadGraphImpl parentGraph, final AsyncRead query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException { -// -// AsyncReadEntry entry = (AsyncReadEntry)entry_; -// AsyncProcedure procedure = (AsyncProcedure)procedure_; -// -// ReadGraphImpl queryGraph = parentGraph.withParent(entry_); -// -// try { -// -// query.perform(queryGraph, new AsyncProcedure() { -// -// @Override -// public void execute(AsyncReadGraph returnGraph, T result) { -// ReadGraphImpl impl = (ReadGraphImpl)returnGraph; -// entry.addOrSet(parentGraph, result); -// try { -// procedure.execute(parentGraph, result); -// } catch (Throwable t) { -// t.printStackTrace(); -// } -//// parentBarrier.dec(query); -// } -// -// @Override -// public void exception(AsyncReadGraph returnGraph, Throwable t) { -// ReadGraphImpl impl = (ReadGraphImpl)returnGraph; -//// AsyncReadGraph resumeGraph = finalParentGraph.newAsync(); -// entry.except(parentGraph, t); -// try { -// procedure.exception(parentGraph, t); -// } catch (Throwable t2) { -// t2.printStackTrace(); -// } -//// parentBarrier.dec(query); -// } -// -// @Override -// public String toString() { -// return procedure.toString(); -// } -// -// }); -// -// } catch (Throwable t) { -// -// entry.except(t); -// try { -// procedure.exception(parentGraph, t); -// } catch (Throwable t2) { -// t2.printStackTrace(); -// } -//// parentBarrier.dec(query); -// -// } -// -// return null; -// -// } - -// public Object performQuery(ReadGraphImpl parentGraph, final Read query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException { -// -// ReadGraphImpl queryGraph = parentGraph.withParent(entry_); -// -// ReadEntry entry = (ReadEntry)entry_; -// -// try { -// -// T result = (T)query.perform(queryGraph); -// entry.addOrSet(queryGraph, result); -// -// return (T)entry.get(parentGraph, procedure_); -// -// } catch (Throwable t) { -// -// entry.except(t); -// return (T)entry.get(parentGraph, procedure_); -// -// } -// -// } - - public Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException { ReadGraphImpl queryGraph = parentGraph.withParent(entry_); @@ -315,82 +230,6 @@ public class QueryCacheBase { } - public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) { - synchronized(child) { - child.addParent(parent); - } - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.err.println(child + " -> " + parent); - } - } - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - public synchronized ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - protected void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - return result; - - } - - public Collection getRootList() { ArrayList result = new ArrayList(); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl.java index 965f1e39f..7bab1a77a 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl.java @@ -160,7 +160,7 @@ class QueryCollectorImpl implements QueryProcessor.QueryCollector { } } - if(queryProcessor.hasListenerAfterDisposing(entry)) { + if(queryProcessor.listening.hasListenerAfterDisposing(entry)) { if((status & CacheEntry.HAS_BEEN_BOUND) == 0) { queryProcessor.boundQueries++; entry.setGCStatusFlag(CacheEntry.HAS_BEEN_BOUND, true); @@ -176,7 +176,7 @@ class QueryCollectorImpl implements QueryProcessor.QueryCollector { } private boolean tryCollect(CacheEntry entry) { - if (!queryProcessor.hasListenerAfterDisposing(entry)) + if (!queryProcessor.listening.hasListenerAfterDisposing(entry)) if(entry.shouldBeCollected()) { queryProcessor.removeQuery(entry); support.remove(); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl2.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl2.java index 0efa49f56..b0e633070 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl2.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl2.java @@ -47,7 +47,7 @@ class QueryCollectorImpl2 implements QueryProcessor.QueryCollector { return false; } - if (this.queryProcessor.hasListenerAfterDisposing(entry)) { + if (this.queryProcessor.listening.hasListenerAfterDisposing(entry)) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYCOLLECTOR, Bindings.BOOLEAN)) { System.err.println("GC: listened entry " + entry + " was not collected. Entry=" + entry); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java new file mode 100644 index 000000000..e524a080a --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -0,0 +1,404 @@ +package org.simantics.db.impl.query; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +import org.simantics.databoard.Bindings; +import org.simantics.db.DevelopmentKeys; +import org.simantics.db.debug.ListenerReport; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.procedure.ListenerBase; +import org.simantics.utils.Development; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.datastructures.collections.CollectionUtils; +import org.slf4j.LoggerFactory; + +import gnu.trove.map.hash.THashMap; +import gnu.trove.set.hash.THashSet; + +public class QueryListening { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); + + final private QueryProcessor processor; + private THashSet scheduledListeners = new THashSet(); + private boolean firingListeners = false; + final THashMap> listeners = new THashMap>(10, 0.75f); + private BlockingQueue tasks = new LinkedBlockingQueue(); + private Map addedEntries = new HashMap<>(); + + QueryListening(QueryProcessor processor) { + this.processor = processor; + new DependencyManagementThread(processor, tasks).start(); + } + + public void sync() { + Semaphore s = new Semaphore(0); + tasks.add(() -> { + s.release(); + }); + try { + s.acquire(); + } catch (Throwable t) { + LOGGER.error("Error while waiting for query dependency management", t); + } + } + + static class DependencyManagementThread extends Thread { + + final private QueryProcessor processor; + final BlockingQueue tasks; + + DependencyManagementThread(QueryProcessor processor, BlockingQueue tasks) { + setName("Query Dependency Manager"); + this.processor = processor; + this.tasks = tasks; + } + + @Override + public void run() { + while(processor.isAlive()) { + try { + Runnable r = tasks.take(); + r.run(); + } catch (Throwable t) { + // Spurious problems? + LOGGER.error("Error while waiting for query dependency management tasks", t); + } + } + } + + } + + public boolean hasScheduledUpdates() { + return !scheduledListeners.isEmpty(); + } + + void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + + tasks.offer(() -> { + + if (parent != null && !inferred) { + try { + if(!child.isImmutable(graph)) + child.addParent(parent); + } catch (DatabaseException e) { + LOGGER.error("Error while registering query dependencies", e); + } + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + if (listener != null) + registerListener(child, listener, procedure); + + }); + + } + + void registerFirstKnown(ListenerBase base, Object result) { + + tasks.offer(() -> { + + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + + }); + + } + + public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { + + assert (entry != null); + + if (base.isDisposed()) + return null; + + return addListener(entry, base, procedure); + + } + + /* + * Registers a listener and returns an entry iff the entry was added + */ + private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) return null; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + addedEntries.put(base, result); + + return result; + + } + + void scheduleListener(ListenerEntry entry) { + assert (entry != null); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Scheduled " + entry.procedure); + } + } + scheduledListeners.add(entry); + } + + private void removeListener(ListenerEntry entry) { + assert (entry != null); + ArrayList list = listeners.get(entry.entry); + if(list == null) return; + boolean success = list.remove(entry); + assert (success); + if (list.isEmpty()) + listeners.remove(entry.entry); + } + + boolean hasListener(CacheEntry entry) { + if(listeners.get(entry) != null) return true; + return false; + } + + boolean hasListenerAfterDisposing(CacheEntry entry) { + if(listeners.get(entry) != null) { + ArrayList entries = listeners.get(entry); + ArrayList list = null; + for (ListenerEntry e : entries) { + if (e.base.isDisposed()) { + if(list == null) list = new ArrayList(); + list.add(e); + } + } + if(list != null) { + for (ListenerEntry e : list) { + entries.remove(e); + } + } + if (entries.isEmpty()) { + listeners.remove(entry); + return false; + } + return true; + } + return false; + } + + List getListenerEntries(CacheEntry entry) { + hasListenerAfterDisposing(entry); + if(listeners.get(entry) != null) + return listeners.get(entry); + else + return Collections.emptyList(); + } + + void processListenerReport(CacheEntry entry, Map> workarea) { + + if(!workarea.containsKey(entry)) { + + HashSet ls = new HashSet(); + for(ListenerEntry e : getListenerEntries(entry)) + ls.add(e.base); + + workarea.put(entry, ls); + + for(CacheEntry parent : entry.getParents(processor)) { + processListenerReport(parent, workarea); + ls.addAll(workarea.get(parent)); + } + + } + + } + + public synchronized ListenerReport getListenerReport() throws IOException { + + class ListenerReportImpl implements ListenerReport { + + Map> workarea = new HashMap>(); + + @Override + public void print(PrintStream b) { + Map hist = new HashMap(); + for(Map.Entry> e : workarea.entrySet()) { + for(ListenerBase l : e.getValue()) { + Integer i = hist.get(l); + hist.put(l, i != null ? i-1 : -1); + } + } + + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + b.flush(); + } + + } + + ListenerReportImpl result = new ListenerReportImpl(); + + Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : all) { + hasListenerAfterDisposing(entry); + } + for(CacheEntryBase entry : all) { + processListenerReport(entry, result.workarea); + } + + return result; + + } + + public synchronized String reportListeners(File file) throws IOException { + + if (!processor.isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + ListenerReport report = getListenerReport(); + report.print(b); + + return "Done reporting listeners."; + + } + + public void fireListeners(WriteGraphImpl graph) { + + assert (!processor.updating); + assert (!processor.cache.collecting); + assert (!firingListeners); + + firingListeners = true; + + try { + + // Performing may cause further events to be scheduled. + while (!scheduledListeners.isEmpty()) { + + // Clone current events to make new entries possible during + // firing. + THashSet entries = scheduledListeners; + scheduledListeners = new THashSet(); + + ArrayList schedule = new ArrayList(); + + for (ListenerEntry listenerEntry : entries) { + + if (pruneListener(listenerEntry)) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Pruned " + listenerEntry.procedure); + } + } + continue; + } + + final CacheEntry entry = listenerEntry.entry; + assert (entry != null); + + Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); + + if (newValue != ListenerEntry.NOT_CHANGED) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); + } + } + schedule.add(listenerEntry); + listenerEntry.setLastKnown(entry.getResult()); + } + + } + + for(ListenerEntry listenerEntry : schedule) { + final CacheEntry entry = listenerEntry.entry; + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure); + } + } + try { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); + } + } + entry.performFromCache(graph, listenerEntry.procedure); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + } finally { + firingListeners = false; + } + + } + + void updateParents(int indent, CacheEntry entry, LinkedList todo) { + + Iterable oldParents = entry.getParents(processor); + for (CacheEntry parent : oldParents) { + if(!parent.isDiscarded()) + todo.push(new UpdateEntry(entry, parent, indent + 2)); + } + + } + + private boolean pruneListener(ListenerEntry entry) { + if (entry.base.isDisposed()) { + removeListener(entry); + return true; + } else { + return false; + } + } + +} diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 9b54d15f6..7bfa0f4c6 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -44,7 +44,6 @@ import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; -import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoInverseException; @@ -53,7 +52,6 @@ import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.procedure.IntProcedureAdapter; import org.simantics.db.impl.procedure.InternalProcedure; import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; @@ -130,17 +128,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap AtomicInteger sleepers = new AtomicInteger(0); - private boolean updating = false; + boolean updating = false; - private boolean firingListeners = false; - final public QueryCache cache; final public QuerySupport querySupport; final public Session session; final public ResourceSupport resourceSupport; - - private THashSet scheduledListeners = new THashSet(); + + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; @@ -504,7 +500,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public void releaseWrite(ReadGraphImpl graph) { - performDirtyUpdates(graph); + propagateChangesInQueryCache(graph); modificationCounter++; } @@ -699,35 +695,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap boolean isBound(ExternalReadEntry entry) { if(entry.hasParents()) return true; - else if(hasListener(entry)) return true; + else if(listening.hasListener(entry)) return true; else return false; } - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - static class Dummy implements InternalProcedure, IntProcedure { @Override @@ -929,181 +900,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - private void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = cache.listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - cache.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - return result; - - } - - private void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = cache.listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - cache.listeners.remove(entry.entry); - } - - private boolean hasListener(CacheEntry entry) { - if(cache.listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(cache.listeners.get(entry) != null) { - ArrayList entries = cache.listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - cache.listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(cache.listeners.get(entry) != null) - return cache.listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(this)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } void processParentReport(CacheEntry entry, Map> workarea) { @@ -1176,7 +972,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap int listeners = 0; for(CacheEntry entry : workarea.keySet()) { - boolean listener = hasListenerAfterDisposing(entry); + boolean listener = listening.hasListenerAfterDisposing(entry); boolean hasParents = entry.getParents(this).iterator().hasNext(); if(listener) { // Bound @@ -1340,7 +1136,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap b.print("// Entry parent listing\n"); for(CacheEntry entry : workarea.keySet()) { int status = flagMap.get(entry); - boolean hasListener = hasListenerAfterDisposing(entry); + boolean hasListener = listening.hasListenerAfterDisposing(entry); b.print("Q " + entry.toString()); if(hasListener) { b.print(" (L" + status + ")"); @@ -1386,22 +1182,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - class UpdateEntry { - - public CacheEntry caller; - - public CacheEntry entry; - - public int indent; - - public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { - this.caller = caller; - this.entry = entry; - this.indent = indent; - } - - }; - boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1494,11 +1274,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Query query = entry.getQuery(); int type = query.type(); - boolean hasListener = hasListener(entry); + boolean hasListener = listening.hasListener(entry); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - if(hasListener(entry)) { + if(listening.hasListener(entry)) { System.err.println(" (L)"); } else { System.err.println(""); @@ -1545,43 +1325,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // System.err.println(" => FOO " + type); if (hasListener) { - ArrayList entries = cache.listeners.get(entry); + ArrayList entries = listening.listeners.get(entry); if(entries != null) { for (ListenerEntry le : entries) { - scheduleListener(le); + listening.scheduleListener(le); } } } // If invalid, update parents if (type == RequestFlags.INVALIDATE) { - updateParents(e.indent, entry, todo); + listening.updateParents(e.indent, entry, todo); } return hasListener; } - private void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(this); - for (CacheEntry parent : oldParents) { -// System.err.println("updateParents " + entry + " => " + parent); - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - /** * @param av1 an array (guaranteed) * @param av2 any object @@ -1617,7 +1377,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap - final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { + final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { try { @@ -1681,93 +1441,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - public void performScheduledUpdates(WriteGraphImpl graph) { - - assert (!updating); - assert (!cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - -// graph.restart(); -// graph.state.barrier.inc(); - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - -// graph.state.barrier.dec(); -// graph.waitAsync(null); -// graph.state.barrier.assertReady(); - - } - - } finally { - firingListeners = false; - } - - } /** * @@ -1810,7 +1483,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); if (newValue != ListenerEntry.NOT_CHANGED) - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { @@ -1818,7 +1491,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, oldValue); if (newValue != ListenerEntry.NOT_CHANGED) { - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { // If not changed, keep the old value immediate.setResult(oldValue); @@ -1867,7 +1540,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public void performDirtyUpdates(final ReadGraphImpl graph) { + public void propagateChangesInQueryCache(final ReadGraphImpl graph) { + + // Make sure that listening has performed its work + listening.sync(); cache.dirty = false; lastInvalidate = 0; diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UnaryQuery.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UnaryQuery.java index 26eff457d..fd6cd7d87 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UnaryQuery.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UnaryQuery.java @@ -65,7 +65,7 @@ public abstract class UnaryQuery extends CacheEntryBase im @Override boolean isImmutable(ReadGraphImpl graph) throws DatabaseException { - return graph.processor.isImmutable(id); + return graph.processor.isImmutable(id); } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java new file mode 100644 index 000000000..88994250a --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java @@ -0,0 +1,17 @@ +package org.simantics.db.impl.query; + +class UpdateEntry { + + public CacheEntry caller; + + public CacheEntry entry; + + public int indent; + + public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { + this.caller = caller; + this.entry = entry; + this.indent = indent; + } + +} \ No newline at end of file diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/request/ResourceURIToVariable.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/request/ResourceURIToVariable.java index b49d270aa..2e664d539 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/request/ResourceURIToVariable.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/request/ResourceURIToVariable.java @@ -40,10 +40,10 @@ public class ResourceURIToVariable extends UnaryRead implement } - @Override - public boolean isImmutable(ReadGraph graph) throws DatabaseException { - return false; - } + @Override + public boolean isImmutable(ReadGraph graph) { + return false; + } @Override public int getType() { diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java index 07df23df8..200e9e3c1 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java @@ -781,9 +781,9 @@ public final class ClusterTable implements IClusterTable { th = new TaskHelper("Refresh"); session.writeState = new WriteState(writer, th.writeTraits, th.sema, th.proc); try { - session.getQueryProvider2().performDirtyUpdates(writer); + session.getQueryProvider2().propagateChangesInQueryCache(writer); session.fireMetadataListeners(writer, cs); - session.getQueryProvider2().performScheduledUpdates(writer); + session.getQueryProvider2().listening.fireListeners(writer); session.fireReactionsToSynchronize(cs); session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES); session.printDiagnostics(); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/DebugSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/DebugSupportImpl.java index f788a1244..3b05bcba6 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/DebugSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/DebugSupportImpl.java @@ -69,7 +69,7 @@ public class DebugSupportImpl implements DebugSupport { @Override public Object apply(WriteGraph graph, String args) { try { - return getSession(graph).queryProvider2.getListenerReport(); + return getSession(graph).queryProvider2.listening.getListenerReport(); } catch (IOException e) { Logger.defaultLogError(e); return e.getMessage(); @@ -125,7 +125,7 @@ public class DebugSupportImpl implements DebugSupport { @Override public String apply(WriteGraph graph, File file, String args) { try { - return getSession(graph).queryProvider2.reportListeners(file); + return getSession(graph).queryProvider2.listening.reportListeners(file); } catch (IOException e) { Logger.defaultLogError(e); return e.getMessage(); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index d32c066bc..b98b4dec6 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -3542,8 +3542,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java index e0d8eee8a..4c6e77b1a 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java @@ -194,7 +194,7 @@ class State { void stopReadTransaction() throws DatabaseException { lock.lock(); try { - assert (!queryProvider.hasScheduledUpdates()); + assert (!queryProvider.listening.hasScheduledUpdates()); assert (readCount == 1); session.writeSupport.gc(); transactionToken.stopReadTransaction(); @@ -290,16 +290,16 @@ class State { throw new InternalException("Cancel failed. This should never happen."); // System.out.println("session cs: " + session.clientChanges); // System.out.println("reverse cs: " + cs); - queryProvider.performDirtyUpdates(graph); - queryProvider.performScheduledUpdates(graph); + queryProvider.propagateChangesInQueryCache(graph); + queryProvider.listening.fireListeners(graph); } catch (DatabaseException e) { Logger.defaultLogError(e); } // This will send and accept the reverse change set. transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream); } else { - queryProvider.performDirtyUpdates(graph); - queryProvider.performScheduledUpdates(graph); + queryProvider.propagateChangesInQueryCache(graph); + queryProvider.listening.fireListeners(graph); } } session.writeSupport.clearMetadata(); @@ -339,15 +339,16 @@ class State { // start = System.nanoTime(); - queryProvider.performScheduledUpdates(graph); + queryProvider.propagateChangesInQueryCache(graph); + queryProvider.listening.fireListeners(graph); // duration = System.nanoTime() - start; // System.out.println("performScheduledUpdates " + 1e-9*duration + "s. "); // Process updates as long as pending primitives exist while (session.dirtyPrimitives) { session.dirtyPrimitives = false; - queryProvider.performDirtyUpdates(graph); - queryProvider.performScheduledUpdates(graph); + queryProvider.propagateChangesInQueryCache(graph); + queryProvider.listening.fireListeners(graph); } if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please! diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java index f2061cd42..e492f6833 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java @@ -93,9 +93,9 @@ public class UndoRedoSupportImpl implements UndoRedoSupport { um.setTypeAndRange(false, ope.getId(), ope.getCSId()); writer.addMetadata(um); } - session.getQueryProvider2().performDirtyUpdates(writer); + session.getQueryProvider2().propagateChangesInQueryCache(writer); session.fireMetadataListeners(writer, cs); - session.getQueryProvider2().performScheduledUpdates(writer); + session.getQueryProvider2().listening.fireListeners(writer); session.fireReactionsToSynchronize(cs); session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES); session.printDiagnostics(); diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/WriteSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/WriteSupportImpl.java index aff7e17ac..ea4319ace 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/WriteSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/WriteSupportImpl.java @@ -261,7 +261,7 @@ public class WriteSupportImpl implements WriteSupport { // graph.state.barrier.dec(); // graph.waitAsync(request); - queryProcessor.performDirtyUpdates(graph); + queryProcessor.propagateChangesInQueryCache(graph); // Do not fire metadata listeners for virtual requests if(graph.getProvider() == null) { @@ -291,7 +291,7 @@ public class WriteSupportImpl implements WriteSupport { // graph.state.barrier.dec(); // graph.waitAsync(request); - queryProcessor.performDirtyUpdates(graph); + queryProcessor.propagateChangesInQueryCache(graph); // Do not fire metadata listeners for virtual requests if(graph.getProvider() == null) { @@ -318,7 +318,7 @@ public class WriteSupportImpl implements WriteSupport { ReadGraphImpl impl = (ReadGraphImpl)graph; - queryProcessor.performDirtyUpdates(impl); + queryProcessor.propagateChangesInQueryCache(impl); // Do not fire metadata listeners for virtual requests if(graph.getProvider() == null) { -- 2.43.2