Separate query dependency management from QueryProcessor 49/3249/9
authorAntti Villberg <antti.villberg@semantum.fi>
Thu, 12 Sep 2019 11:15:24 +0000 (14:15 +0300)
committerAntti Villberg <antti.villberg@semantum.fi>
Tue, 17 Sep 2019 08:37:12 +0000 (11:37 +0300)
gitlab #379

Change-Id: I23afbaef397f23b3645d47ac71906736b1021044

16 files changed:
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCollectorImpl2.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java [new file with mode: 0644]
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UnaryQuery.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java [new file with mode: 0644]
bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/request/ResourceURIToVariable.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/ClusterTable.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/DebugSupportImpl.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/State.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/WriteSupportImpl.java

index 66ee865a6ed3355ee7eefd0571d7739235fde231..feac07ea284625f9f20a7d775faa08dba62598af 100644 (file)
@@ -94,14 +94,29 @@ public class CodeGen {
             line(content, "    " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature.keyName + ");");
         }
         line(content, "    " + signature.procedure + " procedure_ = procedure != null ? procedure : emptyProcedure" + clazz + ";");
-        line(content, "    ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);");
-        line(content, "    if(entry.isReady()) " + (genReturn ? "return " : "") + "entry.performFromCache(graph, procedure_);");
+        line(content, "    if(entry.isReady()) {");
+        line(content, "      graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+        if(genReturn) {
+            line(content, "      Object result = entry.performFromCache(graph, procedure_);");
+            line(content, "      graph.processor.listening.registerFirstKnown(listener, result);");
+            line(content, "      return result;");
+        } else {
+            line(content, "      entry.performFromCache(graph, procedure_);");
+        }
+        line(content, "    }");
         line(content, "    else {");
         line(content, "      assert(entry.isPending());");
-        if(shortcut) line(content, "      " + (genReturn ? "Object result = " : "") + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
-        else line(content, "      entry.compute(graph, procedure_);"); 
-        line(content, "      if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());");
-        if(genReturn) line(content, "      return result;");
+        if(genReturn) {
+            line(content, "      graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+            if(shortcut) line(content, "      Object result = " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
+            else line(content, "      entry.compute(graph, procedure_);"); 
+            line(content, "      graph.processor.listening.registerFirstKnown(listener, result);");
+            line(content, "      return result;");
+        } else {
+            line(content, "      graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);");
+            if(shortcut) line(content, "      " + clazz + ".computeForEach(graph, " + signature.keyName + ", entry, procedure_" + (signature.useNeedsToBlock ? ", needsToBlock" : "") + ");");
+            else line(content, "      entry.compute(graph, procedure_);"); 
+        }
         line(content, "    }");
         line(content, "}");
         line(content, "");
@@ -203,7 +218,7 @@ public class CodeGen {
                     content.append("public class QueryCache extends QueryCacheBase {\n");
                     content.append("\n");
 
-                    line(content, "private static final boolean SINGLE = false;");
+                    line(content, "private static final boolean SINGLE = true;");
                     content.append("\n");
 
                     line(content,"public QueryCache(QuerySupport querySupport, int threads) {");
index de942b90f8611c3fd01227c0acf6f7740e74dc4e..5c18d051089ef6026ce58048947bd82ae3e4f9b2 100644 (file)
@@ -68,12 +68,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Objects.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -126,12 +128,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Statements.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -184,12 +188,14 @@ public class QueryCache extends QueryCacheBase {
         }
         DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -242,12 +248,14 @@ public class QueryCache extends QueryCacheBase {
         }
         RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
         InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -300,12 +308,14 @@ public class QueryCache extends QueryCacheBase {
         }
         URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
         InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           URIToResource.computeForEach(graph, id, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -358,12 +368,14 @@ public class QueryCache extends QueryCacheBase {
         }
         ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
         InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           ValueQuery.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -416,12 +428,14 @@ public class QueryCache extends QueryCacheBase {
         }
         OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           OrderedSet.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -474,12 +488,14 @@ public class QueryCache extends QueryCacheBase {
         }
         PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           PrincipalTypes.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -532,12 +548,14 @@ public class QueryCache extends QueryCacheBase {
         }
         DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           DirectPredicates.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -590,12 +608,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Predicates.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -663,12 +683,17 @@ public class QueryCache extends QueryCacheBase {
           return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          Object result = entry.performFromCache(graph, procedure_);
+          graph.processor.listening.registerFirstKnown(listener, result);
+          return result;
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Object result = ReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          graph.processor.listening.registerFirstKnown(listener, result);
           return result;
         }
     }
@@ -737,12 +762,17 @@ public class QueryCache extends QueryCacheBase {
           return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          Object result = entry.performFromCache(graph, procedure_);
+          graph.processor.listening.registerFirstKnown(listener, result);
+          return result;
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          graph.processor.listening.registerFirstKnown(listener, result);
           return result;
         }
     }
@@ -796,12 +826,14 @@ public class QueryCache extends QueryCacheBase {
         }
         Types entry = (Types)cache.getOrCreateTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           Types.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -854,12 +886,14 @@ public class QueryCache extends QueryCacheBase {
         }
         ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
         InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           ChildMap.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -912,12 +946,14 @@ public class QueryCache extends QueryCacheBase {
         }
         TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           TypeHierarchy.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -970,12 +1006,14 @@ public class QueryCache extends QueryCacheBase {
         }
         SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           SuperTypes.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1028,12 +1066,14 @@ public class QueryCache extends QueryCacheBase {
         }
         SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           SuperRelations.computeForEach(graph, r, entry, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1075,12 +1115,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1122,12 +1164,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1169,12 +1213,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1216,12 +1262,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
         SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1263,12 +1311,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
         AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
@@ -1310,12 +1360,14 @@ public class QueryCache extends QueryCacheBase {
         QueryCache cache  = graph.processor.cache;
         ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
-        ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) {
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
+          entry.performFromCache(graph, procedure_);
+        }
         else {
           assert(entry.isPending());
+          graph.processor.listening.registerDependencies(graph, entry, parent, listener, procedure_, false);
           entry.compute(graph, procedure_);
-          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
index 446e9c641406464de62e4cb484c66d02df4ca70f..54efda83976d8255f549ec0cd1cf41df8fc07e6c 100644 (file)
@@ -70,8 +70,6 @@ public class QueryCacheBase {
        protected final StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>        asyncMultiReadEntryMap; 
        protected final StableHashMap<ExternalRead, ExternalReadEntry>            externalReadEntryMap; 
 
-       final THashMap<CacheEntry, ArrayList<ListenerEntry>>                      listeners;
-
        public final QuerySupport                                                 querySupport;
 
        public QueryCacheBase(QuerySupport querySupport, int threads) {
@@ -104,91 +102,8 @@ public class QueryCacheBase {
                asyncMultiReadEntryMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>(); 
                multiReadEntryMap = new StableHashMap<MultiRead, MultiReadEntry>(); 
                externalReadEntryMap = new StableHashMap<ExternalRead, ExternalReadEntry>(); 
-               listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
        }
 
-//     public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncRead<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-//
-//             AsyncReadEntry<T> entry = (AsyncReadEntry<T>)entry_;
-//             AsyncProcedure<T> procedure = (AsyncProcedure<T>)procedure_;
-//
-//             ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
-//
-//             try {
-//                     
-//                     query.perform(queryGraph, new AsyncProcedure<T>() {
-//
-//                             @Override
-//                             public void execute(AsyncReadGraph returnGraph, T result) {
-//                                     ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
-//                                     entry.addOrSet(parentGraph, result);
-//                                     try {
-//                                             procedure.execute(parentGraph, result);
-//                                     } catch (Throwable t) {
-//                                             t.printStackTrace();
-//                                     }
-////                                   parentBarrier.dec(query);
-//                             }
-//
-//                             @Override
-//                             public void exception(AsyncReadGraph returnGraph, Throwable t) {
-//                                     ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
-////                                   AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
-//                                     entry.except(parentGraph, t);
-//                                     try {
-//                                             procedure.exception(parentGraph, t);
-//                                     } catch (Throwable t2) {
-//                                             t2.printStackTrace();
-//                                     }
-////                                   parentBarrier.dec(query);
-//                             }
-//
-//                             @Override
-//                             public String toString() {
-//                                     return procedure.toString();
-//                             }
-//
-//                     });
-//
-//             } catch (Throwable t) {
-//
-//                     entry.except(t);
-//                     try {
-//                             procedure.exception(parentGraph, t);
-//                     } catch (Throwable t2) {
-//                             t2.printStackTrace();
-//                     }
-////                   parentBarrier.dec(query);
-//
-//             }
-//             
-//             return null;
-//             
-//     }
-
-//     public <T> Object performQuery(ReadGraphImpl parentGraph, final Read<T> query, final CacheEntryBase entry_, AsyncProcedure procedure_) throws DatabaseException {
-//
-//             ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
-//
-//             ReadEntry entry = (ReadEntry)entry_;
-//
-//             try {
-//
-//                     T result = (T)query.perform(queryGraph);
-//                     entry.addOrSet(queryGraph, result);
-//
-//                     return (T)entry.get(parentGraph, procedure_);
-//
-//             }  catch (Throwable t) {
-//
-//                     entry.except(t);
-//                     return (T)entry.get(parentGraph, procedure_);
-//
-//             }
-//             
-//     }
-
-
        public <T> Object performQuery(ReadGraphImpl parentGraph, final AsyncMultiRead<T> query, final CacheEntryBase entry_, Object procedure_) throws DatabaseException {
 
                ReadGraphImpl queryGraph = parentGraph.withParent(entry_);
@@ -315,82 +230,6 @@ public class QueryCacheBase {
                
        }
        
-       public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
-               if (parent != null && !inferred) {
-                       try {
-                               if(!child.isImmutable(graph)) {
-                                       synchronized(child) {
-                                               child.addParent(parent);
-                                       }
-                               }
-                       } catch (DatabaseException e) {
-                               Logger.defaultLogError(e);
-                       }
-                       if (Development.DEVELOPMENT) {
-                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                                       System.err.println(child + " -> " + parent);
-                               }
-                       }
-               }
-
-               if (listener != null) {
-                       return registerListener(child, listener, procedure);
-               } else {
-                       return null;
-               }
-
-       }
-       
-       public synchronized ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
-               assert (entry != null);
-
-               if (base.isDisposed())
-                       return null;
-
-               return addListener(entry, base, procedure);
-
-       }
-
-       protected void primeListenerEntry(final ListenerEntry entry, final Object result) {
-               entry.setLastKnown(result);
-       }
-
-       private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
-               assert (entry != null);
-               assert (procedure != null);
-
-               ArrayList<ListenerEntry> list = listeners.get(entry);
-               if (list == null) {
-                       list = new ArrayList<ListenerEntry>(1);
-                       listeners.put(entry, list);
-               }
-
-               ListenerEntry result = new ListenerEntry(entry, base, procedure);
-               int currentIndex = list.indexOf(result);
-               // There was already a listener
-               if(currentIndex > -1) {
-                       ListenerEntry current = list.get(currentIndex);
-                       if(!current.base.isDisposed()) return null;
-                       list.set(currentIndex, result);
-               } else {
-                       list.add(result);
-               }
-
-               if (Development.DEVELOPMENT) {
-                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                               new Exception().printStackTrace();
-                               System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
-                       }
-               }
-
-               return result;
-
-       }
-       
-       
        public Collection<CacheEntry> getRootList() {
 
                ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
index 965f1e39f41fb1dc4d99000d533c03dde6c21da3..7bab1a77a9f6dbc60280e012a623566ddcb104e6 100644 (file)
@@ -160,7 +160,7 @@ class QueryCollectorImpl implements QueryProcessor.QueryCollector {
                                                        }
                                                }
 
-                                               if(queryProcessor.hasListenerAfterDisposing(entry)) {
+                                               if(queryProcessor.listening.hasListenerAfterDisposing(entry)) {
                                                        if((status & CacheEntry.HAS_BEEN_BOUND) == 0) {
                                                                queryProcessor.boundQueries++;
                                                                entry.setGCStatusFlag(CacheEntry.HAS_BEEN_BOUND, true);
@@ -176,7 +176,7 @@ class QueryCollectorImpl implements QueryProcessor.QueryCollector {
        }
 
        private boolean tryCollect(CacheEntry entry) {
-               if (!queryProcessor.hasListenerAfterDisposing(entry))
+               if (!queryProcessor.listening.hasListenerAfterDisposing(entry))
                        if(entry.shouldBeCollected()) {
                                queryProcessor.removeQuery(entry);
                                support.remove();
index 0efa49f56b7cfa3a03617669cf5ad892ef002d18..b0e6330700e2fd587d9aa873a0446671ddd1f77d 100644 (file)
@@ -47,7 +47,7 @@ class QueryCollectorImpl2 implements QueryProcessor.QueryCollector {
                        return false;
                }
 
-               if (this.queryProcessor.hasListenerAfterDisposing(entry)) {
+               if (this.queryProcessor.listening.hasListenerAfterDisposing(entry)) {
                        if (Development.DEVELOPMENT) {
                                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYCOLLECTOR, Bindings.BOOLEAN)) {
                                        System.err.println("GC: listened entry " + entry + " was not collected. Entry=" + entry);
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
new file mode 100644 (file)
index 0000000..e524a08
--- /dev/null
@@ -0,0 +1,404 @@
+package org.simantics.db.impl.query;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.debug.ListenerReport;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.utils.Development;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.set.hash.THashSet;
+
+public class QueryListening {
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+
+       final private QueryProcessor                            processor;
+       private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
+       private boolean                                         firingListeners       = false;
+       final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+       private BlockingQueue<Runnable>                         tasks = new LinkedBlockingQueue<Runnable>();
+       private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
+       
+       QueryListening(QueryProcessor processor) {
+               this.processor = processor;
+               new DependencyManagementThread(processor, tasks).start();
+       }
+       
+       public void sync() {
+               Semaphore s = new Semaphore(0);
+               tasks.add(() -> {
+                       s.release();
+               });
+               try {
+                       s.acquire();
+               } catch (Throwable t) {
+                       LOGGER.error("Error while waiting for query dependency management", t);
+               }
+       }
+       
+       static class DependencyManagementThread extends Thread {
+               
+               final private QueryProcessor processor;
+               final BlockingQueue<Runnable> tasks;
+               
+               DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
+                       setName("Query Dependency Manager");
+                       this.processor = processor;
+                       this.tasks = tasks;
+               }
+               
+               @Override
+               public void run() {
+                       while(processor.isAlive()) {
+                               try {
+                                       Runnable r = tasks.take();
+                                       r.run();
+                               } catch (Throwable t) {
+                                       // Spurious problems?
+                                       LOGGER.error("Error while waiting for query dependency management tasks", t);
+                               }
+                       }
+               }
+               
+       }
+       
+       public boolean hasScheduledUpdates() {
+               return !scheduledListeners.isEmpty();
+       }
+
+       void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+               
+               tasks.offer(() -> {
+
+                       if (parent != null && !inferred) {
+                               try {
+                                       if(!child.isImmutable(graph))
+                                               child.addParent(parent);
+                               } catch (DatabaseException e) {
+                                       LOGGER.error("Error while registering query dependencies", e);
+                               }
+                               if (Development.DEVELOPMENT) {
+                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                                               System.out.println(child + " -> " + parent);
+                                       }
+                               }
+                       }
+
+                       if (listener != null)
+                               registerListener(child, listener, procedure);
+
+               });
+               
+       }
+
+       void registerFirstKnown(ListenerBase base, Object result) {
+               
+               tasks.offer(() -> {
+
+                       ListenerEntry entry = addedEntries.get(base);
+                       if(entry != null) entry.setLastKnown(result);
+                       
+               });
+               
+       }
+
+       public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
+
+               assert (entry != null);
+
+               if (base.isDisposed())
+                       return null;
+
+               return addListener(entry, base, procedure);
+
+       }
+
+       /*
+        * Registers a listener and returns an entry iff the entry was added
+        */
+       private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
+
+               assert (entry != null);
+               assert (procedure != null);
+
+               ArrayList<ListenerEntry> list = listeners.get(entry);
+               if (list == null) {
+                       list = new ArrayList<>(1);
+                       listeners.put(entry, list);
+               }
+
+               ListenerEntry result = new ListenerEntry(entry, base, procedure);
+               // Equals is here based on base
+               int currentIndex = list.indexOf(result);
+               // There was already a listener
+               if(currentIndex > -1) {
+                       ListenerEntry current = list.get(currentIndex);
+                       if(!current.base.isDisposed()) return null;
+                       list.set(currentIndex, result);
+               } else {
+                       list.add(result);
+               }
+
+               if (Development.DEVELOPMENT) {
+                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                               new Exception().printStackTrace();
+                               System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+                       }
+               }
+               
+               addedEntries.put(base, result);
+               
+               return result;
+
+       }
+
+       void scheduleListener(ListenerEntry entry) {
+               assert (entry != null);
+               if (Development.DEVELOPMENT) {
+                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                               System.err.println("Scheduled " + entry.procedure);
+                       }
+               }
+               scheduledListeners.add(entry);
+       }
+
+       private void removeListener(ListenerEntry entry) {
+               assert (entry != null);
+               ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+               if(list == null) return;
+               boolean success = list.remove(entry);
+               assert (success);
+               if (list.isEmpty())
+                       listeners.remove(entry.entry);
+       }
+
+       boolean hasListener(CacheEntry entry) {
+               if(listeners.get(entry) != null) return true;
+               return false;
+       }
+
+       boolean hasListenerAfterDisposing(CacheEntry entry) {
+               if(listeners.get(entry) != null) {
+                       ArrayList<ListenerEntry> entries = listeners.get(entry);
+                       ArrayList<ListenerEntry> list = null;
+                       for (ListenerEntry e : entries) {
+                               if (e.base.isDisposed()) {
+                                       if(list == null) list = new ArrayList<ListenerEntry>();
+                                       list.add(e);
+                               }
+                       }
+                       if(list != null) {
+                               for (ListenerEntry e : list) {
+                                       entries.remove(e);
+                               }
+                       }
+                       if (entries.isEmpty()) {
+                               listeners.remove(entry);
+                               return false;
+                       }
+                       return true;
+               }
+               return false;
+       }
+
+       List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+               hasListenerAfterDisposing(entry);
+               if(listeners.get(entry) != null)
+                       return listeners.get(entry);
+               else 
+                       return Collections.emptyList();
+       }
+
+       void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+               if(!workarea.containsKey(entry)) {
+
+                       HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+                       for(ListenerEntry e : getListenerEntries(entry))
+                               ls.add(e.base);
+
+                       workarea.put(entry, ls);
+
+                       for(CacheEntry parent : entry.getParents(processor)) {
+                               processListenerReport(parent, workarea);
+                               ls.addAll(workarea.get(parent));
+                       }
+
+               }
+
+       }
+
+       public synchronized ListenerReport getListenerReport() throws IOException {
+
+               class ListenerReportImpl implements ListenerReport {
+
+                       Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+                       @Override
+                       public void print(PrintStream b) {
+                               Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+                               for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+                                       for(ListenerBase l : e.getValue()) {
+                                               Integer i = hist.get(l);
+                                               hist.put(l, i != null ? i-1 : -1);
+                                       }
+                               }
+
+                               for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+                                       b.print("" + -p.second + " " + p.first + "\n");
+                               }
+
+                               b.flush();
+                       }
+
+               }
+
+               ListenerReportImpl result = new ListenerReportImpl();
+
+               Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+               for(CacheEntryBase entry : all) {
+                       hasListenerAfterDisposing(entry);
+               }
+               for(CacheEntryBase entry : all) {
+                       processListenerReport(entry, result.workarea);
+               }
+
+               return result;
+
+       }
+
+       public synchronized String reportListeners(File file) throws IOException {
+
+               if (!processor.isAlive())
+                       return "Disposed!";
+
+               PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+               ListenerReport report = getListenerReport();
+               report.print(b);
+
+               return "Done reporting listeners.";
+
+       }
+
+       public void fireListeners(WriteGraphImpl graph) {
+
+               assert (!processor.updating);
+               assert (!processor.cache.collecting);
+               assert (!firingListeners);
+
+               firingListeners = true;
+
+               try {
+
+                       // Performing may cause further events to be scheduled.
+                       while (!scheduledListeners.isEmpty()) {
+
+                               // Clone current events to make new entries possible during
+                               // firing.
+                               THashSet<ListenerEntry> entries = scheduledListeners;
+                               scheduledListeners = new THashSet<ListenerEntry>();
+
+                               ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+                               for (ListenerEntry listenerEntry : entries) {
+
+                                       if (pruneListener(listenerEntry)) {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               new Exception().printStackTrace();
+                                                               System.err.println("Pruned " + listenerEntry.procedure);
+                                                       }
+                                               }
+                                               continue;
+                                       }
+
+                                       final CacheEntry entry = listenerEntry.entry;
+                                       assert (entry != null);
+
+                                       Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+                                       if (newValue != ListenerEntry.NOT_CHANGED) {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               new Exception().printStackTrace();
+                                                               System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+                                                       }
+                                               }
+                                               schedule.add(listenerEntry);
+                                               listenerEntry.setLastKnown(entry.getResult());
+                                       }
+
+                               }
+
+                               for(ListenerEntry listenerEntry : schedule) {
+                                       final CacheEntry entry = listenerEntry.entry;
+                                       if (Development.DEVELOPMENT) {
+                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                       System.err.println("Firing " + listenerEntry.procedure);
+                                               }
+                                       }
+                                       try {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+                                                       }
+                                               }
+                                               entry.performFromCache(graph, listenerEntry.procedure);
+                                       } catch (Throwable t) {
+                                               t.printStackTrace();
+                                       }
+                               }
+
+                       }
+
+               } finally {
+                       firingListeners = false;
+               }
+
+       }
+       
+       void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+               Iterable<CacheEntry> oldParents = entry.getParents(processor);
+               for (CacheEntry parent : oldParents) {
+                       if(!parent.isDiscarded())
+                               todo.push(new UpdateEntry(entry, parent, indent + 2));
+               }
+
+       }
+
+       private boolean pruneListener(ListenerEntry entry) {
+               if (entry.base.isDisposed()) {
+                       removeListener(entry);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+       
+}
index 9b54d15f6996fbf708a5fce9fc5ea00fd467e0ba..7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb 100644 (file)
@@ -44,7 +44,6 @@ import org.simantics.db.Statement;
 import org.simantics.db.VirtualGraph;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.utils.Logger;
-import org.simantics.db.debug.ListenerReport;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
 import org.simantics.db.exception.NoInverseException;
@@ -53,7 +52,6 @@ import org.simantics.db.impl.ResourceImpl;
 import org.simantics.db.impl.graph.BarrierTracing;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.graph.ReadGraphSupport;
-import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.impl.procedure.IntProcedureAdapter;
 import org.simantics.db.impl.procedure.InternalProcedure;
 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
@@ -130,17 +128,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        AtomicInteger                                       sleepers = new AtomicInteger(0);
 
-       private boolean                                         updating              = false;
+       boolean                                         updating              = false;
 
 
-       private boolean                                         firingListeners       = false;
-
        final public QueryCache                                 cache;
        final public QuerySupport                               querySupport;
        final public Session                                    session;
        final public ResourceSupport                            resourceSupport;
-
-       private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
+       
+       final public QueryListening                            listening = new QueryListening(this);
 
        QueryThread[]                                   executors;
 
@@ -504,7 +500,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        final public void releaseWrite(ReadGraphImpl graph) {
-               performDirtyUpdates(graph);
+               propagateChangesInQueryCache(graph);
                modificationCounter++;
        }
 
@@ -699,35 +695,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        boolean isBound(ExternalReadEntry<?> entry) {
                if(entry.hasParents()) return true;
-               else if(hasListener(entry)) return true;
+               else if(listening.hasListener(entry)) return true;
                else return false;
        }
 
-       synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
-               if (parent != null && !inferred) {
-                       try {
-                               if(!child.isImmutable(graph))
-                                       child.addParent(parent);
-                       } catch (DatabaseException e) {
-                               Logger.defaultLogError(e);
-                       }
-                       if (Development.DEVELOPMENT) {
-                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                                       System.out.println(child + " -> " + parent);
-                               }
-                       }
-               }
-
-               if (listener != null) {
-                       return registerListener(child, listener, procedure);
-               } else {
-                       return null;
-               }
-
-       }
-
-       
        static class Dummy implements InternalProcedure<Object>, IntProcedure {
 
                @Override
@@ -929,181 +900,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
-               assert (entry != null);
-
-               if (base.isDisposed())
-                       return null;
-
-               return addListener(entry, base, procedure);
-
-       }
-
-       private void primeListenerEntry(final ListenerEntry entry, final Object result) {
-               entry.setLastKnown(result);
-       }
-
-       private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
-               assert (entry != null);
-               assert (procedure != null);
-
-               ArrayList<ListenerEntry> list = cache.listeners.get(entry);
-               if (list == null) {
-                       list = new ArrayList<ListenerEntry>(1);
-                       cache.listeners.put(entry, list);
-               }
-
-               ListenerEntry result = new ListenerEntry(entry, base, procedure);
-               int currentIndex = list.indexOf(result);
-               // There was already a listener
-               if(currentIndex > -1) {
-                       ListenerEntry current = list.get(currentIndex);
-                       if(!current.base.isDisposed()) return null;
-                       list.set(currentIndex, result);
-               } else {
-                       list.add(result);
-               }
-
-               if (Development.DEVELOPMENT) {
-                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                               new Exception().printStackTrace();
-                               System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
-                       }
-               }
-
-               return result;
-
-       }
-
-       private void scheduleListener(ListenerEntry entry) {
-               assert (entry != null);
-               if (Development.DEVELOPMENT) {
-                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                               System.err.println("Scheduled " + entry.procedure);
-                       }
-               }
-               scheduledListeners.add(entry);
-       }
-
-       private void removeListener(ListenerEntry entry) {
-               assert (entry != null);
-               ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
-               if(list == null) return;
-               boolean success = list.remove(entry);
-               assert (success);
-               if (list.isEmpty())
-                       cache.listeners.remove(entry.entry);
-       }
-
-       private boolean hasListener(CacheEntry entry) {
-               if(cache.listeners.get(entry) != null) return true;
-               return false;
-       }
-
-       boolean hasListenerAfterDisposing(CacheEntry entry) {
-               if(cache.listeners.get(entry) != null) {
-                       ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
-                       ArrayList<ListenerEntry> list = null;
-                       for (ListenerEntry e : entries) {
-                               if (e.base.isDisposed()) {
-                                       if(list == null) list = new ArrayList<ListenerEntry>();
-                                       list.add(e);
-                               }
-                       }
-                       if(list != null) {
-                               for (ListenerEntry e : list) {
-                                       entries.remove(e);
-                               }
-                       }
-                       if (entries.isEmpty()) {
-                               cache.listeners.remove(entry);
-                               return false;
-                       }
-                       return true;
-               }
-               return false;
-       }
-
-       List<ListenerEntry> getListenerEntries(CacheEntry entry) {
-               hasListenerAfterDisposing(entry);
-               if(cache.listeners.get(entry) != null)
-                       return cache.listeners.get(entry);
-               else 
-                       return Collections.emptyList();
-       }
-
-       void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
-               if(!workarea.containsKey(entry)) {
-
-                       HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
-                       for(ListenerEntry e : getListenerEntries(entry))
-                               ls.add(e.base);
-
-                       workarea.put(entry, ls);
-
-                       for(CacheEntry parent : entry.getParents(this)) {
-                               processListenerReport(parent, workarea);
-                               ls.addAll(workarea.get(parent));
-                       }
-
-               }
-
-       }
-
-       public synchronized ListenerReport getListenerReport() throws IOException {
-
-               class ListenerReportImpl implements ListenerReport {
-
-                       Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
-                       @Override
-                       public void print(PrintStream b) {
-                               Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
-                               for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
-                                       for(ListenerBase l : e.getValue()) {
-                                               Integer i = hist.get(l);
-                                               hist.put(l, i != null ? i-1 : -1);
-                                       }
-                               }
-
-                               for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
-                                       b.print("" + -p.second + " " + p.first + "\n");
-                               }
-
-                               b.flush();
-                       }
-
-               }
-
-               ListenerReportImpl result = new ListenerReportImpl();
-
-               Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
-               for(CacheEntryBase entry : all) {
-                       hasListenerAfterDisposing(entry);
-               }
-               for(CacheEntryBase entry : all) {
-                       processListenerReport(entry, result.workarea);
-               }
-
-               return result;
-
-       }
-
-       public synchronized String reportListeners(File file) throws IOException {
-
-               if (!isAlive())
-                       return "Disposed!";
-
-               PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
-               ListenerReport report = getListenerReport();
-               report.print(b);
-
-               return "Done reporting listeners.";
-
-       }
 
        void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
 
@@ -1176,7 +972,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                int listeners = 0;
 
                for(CacheEntry entry : workarea.keySet()) {
-                       boolean listener = hasListenerAfterDisposing(entry);
+                       boolean listener = listening.hasListenerAfterDisposing(entry);
                        boolean hasParents = entry.getParents(this).iterator().hasNext();
                        if(listener) {
                                // Bound
@@ -1340,7 +1136,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                b.print("// Entry parent listing\n");
                for(CacheEntry entry : workarea.keySet()) {
                        int status = flagMap.get(entry);
-                       boolean hasListener = hasListenerAfterDisposing(entry);
+                       boolean hasListener = listening.hasListenerAfterDisposing(entry);
                        b.print("Q " + entry.toString());
                        if(hasListener) {
                                b.print(" (L" + status + ")");
@@ -1386,22 +1182,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       class UpdateEntry {
-
-               public CacheEntry caller;
-
-               public CacheEntry entry;
-
-               public int         indent;
-
-               public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
-                       this.caller = caller;
-                       this.entry = entry;
-                       this.indent = indent;
-               }
-
-       };
-
        boolean removeQuery(CacheEntry entry) {
 
                // This entry has been removed before. No need to do anything here.
@@ -1494,11 +1274,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                Query query = entry.getQuery();
                int type = query.type();
 
-               boolean hasListener = hasListener(entry); 
+               boolean hasListener = listening.hasListener(entry); 
 
                if (Development.DEVELOPMENT) {
                        if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
-                               if(hasListener(entry)) {
+                               if(listening.hasListener(entry)) {
                                        System.err.println(" (L)");
                                } else {
                                        System.err.println("");
@@ -1545,43 +1325,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //             System.err.println(" => FOO " + type);
 
                if (hasListener) {
-                       ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
+                       ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
                        if(entries != null) {
                                for (ListenerEntry le : entries) {
-                                       scheduleListener(le);
+                                       listening.scheduleListener(le);
                                }
                        }
                }
 
                // If invalid, update parents
                if (type == RequestFlags.INVALIDATE) {
-                       updateParents(e.indent, entry, todo);
+                       listening.updateParents(e.indent, entry, todo);
                }
 
                return hasListener;
 
        }
 
-       private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
-
-               Iterable<CacheEntry> oldParents = entry.getParents(this);
-               for (CacheEntry parent : oldParents) {
-//                     System.err.println("updateParents " + entry + " => " + parent);
-                       if(!parent.isDiscarded())
-                               todo.push(new UpdateEntry(entry, parent, indent + 2));
-               }
-
-       }
-
-       private boolean pruneListener(ListenerEntry entry) {
-               if (entry.base.isDisposed()) {
-                       removeListener(entry);
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
        /**
         * @param av1 an array (guaranteed)
         * @param av2 any object
@@ -1617,7 +1377,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
 
 
-       final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
+       final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
 
                try {
 
@@ -1681,93 +1441,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       public boolean hasScheduledUpdates() {
-               return !scheduledListeners.isEmpty();
-       }
-
-       public void performScheduledUpdates(WriteGraphImpl graph) {
-
-               assert (!updating);
-               assert (!cache.collecting);
-               assert (!firingListeners);
-
-               firingListeners = true;
-
-               try {
-
-                       // Performing may cause further events to be scheduled.
-                       while (!scheduledListeners.isEmpty()) {
-
-//                             graph.restart();
-//                             graph.state.barrier.inc();
-
-                               // Clone current events to make new entries possible during
-                               // firing.
-                               THashSet<ListenerEntry> entries = scheduledListeners;
-                               scheduledListeners = new THashSet<ListenerEntry>();
-
-                               ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
-
-                               for (ListenerEntry listenerEntry : entries) {
-
-                                       if (pruneListener(listenerEntry)) {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               new Exception().printStackTrace();
-                                                               System.err.println("Pruned " + listenerEntry.procedure);
-                                                       }
-                                               }
-                                               continue;
-                                       }
-
-                                       final CacheEntry entry = listenerEntry.entry;
-                                       assert (entry != null);
-
-                                       Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
-
-                                       if (newValue != ListenerEntry.NOT_CHANGED) {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               new Exception().printStackTrace();
-                                                               System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
-                                                       }
-                                               }
-                                               schedule.add(listenerEntry);
-                                               listenerEntry.setLastKnown(entry.getResult());
-                                       }
-
-                               }
-
-                               for(ListenerEntry listenerEntry : schedule) {
-                                       final CacheEntry entry = listenerEntry.entry;
-                                       if (Development.DEVELOPMENT) {
-                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                       System.err.println("Firing " + listenerEntry.procedure);
-                                               }
-                                       }
-                                       try {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
-                                                       }
-                                               }
-                                               entry.performFromCache(graph, listenerEntry.procedure);
-                                       } catch (Throwable t) {
-                                               t.printStackTrace();
-                                       }
-                               }
-
-//                             graph.state.barrier.dec();
-//                             graph.waitAsync(null);
-//                             graph.state.barrier.assertReady();
-
-                       }
-
-               } finally {
-                       firingListeners = false;
-               }
-
-       }
 
        /**
         * 
@@ -1810,7 +1483,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                                                Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
                                                if (newValue != ListenerEntry.NOT_CHANGED)
-                                                       updateParents(0, immediate, todo);
+                                                       listening.updateParents(0, immediate, todo);
 
                                        } else {
 
@@ -1818,7 +1491,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                                                Object newValue = compareTo(graph, immediate, oldValue);
 
                                                if (newValue != ListenerEntry.NOT_CHANGED) {
-                                                       updateParents(0, immediate, todo);
+                                                       listening.updateParents(0, immediate, todo);
                                                } else {
                                                        // If not changed, keep the old value
                                                        immediate.setResult(oldValue);
@@ -1867,7 +1540,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                
        }
        
-       public void performDirtyUpdates(final ReadGraphImpl graph) {
+       public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
+               
+               // Make sure that listening has performed its work
+               listening.sync();
 
                cache.dirty = false;
                lastInvalidate = 0;
index 26eff457dc99f29e741b53c73be2db895d4883a6..fd6cd7d879374bb8af81c18a40058137b46aed2e 100644 (file)
@@ -65,7 +65,7 @@ public abstract class UnaryQuery<Procedure> extends CacheEntryBase<Procedure> im
     
     @Override
     boolean isImmutable(ReadGraphImpl graph) throws DatabaseException {
-               return graph.processor.isImmutable(id);
+        return graph.processor.isImmutable(id);
     }
     
 }
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/UpdateEntry.java
new file mode 100644 (file)
index 0000000..8899425
--- /dev/null
@@ -0,0 +1,17 @@
+package org.simantics.db.impl.query;
+
+class UpdateEntry {
+
+       public CacheEntry caller;
+
+       public CacheEntry entry;
+
+       public int         indent;
+
+       public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
+               this.caller = caller;
+               this.entry = entry;
+               this.indent = indent;
+       }
+
+}
\ No newline at end of file
index b49d270aa4420ded8800c505eca73648efd52cc1..2e664d53994e8e6c3b1869416dfac68a246c4897 100644 (file)
@@ -40,10 +40,10 @@ public class ResourceURIToVariable extends UnaryRead<String, Variable> implement
         
        }
 
-       @Override
-       public boolean isImmutable(ReadGraph graph) throws DatabaseException {
-               return false;
-       }
+    @Override
+    public boolean isImmutable(ReadGraph graph) {
+        return false;
+    }
 
        @Override
        public int getType() {
index 07df23df8a07a95527e40a4018c966bb4ed6ad1e..200e9e3c14f573478c4c50c2a3809266d77c5602 100644 (file)
@@ -781,9 +781,9 @@ public final class ClusterTable implements IClusterTable {
                 th = new TaskHelper("Refresh");
                 session.writeState = new WriteState<Object>(writer, th.writeTraits, th.sema, th.proc);
                 try {
-                    session.getQueryProvider2().performDirtyUpdates(writer);
+                    session.getQueryProvider2().propagateChangesInQueryCache(writer);
                     session.fireMetadataListeners(writer, cs);
-                    session.getQueryProvider2().performScheduledUpdates(writer);
+                    session.getQueryProvider2().listening.fireListeners(writer);
                     session.fireReactionsToSynchronize(cs);
                     session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                     session.printDiagnostics();
index f788a12448bfd9b26254bcf489ebbec42c437f31..3b05bcba6d67baabc0ee5afcf48557f522f3b384 100644 (file)
@@ -69,7 +69,7 @@ public class DebugSupportImpl implements DebugSupport {
                        @Override
                        public Object apply(WriteGraph graph, String args) {
                                try {
-                                       return getSession(graph).queryProvider2.getListenerReport();
+                                       return getSession(graph).queryProvider2.listening.getListenerReport();
                                } catch (IOException e) {
                                        Logger.defaultLogError(e);
                                        return e.getMessage();
@@ -125,7 +125,7 @@ public class DebugSupportImpl implements DebugSupport {
                        @Override
                        public String apply(WriteGraph graph, File file, String args) {
                                try {
-                                       return getSession(graph).queryProvider2.reportListeners(file);
+                                       return getSession(graph).queryProvider2.listening.reportListeners(file);
                                } catch (IOException e) {
                                        Logger.defaultLogError(e);
                                        return e.getMessage();
index d32c066bcdfc9bee732cd6ecf3829b5b0eb7a9a1..b98b4dec643383d163cd05746427db59b6c523ea 100644 (file)
@@ -3542,8 +3542,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         while(dirtyPrimitives) {
             dirtyPrimitives = false;
-            getQueryProvider2().performDirtyUpdates(writer);
-            getQueryProvider2().performScheduledUpdates(writer);
+            getQueryProvider2().propagateChangesInQueryCache(writer);
+            getQueryProvider2().listening.fireListeners(writer);
         }
 
         fireMetadataListeners(writer, clientChanges);
index e0d8eee8a3ddb9d304d6b0109b752e66d0ca46f1..4c6e77b1a94729c0274078ef5318ffcf30a51f36 100644 (file)
@@ -194,7 +194,7 @@ class State {
     void stopReadTransaction() throws DatabaseException {
         lock.lock();
         try {
-            assert (!queryProvider.hasScheduledUpdates());
+            assert (!queryProvider.listening.hasScheduledUpdates());
             assert (readCount == 1);
             session.writeSupport.gc();
             transactionToken.stopReadTransaction();
@@ -290,16 +290,16 @@ class State {
                             throw new InternalException("Cancel failed. This should never happen.");
 //                        System.out.println("session cs: " + session.clientChanges);
 //                        System.out.println("reverse cs: " + cs);
-                        queryProvider.performDirtyUpdates(graph);
-                        queryProvider.performScheduledUpdates(graph);
+                        queryProvider.propagateChangesInQueryCache(graph);
+                        queryProvider.listening.fireListeners(graph);
                     } catch (DatabaseException e) {
                         Logger.defaultLogError(e);
                     }
                     // This will send and accept the reverse change set.
                     transactionToken.cancelEnd(session.writeSupport, null, session.clusterStream);
                 } else {
-                    queryProvider.performDirtyUpdates(graph);
-                    queryProvider.performScheduledUpdates(graph);
+                    queryProvider.propagateChangesInQueryCache(graph);
+                    queryProvider.listening.fireListeners(graph);
                 }
             }
             session.writeSupport.clearMetadata();
@@ -339,15 +339,16 @@ class State {
 
 
 //                start = System.nanoTime();
-                queryProvider.performScheduledUpdates(graph);
+                queryProvider.propagateChangesInQueryCache(graph);
+                queryProvider.listening.fireListeners(graph);
 //                duration = System.nanoTime() - start;
 //                System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
 
                 // Process updates as long as pending primitives exist
                 while (session.dirtyPrimitives) {
                     session.dirtyPrimitives = false;
-                    queryProvider.performDirtyUpdates(graph);
-                    queryProvider.performScheduledUpdates(graph);
+                    queryProvider.propagateChangesInQueryCache(graph);
+                    queryProvider.listening.fireListeners(graph);
                 }
 
                 if (!writeOnly) // TODO: fix me! FIX ME! Please fix me! Please!
index f2061cd427295f8488f161e72d7d7ec17bcb2f9b..e492f68337e3ba01ae76dd54cd71a07b7e16027e 100644 (file)
@@ -93,9 +93,9 @@ public class UndoRedoSupportImpl implements UndoRedoSupport {
                         um.setTypeAndRange(false, ope.getId(), ope.getCSId());
                         writer.addMetadata(um);
                     }
-                    session.getQueryProvider2().performDirtyUpdates(writer);
+                    session.getQueryProvider2().propagateChangesInQueryCache(writer);
                     session.fireMetadataListeners(writer, cs);
-                    session.getQueryProvider2().performScheduledUpdates(writer);
+                    session.getQueryProvider2().listening.fireListeners(writer);
                     session.fireReactionsToSynchronize(cs);
                     session.fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                     session.printDiagnostics();
index aff7e17acf261bee201300c07eb4a656301cc82c..ea4319ace00c464095ab59547d93f6ce2da11ba7 100644 (file)
@@ -261,7 +261,7 @@ public class WriteSupportImpl implements WriteSupport {
 //        graph.state.barrier.dec();
 //        graph.waitAsync(request);
         
-        queryProcessor.performDirtyUpdates(graph);
+        queryProcessor.propagateChangesInQueryCache(graph);
         
         // Do not fire metadata listeners for virtual requests
         if(graph.getProvider() == null) {
@@ -291,7 +291,7 @@ public class WriteSupportImpl implements WriteSupport {
 //        graph.state.barrier.dec();
 //        graph.waitAsync(request);
         
-        queryProcessor.performDirtyUpdates(graph);
+        queryProcessor.propagateChangesInQueryCache(graph);
 
         // Do not fire metadata listeners for virtual requests
         if(graph.getProvider() == null) {
@@ -318,7 +318,7 @@ public class WriteSupportImpl implements WriteSupport {
         
         ReadGraphImpl impl = (ReadGraphImpl)graph;
         
-        queryProcessor.performDirtyUpdates(impl);
+        queryProcessor.propagateChangesInQueryCache(impl);
 
         // Do not fire metadata listeners for virtual requests
         if(graph.getProvider() == null) {