]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Fixed asynchronous recompute problems 11/1911/1
authorAntti Villberg <antti.villberg@semantum.fi>
Wed, 11 Jul 2018 05:57:39 +0000 (08:57 +0300)
committerAntti Villberg <antti.villberg@semantum.fi>
Wed, 11 Jul 2018 05:57:39 +0000 (08:57 +0300)
gitlab #5

Change-Id: Ia80fec89736dbef1f6bd44e730d6c4186921836d

bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java

index 6998ee55e127afa761b7aad2499264bc816588d8..3a735e6c24c07900f92a5006790e100af3bb52b2 100644 (file)
@@ -5401,11 +5401,6 @@ public class ReadGraphImpl implements ReadGraph {
                return processor.getSession();
        }
 
-//     @Override
-//     final public Builtins getBuiltins() {
-//             return processor.getSession().getBuiltins();
-//     }
-
        @Override
        public <T> void asyncRequest(final Read<T> request) {
 
@@ -5471,10 +5466,6 @@ public class ReadGraphImpl implements ReadGraph {
         return new ReadGraphImpl(null, support);
     }
 
-    public static ReadGraphImpl forRecompute(CacheEntry entry, QueryProcessor support) {
-        return new ReadGraphImpl(entry, support);       
-    }
-
        @Override
        public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
                asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
@@ -6015,53 +6006,17 @@ public class ReadGraphImpl implements ReadGraph {
                return thread == Integer.MIN_VALUE;
        }
 
-//     final private boolean isSync(int thread) {
-//             return thread < -1 && thread > Integer.MIN_VALUE;
-//     }
-
        ReadGraphImpl(ReadGraphImpl graph) {
                this(graph.parent, graph.processor);
        }
 
        ReadGraphImpl(CacheEntry parent, QueryProcessor support) {
-//             this.state = new ReadGraphState(barrier, support);
                this.parent = parent;
                this.processor = support;
        }
        
-       ReadGraphImpl(final QueryProcessor support) {
-               
-//             this.state = state;
-               this.processor = support;
-               this.parent = null;
-               
-       }
-
-//     public static ReadGraphImpl createSync(int syncThread, Object syncParent,
-//                     ReadGraphSupportImpl support) {
-//             return new ReadGraphImpl(syncThread, syncThread, syncParent, null,
-//                             support, new AsyncBarrierImpl(null));
-//     }
-
        public static ReadGraphImpl create(QueryProcessor support) {
-               return new ReadGraphImpl(support);
-       }
-
-//     public ReadGraphImpl newAsync() {
-//             return this;
-////           if(!state.synchronizedExecution) {
-////                   return this;
-////           } else {
-////                   return new ReadGraphImpl(false, parent, state.support, state.barrier);
-////           }
-//     }
-
-//     public ReadGraphImpl newSync() {
-//             return new ReadGraphImpl(parent, processor);
-//     }
-
-       public ReadGraphImpl newSync(CacheEntry parentEntry) {
-               return new ReadGraphImpl(parentEntry, processor);
+               return new ReadGraphImpl(null, support);
        }
 
        public ReadGraphImpl newRestart(ReadGraphImpl impl) {
@@ -6069,41 +6024,10 @@ public class ReadGraphImpl implements ReadGraph {
                WriteGraphImpl write = processor.getSession().getService(
                                WriteGraphImpl.class);
 
-//             if (write.callerThread != impl.callerThread)
-//                     return new WriteGraphImpl(impl.callerThread, parent, state.support, write.writeSupport, write.provider, write.state.barrier);
                return write;
 
        }
 
-//     public ReadGraphImpl newSync(Object parentRequest) {
-//             return new ReadGraphImpl(callerThread, state.parent, state.support, new AsyncBarrierImpl(state.barrier));
-//     }
-
-//     public ReadGraphImpl newSync(final int callerThread, Object parentRequest) {
-//             assert (state.syncThread == callerThread || (state.syncThread == Integer.MIN_VALUE && callerThread != Integer.MIN_VALUE));
-//             return new ReadGraphImpl(callerThread, callerThread, parentRequest,
-//                             state.parent, state.support, new AsyncBarrierImpl(state.barrier));
-//     }
-//
-//     public ReadGraphImpl newSyncAsync(Object parentRequest) {
-////           assert (callerThread < 0);
-//             return new ReadGraphImpl(callerThread, state.syncThread, parentRequest,
-//                             state.parent, state.support, new AsyncBarrierImpl(state.barrier));
-//     }
-//
-//     public ReadGraphImpl newSyncAsync(final int callerThread,
-//                     Object parentRequest) {
-////           assert (callerThread < 0);
-//             // assert(state.syncThread == callerThread || (state.syncThread == Integer.MIN_VALUE
-//             // && callerThread != Integer.MIN_VALUE) );
-//             return new ReadGraphImpl(callerThread, callerThread, parentRequest,
-//                             state.parent, state.support, new AsyncBarrierImpl(state.barrier));
-//     }
-
-       public ReadGraphImpl withAsyncParent(CacheEntry parent) {
-               return new ReadGraphImpl(parent, processor);
-       }
-
        public ReadGraphImpl withParent(CacheEntry parent) {
                if(parent == this.parent) return this;
                else return new ReadGraphImpl(parent, processor);
@@ -6120,56 +6044,13 @@ public class ReadGraphImpl implements ReadGraph {
                
                assert(procedure.done());
                
-//             while (!procedure.done()) {
-//
-//                     boolean executed = processor.resumeTasks(callerThread, null, null);
-//                     if (!executed) {
-//                             try {
-//                                     Thread.sleep(1);
-//                                     //                                                              sema.tryAcquire(1, TimeUnit.MILLISECONDS);
-//                             } catch (InterruptedException e) {
-//                                     e.printStackTrace();
-//                             }
-//                     }
-//
-//             }
-               
        }
 
        public <T> void waitAsyncProcedure(AsyncReadProcedure<T> procedure) {
                
                assert(procedure.done());
                
-//             while (!procedure.done()) {
-//
-//                     boolean executed = processor.processor.resume(this);
-//                     if (!executed) {
-//                             try {
-//                                     Thread.sleep(1);
-//                                     //                                                              sema.tryAcquire(1, TimeUnit.MILLISECONDS);
-//                             } catch (InterruptedException e) {
-//                                     e.printStackTrace();
-//                             }
-//                     }
-//
-//             }
-               
        }
-       
-//     public void waitAsync(Object request) {
-//             try {
-//                     state.barrier.waitBarrier(request, this);
-//             } catch (Throwable t) {
-//                     t.printStackTrace();
-//                     processor.scanPending();
-//                     processor.querySupport.checkTasks();
-//                     throw new RuntimeDatabaseException(t);
-//             }
-//     }
-
-//     public void restart() {
-//             state.barrier.restart();
-//     }
 
        public boolean resumeTasks() {
                return processor.resumeTasks(this);
index 8462433f1b49e50a3e0fc4068f5aea067237db0a..410a8fe69623efb2dfe5be462e8c7392be1495b5 100644 (file)
@@ -128,11 +128,6 @@ final public class WriteGraphImpl extends ReadGraphImpl implements WriteGraph {
         return new WriteGraphImpl(parent, processor, writeSupport, provider);
     }
 
-    @Override
-    final public ReadGraphImpl withAsyncParent(CacheEntry parent2) {
-        return new WriteGraphImpl(parent2, processor, writeSupport, provider);
-    }
-
     @Override
     public ReadGraphImpl newRestart(ReadGraphImpl impl) {
 
index 6f718f1fe4d2407cb92ee0cbd445e3dd2e1d3382..1791c50acccb37eb0fb41dbf4c3c149e0fc1c6d7 100644 (file)
@@ -14,6 +14,7 @@ package org.simantics.db.impl.query;
 import java.util.ArrayList;
 
 import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.common.GraphSemaphore;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.procedure.AsyncMultiProcedure;
@@ -88,23 +89,37 @@ final public class AsyncMultiReadEntry<T> extends CacheEntryBase<AsyncMultiProce
                        @Override
                        public void recompute(ReadGraphImpl graph) {
 
-                               request.perform(graph , new AsyncMultiProcedure<T>() {
+                               try {
 
-                                       @Override
-                                       public void execute(AsyncReadGraph graph, T result) {
-                                               addOrSet(result);
-                                       }
+                                       GraphSemaphore s = new GraphSemaphore(graph, 0);
 
-                                       public void finished(AsyncReadGraph graph) {
-                                               finish(graph);
-                                       };
+                                       request.perform(graph , new AsyncMultiProcedure<T>() {
 
-                                       @Override
-                                       public void exception(AsyncReadGraph graph, Throwable t) {
-                                               except(t);
-                                       }
+                                               @Override
+                                               public void execute(AsyncReadGraph graph, T result) {
+                                                       addOrSet(result);
+                                               }
 
-                               });
+                                               public void finished(AsyncReadGraph graph) {
+                                                       finish(graph);
+                                                       s.release();
+                                               };
+
+                                               @Override
+                                               public void exception(AsyncReadGraph graph, Throwable t) {
+                                                       except(t);
+                                                       s.release();
+                                               }
+
+                                       });
+
+                                       s.waitFor(1);
+
+                               } catch (Throwable t) {
+                                       
+                                       except(t);
+                                       
+                               }
                                
                        }
 
index 08f0ad8b5f689e189348b1af64a46e2d2a97c8b7..75169d02e9946ca0a1dea3f1e03a3dba2ce49b97 100644 (file)
@@ -12,6 +12,7 @@
 package org.simantics.db.impl.query;
 
 import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.common.GraphSemaphore;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.ReadGraphImpl;
@@ -78,20 +79,26 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {
 
                                try {
 
+                                       GraphSemaphore s = new GraphSemaphore(graph, 0);
+                                       
                                    request.perform(graph , new AsyncProcedure<T>() {
 
                         @Override
                         public void execute(AsyncReadGraph graph, T result) {
                             addOrSet(graph, result);
+                            s.release();
                         }
                                
                                @Override
                                public void exception(AsyncReadGraph graph, Throwable t) {
                                    except(t);
+                            s.release();
                         }
 
                     });
 
+                    s.waitFor(1);
+
                                } catch (Throwable t) {
                                    except(t);
                 }
index cd540eaadbf59a63c3e450999254ead404a9cefe..deecbabbc143a0408a0a55c0beceadd6ffd5868d 100644 (file)
@@ -54,7 +54,7 @@ public class CodeGen {
                        line(content, "        return;");
                        line(content, "    }");
                }
-               line(content, "    " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature[1] + (genAsync ? ", isSync" : "") + ");");
+               line(content, "    " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph.processor, " + signature[1] + (genAsync ? ", isSync" : "") + ");");
                if(genAsync) {
                        line(content, "    if(entry == null) {");
                        line(content, "        graph.processor.schedule(new SessionTask(false) {");
@@ -102,7 +102,7 @@ public class CodeGen {
                
                String lower = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1);
                
-               line(content, "" + clazz + " getOrCreate" + clazz + "(ReadGraphImpl graph, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {");
+               line(content, "" + clazz + " getOrCreate" + clazz + "(QueryProcessor processor, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {");
                line(content, "    " + clazz + " existing = null;");
                line(content, "    synchronized(" + lower + "Map) {");
                line(content, "        existing = (" + clazz + ")" + lower + "Map.get(" + signature[1] + ");");
@@ -121,11 +121,11 @@ public class CodeGen {
                line(content, "    }");
                if(genAsync) {
                        line(content, "    if(existing.isPending()) {");
-                       line(content, "      if(isSync) waitPending(graph, existing);");
+                       line(content, "      if(isSync) waitPending(processor, existing);");
                        line(content, "      else return null;");
                        line(content, "    }");
                } else {
-                       line(content, "    if(existing.isPending()) waitPending(graph, existing);");
+                       line(content, "    if(existing.isPending()) waitPending(processor, existing);");
                }
                line(content, "    return existing;");
                line(content, "}");
index 5a6f5d0ba67bb120e9bfe4f4cb08e06d80e8b50c..38ed21026f9ff2bbcbc70d7936f0aca30e7fc497 100644 (file)
@@ -24,7 +24,7 @@ public class QueryCache extends QueryCacheBase {
         super(querySupport, threads);
     }
 
-    Objects getOrCreateObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
+    Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException {
         Objects existing = null;
         synchronized(objectsMap) {
             existing = (Objects)objectsMap.get(r1,r2);
@@ -41,7 +41,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -57,7 +57,7 @@ public class QueryCache extends QueryCacheBase {
             Objects.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
+        Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -68,7 +68,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    Statements getOrCreateStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
+    Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException {
         Statements existing = null;
         synchronized(statementsMap) {
             existing = (Statements)statementsMap.get(r1,r2);
@@ -85,7 +85,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -101,7 +101,7 @@ public class QueryCache extends QueryCacheBase {
             Statements.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
+        Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -112,7 +112,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    DirectObjects getOrCreateDirectObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
+    DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException {
         DirectObjects existing = null;
         synchronized(directObjectsMap) {
             existing = (DirectObjects)directObjectsMap.get(r1,r2);
@@ -129,7 +129,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -145,7 +145,7 @@ public class QueryCache extends QueryCacheBase {
             DirectObjects.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
+        DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -156,7 +156,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    RelationInfoQuery getOrCreateRelationInfoQuery(ReadGraphImpl graph, int r) throws DatabaseException {
+    RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException {
         RelationInfoQuery existing = null;
         synchronized(relationInfoQueryMap) {
             existing = (RelationInfoQuery)relationInfoQueryMap.get(r);
@@ -173,7 +173,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -189,7 +189,7 @@ public class QueryCache extends QueryCacheBase {
             RelationInfoQuery.computeForEach(graph, r, null, procedure);
             return;
         }
-        RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
+        RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r);
         InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -200,7 +200,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    URIToResource getOrCreateURIToResource(ReadGraphImpl graph, String id) throws DatabaseException {
+    URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException {
         URIToResource existing = null;
         synchronized(uRIToResourceMap) {
             existing = (URIToResource)uRIToResourceMap.get(id);
@@ -217,7 +217,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -233,7 +233,7 @@ public class QueryCache extends QueryCacheBase {
             URIToResource.computeForEach(graph, id, null, procedure);
             return;
         }
-        URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
+        URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id);
         InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -244,7 +244,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ValueQuery getOrCreateValueQuery(ReadGraphImpl graph, int r) throws DatabaseException {
+    ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException {
         ValueQuery existing = null;
         synchronized(valueQueryMap) {
             existing = (ValueQuery)valueQueryMap.get(r);
@@ -261,7 +261,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -277,7 +277,7 @@ public class QueryCache extends QueryCacheBase {
             ValueQuery.computeForEach(graph, r, null, procedure);
             return;
         }
-        ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
+        ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r);
         InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -288,7 +288,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    OrderedSet getOrCreateOrderedSet(ReadGraphImpl graph, int r) throws DatabaseException {
+    OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException {
         OrderedSet existing = null;
         synchronized(orderedSetMap) {
             existing = (OrderedSet)orderedSetMap.get(r);
@@ -305,7 +305,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -321,7 +321,7 @@ public class QueryCache extends QueryCacheBase {
             OrderedSet.computeForEach(graph, r, null, procedure);
             return;
         }
-        OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
+        OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -332,7 +332,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    PrincipalTypes getOrCreatePrincipalTypes(ReadGraphImpl graph, int r) throws DatabaseException {
+    PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException {
         PrincipalTypes existing = null;
         synchronized(principalTypesMap) {
             existing = (PrincipalTypes)principalTypesMap.get(r);
@@ -349,7 +349,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -365,7 +365,7 @@ public class QueryCache extends QueryCacheBase {
             PrincipalTypes.computeForEach(graph, r, null, procedure);
             return;
         }
-        PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
+        PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -376,7 +376,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    DirectPredicates getOrCreateDirectPredicates(ReadGraphImpl graph, int r) throws DatabaseException {
+    DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException {
         DirectPredicates existing = null;
         synchronized(directPredicatesMap) {
             existing = (DirectPredicates)directPredicatesMap.get(r);
@@ -393,7 +393,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -409,7 +409,7 @@ public class QueryCache extends QueryCacheBase {
             DirectPredicates.computeForEach(graph, r, null, procedure);
             return;
         }
-        DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
+        DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -420,7 +420,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    Predicates getOrCreatePredicates(ReadGraphImpl graph, int r) throws DatabaseException {
+    Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException {
         Predicates existing = null;
         synchronized(predicatesMap) {
             existing = (Predicates)predicatesMap.get(r);
@@ -437,7 +437,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -453,7 +453,7 @@ public class QueryCache extends QueryCacheBase {
             Predicates.computeForEach(graph, r, null, procedure);
             return;
         }
-        Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
+        Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -464,7 +464,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean isSync) throws DatabaseException {
+    ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read<?> r, boolean isSync) throws DatabaseException {
         ReadEntry existing = null;
         synchronized(readEntryMap) {
             existing = (ReadEntry)readEntryMap.get(r);
@@ -482,7 +482,7 @@ public class QueryCache extends QueryCacheBase {
             }
         }
         if(existing.isPending()) {
-          if(isSync) waitPending(graph, existing);
+          if(isSync) waitPending(processor, existing);
           else return null;
         }
         return existing;
@@ -500,7 +500,7 @@ public class QueryCache extends QueryCacheBase {
             ReadEntry.computeForEach(graph, r, null, procedure);
             return;
         }
-        ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, isSync);
+        ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync);
         if(entry == null) {
             graph.processor.schedule(new SessionTask(false) {
                 @Override
@@ -525,7 +525,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean isSync) throws DatabaseException {
+    AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead<?> r, boolean isSync) throws DatabaseException {
         AsyncReadEntry existing = null;
         synchronized(asyncReadEntryMap) {
             existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
@@ -543,7 +543,7 @@ public class QueryCache extends QueryCacheBase {
             }
         }
         if(existing.isPending()) {
-          if(isSync) waitPending(graph, existing);
+          if(isSync) waitPending(processor, existing);
           else return null;
         }
         return existing;
@@ -561,7 +561,7 @@ public class QueryCache extends QueryCacheBase {
             AsyncReadEntry.computeForEach(graph, r, null, procedure);
             return;
         }
-        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, isSync);
+        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync);
         if(entry == null) {
             graph.processor.schedule(new SessionTask(false) {
                 @Override
@@ -586,7 +586,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException {
+    Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException {
         Types existing = null;
         synchronized(typesMap) {
             existing = (Types)typesMap.get(r);
@@ -603,7 +603,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -619,7 +619,7 @@ public class QueryCache extends QueryCacheBase {
             Types.computeForEach(graph, r, null, procedure);
             return;
         }
-        Types entry = (Types)cache.getOrCreateTypes(graph, r);
+        Types entry = (Types)cache.getOrCreateTypes(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -630,7 +630,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ChildMap getOrCreateChildMap(ReadGraphImpl graph, int r) throws DatabaseException {
+    ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException {
         ChildMap existing = null;
         synchronized(childMapMap) {
             existing = (ChildMap)childMapMap.get(r);
@@ -647,7 +647,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -663,7 +663,7 @@ public class QueryCache extends QueryCacheBase {
             ChildMap.computeForEach(graph, r, null, procedure);
             return;
         }
-        ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
+        ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r);
         InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -674,7 +674,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AssertedStatements getOrCreateAssertedStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
+    AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException {
         AssertedStatements existing = null;
         synchronized(assertedStatementsMap) {
             existing = (AssertedStatements)assertedStatementsMap.get(r1,r2);
@@ -691,7 +691,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -703,7 +703,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
+        AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -714,7 +714,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AssertedPredicates getOrCreateAssertedPredicates(ReadGraphImpl graph, int r) throws DatabaseException {
+    AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException {
         AssertedPredicates existing = null;
         synchronized(assertedPredicatesMap) {
             existing = (AssertedPredicates)assertedPredicatesMap.get(r);
@@ -731,7 +731,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -743,7 +743,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
+        AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -754,7 +754,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    DirectSuperRelations getOrCreateDirectSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException {
+    DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException {
         DirectSuperRelations existing = null;
         synchronized(directSuperRelationsMap) {
             existing = (DirectSuperRelations)directSuperRelationsMap.get(r);
@@ -771,7 +771,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -783,7 +783,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
+        DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -794,7 +794,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    SuperTypes getOrCreateSuperTypes(ReadGraphImpl graph, int r) throws DatabaseException {
+    SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException {
         SuperTypes existing = null;
         synchronized(superTypesMap) {
             existing = (SuperTypes)superTypesMap.get(r);
@@ -811,7 +811,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -823,7 +823,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
+        SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -834,7 +834,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    TypeHierarchy getOrCreateTypeHierarchy(ReadGraphImpl graph, int r) throws DatabaseException {
+    TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException {
         TypeHierarchy existing = null;
         synchronized(typeHierarchyMap) {
             existing = (TypeHierarchy)typeHierarchyMap.get(r);
@@ -851,7 +851,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -863,7 +863,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
+        TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -874,7 +874,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    SuperRelations getOrCreateSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException {
+    SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException {
         SuperRelations existing = null;
         synchronized(superRelationsMap) {
             existing = (SuperRelations)superRelationsMap.get(r);
@@ -891,7 +891,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -903,7 +903,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure<IntSet> procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
+        SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -914,7 +914,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    MultiReadEntry getOrCreateMultiReadEntry(ReadGraphImpl graph, MultiRead<?> r) throws DatabaseException {
+    MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead<?> r) throws DatabaseException {
         MultiReadEntry existing = null;
         synchronized(multiReadEntryMap) {
             existing = (MultiReadEntry)multiReadEntryMap.get(r);
@@ -931,7 +931,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -943,7 +943,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
+        MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r);
         AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -954,7 +954,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead<?> r) throws DatabaseException {
+    AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead<?> r) throws DatabaseException {
         AsyncMultiReadEntry existing = null;
         synchronized(asyncMultiReadEntryMap) {
             existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r);
@@ -971,7 +971,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -983,7 +983,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
+        AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r);
         AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
@@ -994,7 +994,7 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    ExternalReadEntry getOrCreateExternalReadEntry(ReadGraphImpl graph, ExternalRead<?> r) throws DatabaseException {
+    ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead<?> r) throws DatabaseException {
         ExternalReadEntry existing = null;
         synchronized(externalReadEntryMap) {
             existing = (ExternalReadEntry)externalReadEntryMap.get(r);
@@ -1011,7 +1011,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(graph, existing);
+        if(existing.isPending()) waitPending(processor, existing);
         return existing;
     }
     
@@ -1023,7 +1023,7 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
+        ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r);
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
index a75d6901079fa6d4bab47e41fe24c8524825c5fe..322e820a2aa76fe0eda4f0bfdc2f971d472d30da 100644 (file)
@@ -635,9 +635,7 @@ public class QueryCacheBase {
                }
        }
        
-       public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException {
-               
-               QueryProcessor processor = graph.processor;
+       public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException {
                
                int counter = 0;
                while(entry.isPending()) {