]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Attempt to fix regressions in new code base 92/1992/3
authorAntti Villberg <antti.villberg@semantum.fi>
Sun, 5 Aug 2018 07:58:52 +0000 (10:58 +0300)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Tue, 7 Aug 2018 08:34:13 +0000 (08:34 +0000)
gitlab #5

Change-Id: I0a72d8f68f448b9b775deb403146947bac107049

16 files changed:
bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CacheEntryBase.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/RelationInfoQuery.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java
bundles/org.simantics.db.services/src/org/simantics/db/services/adaption/reflection/ReflectionAdapter2.java

index cc50ea9b8fcc4447f209f55fd370c4b477380a89..45facb864efe25b4d3e07cbab237841e8df341e6 100644 (file)
@@ -31,7 +31,7 @@ public class BlockingAsyncMultiProcedure<Result> implements AsyncMultiProcedure<
     public BlockingAsyncMultiProcedure(ReadGraphImpl graph, AsyncMultiProcedure<Result> procedure, Object key) {
         this.procedure = procedure;
         this.key = key;
-        this.graph = ReadGraphImpl.newAsync(graph);
+        this.graph = graph;
         this.graph.asyncBarrier.inc();
     }
 
index 4ae530317908ce32118b867edaa6c85b52932d0d..ad9f1643829fe506ac57125edaf2bc724a249d05 100644 (file)
@@ -31,7 +31,7 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
     public BlockingAsyncProcedure(ReadGraphImpl graph, AsyncProcedure<Result> procedure, Object key) {
         this.procedure = procedure;
         this.key = key;
-        this.graph = ReadGraphImpl.newAsync(graph);
+        this.graph = graph;
         this.graph.asyncBarrier.inc();
     }
 
@@ -58,6 +58,10 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
         }
     }
 
+    public void waitBarrier() {
+        graph.asyncBarrier.waitBarrier(key, graph);
+    }
+
     @SuppressWarnings("unchecked")
     public Result get() throws DatabaseException {
 
@@ -72,6 +76,10 @@ public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
 
     }
 
+    public boolean isDone() {
+        return graph.asyncBarrier.get() == 0;
+    }
+
     @SuppressWarnings("unchecked")
     public Result getResult() {
         return (Result)result;
index 4ef46e6c681e35e5b041387e21942a0908793869..c95cd93d94c836e4455a98fbed12b97176ffb47d 100644 (file)
@@ -22,6 +22,7 @@ import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -89,6 +90,7 @@ import org.simantics.db.common.primitiverequest.Value;
 import org.simantics.db.common.primitiverequest.ValueImplied;
 import org.simantics.db.common.primitiverequest.VariantValueImplied;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.AsyncProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
@@ -198,9 +200,10 @@ public class ReadGraphImpl implements AsyncReadGraph {
     final static boolean EMPTY_RESOURCE_CHECK = false;
     
        final public CacheEntry parent;
+       public final ReadGraphImpl parentGraph;
        final public QueryProcessor processor;
        
-       public AsyncBarrierImpl asyncBarrier = null;
+       public final AsyncBarrierImpl asyncBarrier;
        
        final static Binding DATA_TYPE_BINDING_INTERNAL = Bindings.getBindingUnchecked(Datatype.class);
        final static Serializer DATA_TYPE_SERIALIZER = Bindings.getSerializerUnchecked(DATA_TYPE_BINDING_INTERNAL);
@@ -1930,11 +1933,8 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
        @Override
        public <T> T syncRequest(final Read<T> request) throws DatabaseException {
-
                assert (request != null);
-
-               return QueryCache.resultReadEntry(this, request, parent, null, null);
-
+               return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
        }
 
        @Override
@@ -2012,10 +2012,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                        throws DatabaseException {
 
                assert (request != null);
-               asyncBarrier = new AsyncBarrierImpl(null);
-               BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, null, request);
-               syncRequest(request, ap);
-               return ap.get();
+               return syncRequest(request, new AsyncProcedureAdapter<>() );
 
        }
 
@@ -2045,11 +2042,9 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
                ListenerBase listener = getListenerBase(procedure);
 
-               BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
-
-               QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
-
-               return ap.get();
+//             BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
+               return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+//             return ap.get();
 
        }
 
@@ -5175,7 +5170,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                assert (request != null);
                assert (procedure != null);
        
-               processor.schedule(new SessionTask(false) {
+               processor.schedule(new SessionTask(this) {
 
                        @Override
                        public void run(int thread) {
@@ -5192,7 +5187,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
        }
 
     public static ReadGraphImpl createAsync(QueryProcessor support) {
-        return new ReadGraphImpl(null, support);
+        return new ReadGraphImpl(null, null, support);
     }
 
        @Override
@@ -5255,7 +5250,7 @@ public class ReadGraphImpl implements AsyncReadGraph {
                assert (request != null);
                assert (procedure != null);
 
-               processor.schedule(new SessionTask(false) {
+               processor.schedule(new SessionTask(this) {
 
                        @Override
                        public void run(int thread) {
@@ -5645,27 +5640,31 @@ public class ReadGraphImpl implements AsyncReadGraph {
         * this execution state.syncParent is the blocking request
         */
 
-       final private boolean isExternal(int thread) {
-               return thread == Integer.MIN_VALUE;
+       ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support) {
+               this.parentGraph = parentGraph;
+               this.parent = parent;
+               this.processor = support;
+               this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null);
+       }
+
+       ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
+               this(graph, parent, graph.processor);
        }
 
        ReadGraphImpl(ReadGraphImpl graph) {
-               this(graph.parent, graph.processor);
+               this(graph, graph.parent);
        }
 
-       ReadGraphImpl(CacheEntry parent, QueryProcessor support) {
-               this.parent = parent;
-               this.processor = support;
+       public ReadGraphImpl withParent(CacheEntry parent) {
+               return new ReadGraphImpl(this, parent);
        }
 
-       public static ReadGraphImpl create(QueryProcessor support) {
-               return new ReadGraphImpl(null, support);
+       public ReadGraphImpl forRecompute(CacheEntry parent) {
+               return new ReadGraphImpl(null, parent, processor);
        }
 
-       public static ReadGraphImpl newAsync(ReadGraphImpl parent) {
-               ReadGraphImpl result = new ReadGraphImpl(parent);
-               result.asyncBarrier = new AsyncBarrierImpl(parent.asyncBarrier);
-               return result;
+       public static ReadGraphImpl create(QueryProcessor support) {
+               return new ReadGraphImpl(null, null, support);
        }
 
        public ReadGraphImpl newRestart(ReadGraphImpl impl) {
@@ -5677,11 +5676,6 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
        }
 
-       public ReadGraphImpl withParent(CacheEntry parent) {
-               if(parent == this.parent) return this;
-               else return new ReadGraphImpl(parent, processor);
-       }
-
        final private ListenerBase getListenerBase(final Object procedure) {
                if (procedure instanceof ListenerBase)
                        return (ListenerBase) procedure;
@@ -6324,7 +6318,17 @@ public class ReadGraphImpl implements AsyncReadGraph {
 
     @Override
     public boolean performPending() {
-        return processor.performPending(processor.thread.get());
+        return processor.performPending(this);
+    }
+
+    public Set<ReadGraphImpl> ancestorSet() {
+        HashSet<ReadGraphImpl> result = new HashSet<>();
+        ReadGraphImpl g = this;
+        while(g != null) {
+            result.add(g);
+            g = g.parentGraph;
+        }
+        return result;
     }
 
 }
index 56fb9ec72f977e967b6a3db6a9366d4d20433657..0c4e681385da7ab25a4c5afae9217a649c1a7966 100644 (file)
@@ -98,7 +98,7 @@ final public class WriteGraphImpl extends ReadGraphImpl implements WriteGraph {
 
     private WriteGraphImpl(CacheEntry parent2, QueryProcessor readSupport,
             WriteSupport writeSupport, VirtualGraph provider) {
-        super(parent2, readSupport);
+        super(null, parent2, readSupport);
         this.writeSupport = writeSupport;
         this.provider = provider;
     }
index 00abbebf65859e76d0e8dad481d01097cab04621..e6d42750d37b9caf62196bd50c043b802093d7e6 100644 (file)
@@ -16,7 +16,9 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.DebugPolicy;
+import org.simantics.db.impl.graph.AsyncBarrierImpl;
 import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.db.request.AsyncRead;
 import org.slf4j.Logger;
@@ -133,7 +135,8 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
         } else {
 
             try {
-                proc.execute(graph, (T) getResult());
+                T result = (T) getResult();
+                proc.execute(graph, result);
             } catch (Throwable t) {
                 LOGGER.error("performFromCache proc.execute failed", t);
             }
@@ -144,46 +147,62 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     }
 
-    public static <T> void computeForEach(ReadGraphImpl parentGraph, AsyncRead<T> request, AsyncReadEntry<T> entry,
-            AsyncProcedure<T> procedure_) throws DatabaseException {
+    public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+            AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
 
         AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
 
-        ReadGraphImpl queryGraph = parentGraph.withParent(entry);
+        ReadGraphImpl queryGraph = graph.withParent(entry);
         
-        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, new AsyncProcedure<T>() {
+        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, null, request);
+        
+        class AsyncTask extends SessionTask {
 
-            @Override
-            public void execute(AsyncReadGraph returnGraph, T result) {
-                try {
-                    procedure.execute(parentGraph, result);
-                } catch (Throwable t) {
-                    LOGGER.error("computeForEach procedure.execute failed", t);
-                }
+            T result;
+            DatabaseException exception;
+            
+            public AsyncTask(ReadGraphImpl graph) {
+                super(graph);
             }
 
             @Override
-            public void exception(AsyncReadGraph returnGraph, Throwable t) {
-                try {
-                    procedure.exception(parentGraph, t);
-                } catch (Throwable t2) {
-                    LOGGER.error("computeForEach procedure.exception failed", t2);
+            public void run(int thread) {
+                if(needsToBlock) proc.waitBarrier();
+                if(proc.isDone()) {
+                    try {
+                        result = (T)proc.get();
+                        if(procedure != null) procedure.execute(graph, result);
+                    } catch (DatabaseException e) {
+                        if(procedure != null) procedure.exception(graph, e);
+                        exception = e;
+                    } catch (Throwable t) {
+                        DatabaseException dbe = new DatabaseException(t);
+                        if(procedure != null) procedure.exception(graph, dbe);
+                        exception = dbe;
+                    } finally {
+                        if (entry != null)
+                            entry.performFromCache(queryGraph, procedure_);
+                    }
+                } else {
+                    graph.processor.schedule(this);            
                 }
             }
-
-            @Override
-            public String toString() {
-                return procedure.toString();
-            }
-
-        }, request);
-
+            
+        }
+        
         request.perform(queryGraph, proc);
         
-        proc.get();
+        AsyncTask task = new AsyncTask(graph);
 
-        if (entry != null)
-            entry.performFromCache(parentGraph, procedure_);
+        if(needsToBlock) task.run(0);
+        else if (proc.isDone()) task.run(0);
+        else  {
+            graph.processor.schedule(task);
+            return null;
+        }
+        
+        if(task.exception != null) throw task.exception;
+        else return task.result;
 
     }
 
index 7c87b50e3acfa0e4630f3eb93ba8859c1eb13c90..7fc0432f832f2f2d3fceaca007c2d1866c12a8b8 100644 (file)
@@ -34,8 +34,8 @@ public abstract class CacheEntryBase<Procedure> extends CacheEntry<Procedure> {
        
     final public static CacheEntryBase[] NONE = new CacheEntryBase[0];
 
-       static Object NO_RESULT = new Object();
-       static protected Object INVALID_RESULT = new Object();
+       static Object NO_RESULT = new Object() { public String toString() { return "NO_RESULT"; }};
+       static protected Object INVALID_RESULT = new Object() { public String toString() { return "INVALID_RESULT"; }};
        
 //     // Just created
 //    static protected Object FRESH = new Object() { public String toString() { return "CREATED"; }};
@@ -74,6 +74,7 @@ public abstract class CacheEntryBase<Procedure> extends CacheEntry<Procedure> {
     }
 
     public void setReady() {
+        assert(result != NO_RESULT);
        statusOrException = READY;
     }
 
index 89892ed56950c8f73846f0f55dba1d7a579210f6..2f14b98a54cf6725adc148cc49d623973711bb20 100644 (file)
@@ -35,47 +35,32 @@ public class CodeGen {
                content.append("\n");
        }
 
-       public void generateQuery(StringBuilder content, String clazz, String[] signature, boolean runnerShortcut, boolean genAsync) {
-               generateGetOrCreate(content, clazz, signature, genAsync);
+    public void generateQuery(StringBuilder content, String clazz, String[] signature, boolean runnerShortcut, boolean genReturn) {
+               generateGetOrCreate(content, clazz, signature);
                generateRemove(content, clazz, signature);
-               generateRunner(content, clazz, signature, runnerShortcut, genAsync);
+               generateRunner(content, clazz, signature, runnerShortcut, genReturn);
        }
 
-       public void generateRunner(StringBuilder content, String clazz, String[] signature, boolean shortcut, boolean genAsync) {
+       public void generateRunner(StringBuilder content, String clazz, String[] signature, boolean shortcut, boolean genReturn) {
 
-               line(content, "public static void runner" + clazz + "(ReadGraphImpl graph, " + signature[0] + ", CacheEntry parent, ListenerBase listener, final " + signature[4] + " procedure" + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {");
+               line(content, "public static " + (genReturn ? "Object" : "void") + " runner" + clazz + "(ReadGraphImpl graph, " + signature[0] + ", CacheEntry parent, ListenerBase listener, final " + signature[4] + " procedure) throws DatabaseException {");
                line(content, "    QueryCache cache  = graph.processor.cache;");
                if(shortcut) {
                        line(content, "    if(parent == null && listener == null && !cache.shouldCache(graph.processor, " + signature[1] + ")) {");
-                       line(content, "        " + clazz + ".computeForEach(graph, " + signature[1] + ", null, procedure);");
-                       line(content, "        return;");
-                       line(content, "    }");
-               }
-               line(content, "    " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph.processor, " + signature[1] + (genAsync ? ", isSync" : "") + ");");
-               if(genAsync) {
-                       line(content, "    if(entry == null) {");
-                       line(content, "        graph.processor.schedule(new SessionTask(false) {");
-                       line(content, "            @Override");
-                       line(content, "            public void run(int thread) {");
-                       line(content, "                try {");
-                       line(content, "                    assert(!isSync);");
-                       line(content, "                    runner" + clazz + "(graph, r, parent, listener, procedure, isSync);");
-                       line(content, "                } catch (DatabaseException e) {");
-                       line(content, "                    Logger.defaultLogError(e);");
-                       line(content, "                }");
-                       line(content, "            }");
-                       line(content, "        });");
-                       line(content, "        return;");
+                       line(content, "        " + (genReturn ? "return " : "") + clazz + ".computeForEach(graph, " + signature[1] + ", null, procedure);");
+                       if(!genReturn) line(content, "        return;");
                        line(content, "    }");
                }
+               line(content, "    " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature[1] + ");");
                line(content, "    " + signature[4] + " procedure_ = procedure != null ? procedure : emptyProcedure" + clazz + ";");
                line(content, "    ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);");
-               line(content, "    if(entry.isReady()) entry.performFromCache(graph, procedure_);");
+               line(content, "    if(entry.isReady()) " + (genReturn ? "return " : "") + "entry.performFromCache(graph, procedure_);");
                line(content, "    else {");
                line(content, "      assert(entry.isPending());");
-               if(shortcut) line(content, "        " + clazz + ".computeForEach(graph, " + signature[1] + ", entry, procedure_);");
-               else line(content, "        entry.compute(graph, procedure_);"); 
-               line(content, "        if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());");
+               if(shortcut) line(content, "      " + (genReturn ? "Object result = " : "") + clazz + ".computeForEach(graph, " + signature[1] + ", entry, procedure_);");
+               else line(content, "      entry.compute(graph, procedure_);"); 
+               line(content, "      if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());");
+               if(genReturn) line(content, "      return result;");
                line(content, "    }");
                line(content, "}");
                line(content, "");
@@ -95,11 +80,11 @@ public class CodeGen {
 
        }
 
-       public void generateGetOrCreate(StringBuilder content, String clazz, String[] signature, boolean genAsync) {
+       public void generateGetOrCreate(StringBuilder content, String clazz, String[] signature) {
 
                String lower = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1);
 
-               line(content, "" + clazz + " getOrCreate" + clazz + "(QueryProcessor processor, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {");
+               line(content, "" + clazz + " getOrCreate" + clazz + "(ReadGraphImpl graph, " + signature[0] + ") throws DatabaseException {");
                line(content, "    " + clazz + " existing = null;");
                line(content, "    synchronized(" + lower + "Map) {");
                line(content, "        existing = (" + clazz + ")" + lower + "Map.get(" + signature[1] + ");");
@@ -116,14 +101,7 @@ public class CodeGen {
                line(content, "            return existing;");
                line(content, "        }");
                line(content, "    }");
-               if(genAsync) {
-                       line(content, "    if(existing.isPending()) {");
-                       line(content, "      if(isSync) waitPending(processor, existing);");
-                       line(content, "      else return null;");
-                       line(content, "    }");
-               } else {
-                       line(content, "    if(existing.isPending()) waitPending(processor, existing);");
-               }
+               line(content, "    if(existing.isPending()) waitPending(graph, existing);");
                line(content, "    return existing;");
                line(content, "}");
                line(content, "");
index 7b591abb7fd80e062308ce3df88c86e872cfe27e..86f759362bdd3a5ec216949d467ffd3a8b8749c3 100644 (file)
@@ -23,7 +23,7 @@ public class QueryCache extends QueryCacheBase {
         super(querySupport, threads);
     }
 
-    Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException {
+    Objects getOrCreateObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
         Objects existing = null;
         synchronized(objectsMap) {
             existing = (Objects)objectsMap.get(r1,r2);
@@ -40,7 +40,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -56,18 +56,18 @@ public class QueryCache extends QueryCacheBase {
             Objects.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2);
+        Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            Objects.computeForEach(graph, r1,r2, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Objects.computeForEach(graph, r1,r2, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException {
+    Statements getOrCreateStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
         Statements existing = null;
         synchronized(statementsMap) {
             existing = (Statements)statementsMap.get(r1,r2);
@@ -84,7 +84,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -100,18 +100,18 @@ public class QueryCache extends QueryCacheBase {
             Statements.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2);
+        Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            Statements.computeForEach(graph, r1,r2, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Statements.computeForEach(graph, r1,r2, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException {
+    DirectObjects getOrCreateDirectObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
         DirectObjects existing = null;
         synchronized(directObjectsMap) {
             existing = (DirectObjects)directObjectsMap.get(r1,r2);
@@ -128,7 +128,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -144,18 +144,18 @@ public class QueryCache extends QueryCacheBase {
             DirectObjects.computeForEach(graph, r1,r2, null, procedure);
             return;
         }
-        DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2);
+        DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          DirectObjects.computeForEach(graph, r1,r2, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException {
+    RelationInfoQuery getOrCreateRelationInfoQuery(ReadGraphImpl graph, int r) throws DatabaseException {
         RelationInfoQuery existing = null;
         synchronized(relationInfoQueryMap) {
             existing = (RelationInfoQuery)relationInfoQueryMap.get(r);
@@ -172,7 +172,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -188,18 +188,18 @@ public class QueryCache extends QueryCacheBase {
             RelationInfoQuery.computeForEach(graph, r, null, procedure);
             return;
         }
-        RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r);
+        RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r);
         InternalProcedure<RelationInfo> procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          RelationInfoQuery.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException {
+    URIToResource getOrCreateURIToResource(ReadGraphImpl graph, String id) throws DatabaseException {
         URIToResource existing = null;
         synchronized(uRIToResourceMap) {
             existing = (URIToResource)uRIToResourceMap.get(id);
@@ -216,7 +216,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -232,18 +232,18 @@ public class QueryCache extends QueryCacheBase {
             URIToResource.computeForEach(graph, id, null, procedure);
             return;
         }
-        URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id);
+        URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id);
         InternalProcedure<Integer> procedure_ = procedure != null ? procedure : emptyProcedureURIToResource;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            URIToResource.computeForEach(graph, id, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          URIToResource.computeForEach(graph, id, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException {
+    ValueQuery getOrCreateValueQuery(ReadGraphImpl graph, int r) throws DatabaseException {
         ValueQuery existing = null;
         synchronized(valueQueryMap) {
             existing = (ValueQuery)valueQueryMap.get(r);
@@ -260,7 +260,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -276,18 +276,18 @@ public class QueryCache extends QueryCacheBase {
             ValueQuery.computeForEach(graph, r, null, procedure);
             return;
         }
-        ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r);
+        ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r);
         InternalProcedure<byte[]> procedure_ = procedure != null ? procedure : emptyProcedureValueQuery;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            ValueQuery.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          ValueQuery.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException {
+    OrderedSet getOrCreateOrderedSet(ReadGraphImpl graph, int r) throws DatabaseException {
         OrderedSet existing = null;
         synchronized(orderedSetMap) {
             existing = (OrderedSet)orderedSetMap.get(r);
@@ -304,7 +304,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -320,18 +320,18 @@ public class QueryCache extends QueryCacheBase {
             OrderedSet.computeForEach(graph, r, null, procedure);
             return;
         }
-        OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r);
+        OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            OrderedSet.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          OrderedSet.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException {
+    PrincipalTypes getOrCreatePrincipalTypes(ReadGraphImpl graph, int r) throws DatabaseException {
         PrincipalTypes existing = null;
         synchronized(principalTypesMap) {
             existing = (PrincipalTypes)principalTypesMap.get(r);
@@ -348,7 +348,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -364,18 +364,18 @@ public class QueryCache extends QueryCacheBase {
             PrincipalTypes.computeForEach(graph, r, null, procedure);
             return;
         }
-        PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r);
+        PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            PrincipalTypes.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          PrincipalTypes.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException {
+    DirectPredicates getOrCreateDirectPredicates(ReadGraphImpl graph, int r) throws DatabaseException {
         DirectPredicates existing = null;
         synchronized(directPredicatesMap) {
             existing = (DirectPredicates)directPredicatesMap.get(r);
@@ -392,7 +392,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -408,18 +408,18 @@ public class QueryCache extends QueryCacheBase {
             DirectPredicates.computeForEach(graph, r, null, procedure);
             return;
         }
-        DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r);
+        DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            DirectPredicates.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          DirectPredicates.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException {
+    Predicates getOrCreatePredicates(ReadGraphImpl graph, int r) throws DatabaseException {
         Predicates existing = null;
         synchronized(predicatesMap) {
             existing = (Predicates)predicatesMap.get(r);
@@ -436,7 +436,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -452,18 +452,18 @@ public class QueryCache extends QueryCacheBase {
             Predicates.computeForEach(graph, r, null, procedure);
             return;
         }
-        Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r);
+        Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedurePredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            Predicates.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Predicates.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read<?> r, boolean isSync) throws DatabaseException {
+    ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read<?> r, boolean needsToBlock) throws DatabaseException {
         ReadEntry existing = null;
         synchronized(readEntryMap) {
             existing = (ReadEntry)readEntryMap.get(r);
@@ -481,8 +481,8 @@ public class QueryCache extends QueryCacheBase {
             }
         }
         if(existing.isPending()) {
-          if(isSync) waitPending(processor, existing);
-          else return null;
+            if(needsToBlock) waitPending(graph, existing);
+            else return null;
         }
         return existing;
     }
@@ -493,38 +493,37 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    public static void runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException {
+    public static Object runnerReadEntry(ReadGraphImpl graph, Read<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
         if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
-            ReadEntry.computeForEach(graph, r, null, procedure);
-            return;
+            return ReadEntry.computeForEach(graph, r, null, procedure);
         }
-        ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync);
+        ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-            graph.processor.schedule(new SessionTask(false) {
+            graph.processor.schedule(new SessionTask(graph) {
                 @Override
                 public void run(int thread) {
                     try {
-                        assert(!isSync);
-                        runnerReadEntry(graph, r, parent, listener, procedure, isSync);
+                        runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
                     } catch (DatabaseException e) {
                         Logger.defaultLogError(e);
                     }
                 }
             });
-            return;
+            return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            ReadEntry.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Object result = ReadEntry.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          return result;
         }
     }
     
-    AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead<?> r, boolean isSync) throws DatabaseException {
+    AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
         AsyncReadEntry existing = null;
         synchronized(asyncReadEntryMap) {
             existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
@@ -542,8 +541,8 @@ public class QueryCache extends QueryCacheBase {
             }
         }
         if(existing.isPending()) {
-          if(isSync) waitPending(processor, existing);
-          else return null;
+            if(needsToBlock) waitPending(graph, existing);
+            else return null;
         }
         return existing;
     }
@@ -554,38 +553,37 @@ public class QueryCache extends QueryCacheBase {
         }
     }
     
-    public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException {
+    public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
         if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) {
-            AsyncReadEntry.computeForEach(graph, r, null, procedure);
-            return;
+            return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
         }
-        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync);
+        AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
         if(entry == null) {
-            graph.processor.schedule(new SessionTask(false) {
+            graph.processor.schedule(new SessionTask(graph) {
                 @Override
                 public void run(int thread) {
                     try {
-                        assert(!isSync);
-                        runnerAsyncReadEntry(graph, r, parent, listener, procedure, isSync);
+                        runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
                     } catch (DatabaseException e) {
                         Logger.defaultLogError(e);
                     }
                 }
             });
-            return;
+            return null;
         }
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
-        if(entry.isReady()) entry.performFromCache(graph, procedure_);
+        if(entry.isReady()) return entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            AsyncReadEntry.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          return result;
         }
     }
     
-    Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException {
+    Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException {
         Types existing = null;
         synchronized(typesMap) {
             existing = (Types)typesMap.get(r);
@@ -602,7 +600,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -618,18 +616,18 @@ public class QueryCache extends QueryCacheBase {
             Types.computeForEach(graph, r, null, procedure);
             return;
         }
-        Types entry = (Types)cache.getOrCreateTypes(graph.processor, r);
+        Types entry = (Types)cache.getOrCreateTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            Types.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          Types.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException {
+    ChildMap getOrCreateChildMap(ReadGraphImpl graph, int r) throws DatabaseException {
         ChildMap existing = null;
         synchronized(childMapMap) {
             existing = (ChildMap)childMapMap.get(r);
@@ -646,7 +644,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -662,18 +660,18 @@ public class QueryCache extends QueryCacheBase {
             ChildMap.computeForEach(graph, r, null, procedure);
             return;
         }
-        ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r);
+        ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r);
         InternalProcedure<ObjectResourceIdMap<String>> procedure_ = procedure != null ? procedure : emptyProcedureChildMap;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            ChildMap.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          ChildMap.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException {
+    TypeHierarchy getOrCreateTypeHierarchy(ReadGraphImpl graph, int r) throws DatabaseException {
         TypeHierarchy existing = null;
         synchronized(typeHierarchyMap) {
             existing = (TypeHierarchy)typeHierarchyMap.get(r);
@@ -690,7 +688,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -706,18 +704,18 @@ public class QueryCache extends QueryCacheBase {
             TypeHierarchy.computeForEach(graph, r, null, procedure);
             return;
         }
-        TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r);
+        TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            TypeHierarchy.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          TypeHierarchy.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException {
+    SuperTypes getOrCreateSuperTypes(ReadGraphImpl graph, int r) throws DatabaseException {
         SuperTypes existing = null;
         synchronized(superTypesMap) {
             existing = (SuperTypes)superTypesMap.get(r);
@@ -734,7 +732,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -750,18 +748,18 @@ public class QueryCache extends QueryCacheBase {
             SuperTypes.computeForEach(graph, r, null, procedure);
             return;
         }
-        SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r);
+        SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            SuperTypes.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          SuperTypes.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException {
+    SuperRelations getOrCreateSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException {
         SuperRelations existing = null;
         synchronized(superRelationsMap) {
             existing = (SuperRelations)superRelationsMap.get(r);
@@ -778,7 +776,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -794,18 +792,18 @@ public class QueryCache extends QueryCacheBase {
             SuperRelations.computeForEach(graph, r, null, procedure);
             return;
         }
-        SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r);
+        SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r);
         InternalProcedure<IntSet> procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            SuperRelations.computeForEach(graph, r, entry, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          SuperRelations.computeForEach(graph, r, entry, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException {
+    AssertedPredicates getOrCreateAssertedPredicates(ReadGraphImpl graph, int r) throws DatabaseException {
         AssertedPredicates existing = null;
         synchronized(assertedPredicatesMap) {
             existing = (AssertedPredicates)assertedPredicatesMap.get(r);
@@ -822,7 +820,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -834,18 +832,18 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r);
+        AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException {
+    AssertedStatements getOrCreateAssertedStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException {
         AssertedStatements existing = null;
         synchronized(assertedStatementsMap) {
             existing = (AssertedStatements)assertedStatementsMap.get(r1,r2);
@@ -862,7 +860,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -874,18 +872,18 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2);
+        AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2);
         TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException {
+    DirectSuperRelations getOrCreateDirectSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException {
         DirectSuperRelations existing = null;
         synchronized(directSuperRelationsMap) {
             existing = (DirectSuperRelations)directSuperRelationsMap.get(r);
@@ -902,7 +900,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -914,18 +912,18 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r);
+        DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r);
         IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead<?> r) throws DatabaseException {
+    MultiReadEntry getOrCreateMultiReadEntry(ReadGraphImpl graph, MultiRead<?> r) throws DatabaseException {
         MultiReadEntry existing = null;
         synchronized(multiReadEntryMap) {
             existing = (MultiReadEntry)multiReadEntryMap.get(r);
@@ -942,7 +940,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -954,18 +952,18 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead<?> r, CacheEntry parent, ListenerBase listener, final SyncMultiProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r);
+        MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r);
         SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead<?> r) throws DatabaseException {
+    AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead<?> r) throws DatabaseException {
         AsyncMultiReadEntry existing = null;
         synchronized(asyncMultiReadEntryMap) {
             existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r);
@@ -982,7 +980,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -994,18 +992,18 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r);
+        AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r);
         AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
-    ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead<?> r) throws DatabaseException {
+    ExternalReadEntry getOrCreateExternalReadEntry(ReadGraphImpl graph, ExternalRead<?> r) throws DatabaseException {
         ExternalReadEntry existing = null;
         synchronized(externalReadEntryMap) {
             existing = (ExternalReadEntry)externalReadEntryMap.get(r);
@@ -1022,7 +1020,7 @@ public class QueryCache extends QueryCacheBase {
                 return existing;
             }
         }
-        if(existing.isPending()) waitPending(processor, existing);
+        if(existing.isPending()) waitPending(graph, existing);
         return existing;
     }
     
@@ -1034,14 +1032,14 @@ public class QueryCache extends QueryCacheBase {
     
     public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException {
         QueryCache cache  = graph.processor.cache;
-        ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r);
+        ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r);
         AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry;
         ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);
         if(entry.isReady()) entry.performFromCache(graph, procedure_);
         else {
           assert(entry.isPending());
-            entry.compute(graph, procedure_);
-            if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
+          entry.compute(graph, procedure_);
+          if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());
         }
     }
     
index b3d1c5f8836dcfdc82637dc01fec5bd3dfebce82..d7a235fc813d4f302e719bb66647cd2cef972d91 100644 (file)
@@ -637,18 +637,16 @@ public class QueryCacheBase {
                }
        }
 
-       public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException {
+       public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException {
 
                int counter = 0;
                while(entry.isPending()) {
                        try {
-                               SessionTask task = null;//processor.getOwnTask(processor.thread.get());
-                               if(task != null) {
-                                       task.run(processor.thread.get());
-                               } else {
+                           boolean performed = graph.performPending();
+                           if(!performed) {
                                        Thread.sleep(1);
                                        counter++;
-                                       if(counter > 5000) {
+                                       if(counter > 30000) {
                                                CacheEntryBase base = ((CacheEntryBase)entry);
 //                                             if(base.created != null) {
 //                                                     System.err.println("created:");
@@ -662,8 +660,8 @@ public class QueryCacheBase {
 //                                                     System.err.println("ready:");
 //                                                     base.ready.printStackTrace();
 //                                             }
-                                               new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
-                                               throw new DatabaseException("Timeout waiting for request to complete.");
+                                               new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest()).printStackTrace();
+                                               throw new DatabaseException("Timeout waiting for request to complete." +  entry.getOriginalRequest());
                                                //System.err.println("asd");
                                                //base.getQuery().recompute(null, null, entry);
                                        }
@@ -1129,15 +1127,11 @@ public class QueryCacheBase {
        }
 
        public static <T> T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
-               AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
-               QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true);
-               return wrap.get();
+               return (T)QueryCache.runnerReadEntry(graph, r, parent, listener, procedure, true);
        }
 
        public static <T> T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
-               AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
-               QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true);
-               return wrap.get();
+               return (T)QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, procedure, true);
        }
 
        public static byte[] resultValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException {
index 9c60691fad7a0a850228997e85ffb47168ca1aae..65b277e6c8d2985410b82e12ce4edca3f1fea09d 100644 (file)
@@ -171,23 +171,25 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       SessionTask getOwnTask(int thread) {
+       SessionTask getOwnTask(ReadGraphImpl impl) {
+               Set<ReadGraphImpl> ancestors = impl.ancestorSet();
                synchronized(querySupportLock) {
                        int index = 0;
                        while(index < freeScheduling.size()) {
                                SessionTask task = freeScheduling.get(index);
-                               if(task.thread == thread && !task.systemCall)
+                               if(task.hasCommonParent(ancestors)) {
                                        return freeScheduling.remove(index);
+                               }
                                index++;
                        }
                }
                return null;
        }
-       
-       public boolean performPending(int thread) {
-               SessionTask task = getOwnTask(thread);
+
+       public boolean performPending(ReadGraphImpl graph) {
+               SessionTask task = getOwnTask(graph);
                if(task != null) {
-                       task.run(thread);
+                       task.run(QueryProcessor.thread.get());
                        return true;
                } else {
                        return false;
@@ -200,10 +202,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        final public void schedule(SessionTask request) {
 
-               int performer = request.thread;
+               //int performer = request.thread;
 
-               if(DebugPolicy.SCHEDULE)
-                       System.out.println("schedule " + request + " " + " -> " + performer);
+//             if(DebugPolicy.SCHEDULE)
+//                     System.out.println("schedule " + request + " " + " -> " + performer);
 
                //assert(performer >= 0);
 
@@ -256,34 +258,29 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        final int THREADS;
        final public int  THREAD_MASK;
-       
-       final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); 
+
+       final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
 
        public static abstract class SessionTask {
 
-               final public int thread;
-               final public boolean systemCall;
-//             final public int syncCaller;
-               //final public Object object;
+               public final ReadGraphImpl graph;
+               private Set<ReadGraphImpl> ancestors;
 
-               public SessionTask(boolean systemCall) {
-                       this.thread = QueryProcessor.thread.get();
-                       this.systemCall = systemCall;
-//                     this.syncCaller = -1;
-                       //this.object = object;
+               public SessionTask(ReadGraphImpl graph) {
+                       this.graph = graph;
                }
 
-//             public SessionTask(Object object, int syncCaller) {
-//                     this.thread = QueryProcessor.thread.get();
-//                     this.syncCaller = syncCaller;
-//                     this.object = object;
-//             }
+               public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
+                       if(graph == null) return false;
+                       if(ancestors == null) ancestors = graph.ancestorSet();
+                       return !Collections.disjoint(ancestors, otherAncestors);
+               }
 
                public abstract void run(int thread);
 
                @Override
                public String toString() {
-                       return "SessionTask[" + super.toString() + "]";
+                       return "SessionTask[" + graph.parent + "]";
                }
 
        }
@@ -294,7 +291,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final public DataContainer<Throwable> throwable; 
 
                public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
-                       super(true);
+                       super(null);
                        this.throwable = throwable;
                        this.notify = notify;
                }
@@ -1593,7 +1590,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                        entry.prepareRecompute(querySupport);
                        
-                       ReadGraphImpl parentGraph = graph.withParent(entry);
+                       ReadGraphImpl parentGraph = graph.forRecompute(entry);
 
                        query.recompute(parentGraph);
 
@@ -1765,6 +1762,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                                                } else {
                                                        // If not changed, keep the old value
                                                        immediate.setResult(oldValue);
+                                                       immediate.setReady();
                                                        listenersUnknown = true;
                                                }
 
index c95242defc77a45ae859f49042a2fc1e13df4f20..61e934fe9cc59f7458aa8f247e8652c280006a89 100644 (file)
@@ -95,56 +95,68 @@ public final class ReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implem
 
     }
 
-    public static <T> void computeForEach(ReadGraphImpl graph, Read<T> request, ReadEntry<T> entry,
+    public static <T> T computeForEach(ReadGraphImpl graph, Read<T> request, ReadEntry<T> entry,
             AsyncProcedure<T> procedure_) throws DatabaseException {
 
         AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
 
-        ReadGraphImpl queryGraph = entry != null ? graph.withParent(entry) : graph;
+        ReadGraphImpl queryGraph = graph.withParent(entry);
 
         try {
 
             T result = request.perform(queryGraph);
-            procedure.execute(graph, result);
+            if(procedure != null) procedure.execute(graph, result);
+            return (T)result;
 
         } catch (DatabaseException e) {
 
-            procedure.exception(graph, e);
+            if(procedure != null) procedure.exception(graph, e);
+            throw e;
 
         } catch (Throwable t) {
 
             DatabaseException dbe = new DatabaseException(t);
-            procedure.exception(graph, dbe);
+            if(procedure != null) procedure.exception(graph, dbe);
+            throw dbe;
 
-        }
+        } finally {
+
+            if (entry != null)
+                entry.performFromCache(queryGraph, procedure_);
 
-        if (entry != null)
-            entry.performFromCache(queryGraph, procedure_);
+        }
 
     }
 
-    public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
+    public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
 
         AsyncProcedure<T> proc = (AsyncProcedure<T>) procedure;
 
-        if (proc != null) {
-            if (isExcepted()) {
-                try {
-                    proc.exception(graph, (Throwable) getResult());
-                } catch (Throwable t) {
-                    LOGGER.error("performFromCache proc.exception failed", t);
+        if (isExcepted()) {
+                if(proc != null) {
+                    try {
+                        proc.exception(graph, (Throwable) getResult());
+                    } catch (Throwable t) {
+                        LOGGER.error("performFromCache proc.exception failed", t);
+                    }
                 }
-            } else {
-                try {
-                    proc.execute(graph, (T) getResult());
-                } catch (Throwable t) {
-                    LOGGER.error("performFromCache proc.execute failed", t);
+                Throwable t = (Throwable) getResult();
+                if(t instanceof DatabaseException) {
+                    throw (DatabaseException)t;
+                } else {
+                    throw new DatabaseException(t);
                 }
-            }
+        } else {
+                if(proc != null) {
+                    try {
+                        proc.execute(graph, (T) getResult());
+                    } catch (Throwable t) {
+                        LOGGER.error("performFromCache proc.execute failed", t);
+                    }
+                }
+                return (T)getResult();
         }
 
-        return (T) getResult();
-
     }
 
     @Override
index 0b60fdd0c596a144fd9ccf5f7211786fdaf5c839..cffd2984a0c31219acfcb740aa7cf921df0e709e 100644 (file)
@@ -95,12 +95,6 @@ public final class RelationInfoQuery extends UnaryQueryP<RelationInfo> {
                return "RelationInfoQuery[" + id + "]";
        }
 
-       @Override
-       public void setResult(Object result) {
-               super.setResult(result);
-               setReady();
-       }
-
        @Override
        public int type() {
                return RequestFlags.IMMEDIATE_UPDATE;
index 30e0dde5244de43dbd11ff9582015dad0d831f76..8a634d1cfb05504a7ffa4d86db2ed12c308aa2e1 100644 (file)
@@ -414,9 +414,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             }
         }
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(true) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -552,9 +550,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(true) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -638,9 +634,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(true) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -1377,9 +1371,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(true) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -1477,9 +1469,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(true) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
index 2902274b94202c9c79610f006f5acbf91365338b..c11242163a314d8c9b69a21cb02821df18f00000 100644 (file)
@@ -118,7 +118,7 @@ public class SessionRequestManager {
        
        public synchronized void startRead(int thread, final SessionRead task) {
                
-               session.queryProvider2.schedule(new SessionTask(true) {
+               session.queryProvider2.schedule(new SessionTask(null) {
 
                        @Override
             public void run(int thread) {
@@ -142,7 +142,7 @@ public class SessionRequestManager {
        
        public synchronized void startReadUpdate(int thread) {
                
-               session.queryProvider2.schedule(new SessionTask(true) {
+               session.queryProvider2.schedule(new SessionTask(null) {
 
                        @Override
                        public void run(int thread) {
@@ -163,7 +163,7 @@ public class SessionRequestManager {
 
        public synchronized void startWrite(int thread, final SessionTask task) {
                
-               session.queryProvider2.schedule(new SessionTask(true) {
+               session.queryProvider2.schedule(new SessionTask(null) {
 
                        @Override
                        public void run(int thread) {
@@ -184,7 +184,7 @@ public class SessionRequestManager {
 
        public synchronized void startWriteUpdate(int thread) {
                
-               session.queryProvider2.schedule(new SessionTask(true) {
+               session.queryProvider2.schedule(new SessionTask(null) {
 
                        @Override
                        public void run(int thread) {
@@ -271,7 +271,7 @@ public class SessionRequestManager {
                        if (!reads.isEmpty()) {
 
                                final SessionRead read = reads.poll();
-                               session.queryProvider2.schedule(new SessionTask(true) {
+                               session.queryProvider2.schedule(new SessionTask(null) {
 
                                        @Override
                                        public void run(int thread) {
@@ -312,7 +312,7 @@ public class SessionRequestManager {
                assert(State.INIT != state);
                
                if(State.READ == state) {
-                       session.queryProvider2.schedule(new SessionTask(true) {
+                       session.queryProvider2.schedule(new SessionTask(null) {
 
                                @Override
                                public void run(int thread) {
index 9a0097b114138b30c767f7c32953ff6f5903ee06..ba0ce846b34a91b4e44dba931e9cbdddbe5f28d3 100644 (file)
@@ -44,7 +44,7 @@ public class UndoRedoSupportImpl implements UndoRedoSupport {
         final Operation fop = (Operation)ops.toArray()[0];
         final DataContainer<Long> id = new DataContainer<Long>(0L);
         final TaskHelper th = new TaskHelper("Undo");
-        session.requestManager.scheduleWrite(new SessionTask(true) {
+        session.requestManager.scheduleWrite(new SessionTask(null) {
             @Override
             public void run(int thread) {
                 session.flushCounter = 0;
index ed750f538f615140b893d3ece55f3711d670226d..187fc52199a390f37b68b3a48a940d20faa4d359 100644 (file)
@@ -17,9 +17,8 @@ import java.util.Arrays;
 
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.Resource;
-import org.simantics.db.ReadGraph;
 import org.simantics.db.adaption.Adapter;
-import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.common.request.AsyncReadRequest;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.procedure.AsyncProcedure;
 
@@ -79,16 +78,16 @@ public class ReflectionAdapter2<T> implements Adapter<T, Resource> {
                
        } else {
        
-            g.asyncRequest(new ReadRequest() {
+            g.asyncRequest(new AsyncReadRequest() {
     
                 @Override
-                public void run(ReadGraph graph) throws DatabaseException {
+                public void run(AsyncReadGraph graph) {
                        
                     Object[] args = new Object[parameters.length];
                     try {
                        for(int i=0;i<parameters.length;++i)
                                args[i] = parameters[i].adapt(graph, r);
-                       procedure.execute(g, constructor.newInstance(args));
+                       procedure.execute(graph, constructor.newInstance(args));
                                } catch (IllegalArgumentException e) {
                                    procedure.exception(g, e);
                                        e.printStackTrace();