From: Antti Villberg Date: Sun, 5 Aug 2018 07:58:52 +0000 (+0300) Subject: Attempt to fix regressions in new code base X-Git-Tag: v1.43.0~136^2~413 X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=commitdiff_plain;h=277b1c7fb3fdd8effb4bf2b447358d0e4ef0e302 Attempt to fix regressions in new code base gitlab #5 Change-Id: I0a72d8f68f448b9b775deb403146947bac107049 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java index cc50ea9b8..45facb864 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncMultiProcedure.java @@ -31,7 +31,7 @@ public class BlockingAsyncMultiProcedure implements AsyncMultiProcedure< public BlockingAsyncMultiProcedure(ReadGraphImpl graph, AsyncMultiProcedure procedure, Object key) { this.procedure = procedure; this.key = key; - this.graph = ReadGraphImpl.newAsync(graph); + this.graph = graph; this.graph.asyncBarrier.inc(); } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java index 4ae530317..ad9f16438 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java @@ -31,7 +31,7 @@ public class BlockingAsyncProcedure implements AsyncProcedure { public BlockingAsyncProcedure(ReadGraphImpl graph, AsyncProcedure procedure, Object key) { this.procedure = procedure; this.key = key; - this.graph = ReadGraphImpl.newAsync(graph); + this.graph = graph; this.graph.asyncBarrier.inc(); } @@ -58,6 +58,10 @@ public class BlockingAsyncProcedure implements AsyncProcedure { } } + public void waitBarrier() { + graph.asyncBarrier.waitBarrier(key, graph); + } + @SuppressWarnings("unchecked") public Result get() throws DatabaseException { @@ -72,6 +76,10 @@ public class BlockingAsyncProcedure implements AsyncProcedure { } + public boolean isDone() { + return graph.asyncBarrier.get() == 0; + } + @SuppressWarnings("unchecked") public Result getResult() { return (Result)result; diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index 4ef46e6c6..c95cd93d9 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -22,6 +22,7 @@ import java.nio.BufferUnderflowException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; @@ -89,6 +90,7 @@ import org.simantics.db.common.primitiverequest.Value; import org.simantics.db.common.primitiverequest.ValueImplied; import org.simantics.db.common.primitiverequest.VariantValueImplied; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; +import org.simantics.db.common.procedure.adapter.AsyncProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; @@ -198,9 +200,10 @@ public class ReadGraphImpl implements AsyncReadGraph { final static boolean EMPTY_RESOURCE_CHECK = false; final public CacheEntry parent; + public final ReadGraphImpl parentGraph; final public QueryProcessor processor; - public AsyncBarrierImpl asyncBarrier = null; + public final AsyncBarrierImpl asyncBarrier; final static Binding DATA_TYPE_BINDING_INTERNAL = Bindings.getBindingUnchecked(Datatype.class); final static Serializer DATA_TYPE_SERIALIZER = Bindings.getSerializerUnchecked(DATA_TYPE_BINDING_INTERNAL); @@ -1930,11 +1933,8 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public T syncRequest(final Read request) throws DatabaseException { - assert (request != null); - - return QueryCache.resultReadEntry(this, request, parent, null, null); - + return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true); } @Override @@ -2012,10 +2012,7 @@ public class ReadGraphImpl implements AsyncReadGraph { throws DatabaseException { assert (request != null); - asyncBarrier = new AsyncBarrierImpl(null); - BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, null, request); - syncRequest(request, ap); - return ap.get(); + return syncRequest(request, new AsyncProcedureAdapter<>() ); } @@ -2045,11 +2042,9 @@ public class ReadGraphImpl implements AsyncReadGraph { ListenerBase listener = getListenerBase(procedure); - BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, procedure, request); - - QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true); - - return ap.get(); +// BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, procedure, request); + return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true); +// return ap.get(); } @@ -5175,7 +5170,7 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - processor.schedule(new SessionTask(false) { + processor.schedule(new SessionTask(this) { @Override public void run(int thread) { @@ -5192,7 +5187,7 @@ public class ReadGraphImpl implements AsyncReadGraph { } public static ReadGraphImpl createAsync(QueryProcessor support) { - return new ReadGraphImpl(null, support); + return new ReadGraphImpl(null, null, support); } @Override @@ -5255,7 +5250,7 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - processor.schedule(new SessionTask(false) { + processor.schedule(new SessionTask(this) { @Override public void run(int thread) { @@ -5645,27 +5640,31 @@ public class ReadGraphImpl implements AsyncReadGraph { * this execution state.syncParent is the blocking request */ - final private boolean isExternal(int thread) { - return thread == Integer.MIN_VALUE; + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support) { + this.parentGraph = parentGraph; + this.parent = parent; + this.processor = support; + this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null); + } + + ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) { + this(graph, parent, graph.processor); } ReadGraphImpl(ReadGraphImpl graph) { - this(graph.parent, graph.processor); + this(graph, graph.parent); } - ReadGraphImpl(CacheEntry parent, QueryProcessor support) { - this.parent = parent; - this.processor = support; + public ReadGraphImpl withParent(CacheEntry parent) { + return new ReadGraphImpl(this, parent); } - public static ReadGraphImpl create(QueryProcessor support) { - return new ReadGraphImpl(null, support); + public ReadGraphImpl forRecompute(CacheEntry parent) { + return new ReadGraphImpl(null, parent, processor); } - public static ReadGraphImpl newAsync(ReadGraphImpl parent) { - ReadGraphImpl result = new ReadGraphImpl(parent); - result.asyncBarrier = new AsyncBarrierImpl(parent.asyncBarrier); - return result; + public static ReadGraphImpl create(QueryProcessor support) { + return new ReadGraphImpl(null, null, support); } public ReadGraphImpl newRestart(ReadGraphImpl impl) { @@ -5677,11 +5676,6 @@ public class ReadGraphImpl implements AsyncReadGraph { } - public ReadGraphImpl withParent(CacheEntry parent) { - if(parent == this.parent) return this; - else return new ReadGraphImpl(parent, processor); - } - final private ListenerBase getListenerBase(final Object procedure) { if (procedure instanceof ListenerBase) return (ListenerBase) procedure; @@ -6324,7 +6318,17 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public boolean performPending() { - return processor.performPending(processor.thread.get()); + return processor.performPending(this); + } + + public Set ancestorSet() { + HashSet result = new HashSet<>(); + ReadGraphImpl g = this; + while(g != null) { + result.add(g); + g = g.parentGraph; + } + return result; } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java index 56fb9ec72..0c4e68138 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java @@ -98,7 +98,7 @@ final public class WriteGraphImpl extends ReadGraphImpl implements WriteGraph { private WriteGraphImpl(CacheEntry parent2, QueryProcessor readSupport, WriteSupport writeSupport, VirtualGraph provider) { - super(parent2, readSupport); + super(null, parent2, readSupport); this.writeSupport = writeSupport; this.provider = provider; } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java index 00abbebf6..e6d42750d 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java @@ -16,7 +16,9 @@ import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.DebugPolicy; +import org.simantics.db.impl.graph.AsyncBarrierImpl; import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.request.AsyncRead; import org.slf4j.Logger; @@ -133,7 +135,8 @@ final public class AsyncReadEntry extends CacheEntryBase> i } else { try { - proc.execute(graph, (T) getResult()); + T result = (T) getResult(); + proc.execute(graph, result); } catch (Throwable t) { LOGGER.error("performFromCache proc.execute failed", t); } @@ -144,46 +147,62 @@ final public class AsyncReadEntry extends CacheEntryBase> i } - public static void computeForEach(ReadGraphImpl parentGraph, AsyncRead request, AsyncReadEntry entry, - AsyncProcedure procedure_) throws DatabaseException { + public static T computeForEach(ReadGraphImpl graph, AsyncRead request, AsyncReadEntry entry, + AsyncProcedure procedure_, boolean needsToBlock) throws DatabaseException { AsyncProcedure procedure = entry != null ? entry : procedure_; - ReadGraphImpl queryGraph = parentGraph.withParent(entry); + ReadGraphImpl queryGraph = graph.withParent(entry); - BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(queryGraph, new AsyncProcedure() { + BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(queryGraph, null, request); + + class AsyncTask extends SessionTask { - @Override - public void execute(AsyncReadGraph returnGraph, T result) { - try { - procedure.execute(parentGraph, result); - } catch (Throwable t) { - LOGGER.error("computeForEach procedure.execute failed", t); - } + T result; + DatabaseException exception; + + public AsyncTask(ReadGraphImpl graph) { + super(graph); } @Override - public void exception(AsyncReadGraph returnGraph, Throwable t) { - try { - procedure.exception(parentGraph, t); - } catch (Throwable t2) { - LOGGER.error("computeForEach procedure.exception failed", t2); + public void run(int thread) { + if(needsToBlock) proc.waitBarrier(); + if(proc.isDone()) { + try { + result = (T)proc.get(); + if(procedure != null) procedure.execute(graph, result); + } catch (DatabaseException e) { + if(procedure != null) procedure.exception(graph, e); + exception = e; + } catch (Throwable t) { + DatabaseException dbe = new DatabaseException(t); + if(procedure != null) procedure.exception(graph, dbe); + exception = dbe; + } finally { + if (entry != null) + entry.performFromCache(queryGraph, procedure_); + } + } else { + graph.processor.schedule(this); } } - - @Override - public String toString() { - return procedure.toString(); - } - - }, request); - + + } + request.perform(queryGraph, proc); - proc.get(); + AsyncTask task = new AsyncTask(graph); - if (entry != null) - entry.performFromCache(parentGraph, procedure_); + if(needsToBlock) task.run(0); + else if (proc.isDone()) task.run(0); + else { + graph.processor.schedule(task); + return null; + } + + if(task.exception != null) throw task.exception; + else return task.result; } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CacheEntryBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CacheEntryBase.java index 7c87b50e3..7fc0432f8 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CacheEntryBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CacheEntryBase.java @@ -34,8 +34,8 @@ public abstract class CacheEntryBase extends CacheEntry { final public static CacheEntryBase[] NONE = new CacheEntryBase[0]; - static Object NO_RESULT = new Object(); - static protected Object INVALID_RESULT = new Object(); + static Object NO_RESULT = new Object() { public String toString() { return "NO_RESULT"; }}; + static protected Object INVALID_RESULT = new Object() { public String toString() { return "INVALID_RESULT"; }}; // // Just created // static protected Object FRESH = new Object() { public String toString() { return "CREATED"; }}; @@ -74,6 +74,7 @@ public abstract class CacheEntryBase extends CacheEntry { } public void setReady() { + assert(result != NO_RESULT); statusOrException = READY; } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java index 89892ed56..2f14b98a5 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java @@ -35,47 +35,32 @@ public class CodeGen { content.append("\n"); } - public void generateQuery(StringBuilder content, String clazz, String[] signature, boolean runnerShortcut, boolean genAsync) { - generateGetOrCreate(content, clazz, signature, genAsync); + public void generateQuery(StringBuilder content, String clazz, String[] signature, boolean runnerShortcut, boolean genReturn) { + generateGetOrCreate(content, clazz, signature); generateRemove(content, clazz, signature); - generateRunner(content, clazz, signature, runnerShortcut, genAsync); + generateRunner(content, clazz, signature, runnerShortcut, genReturn); } - public void generateRunner(StringBuilder content, String clazz, String[] signature, boolean shortcut, boolean genAsync) { + public void generateRunner(StringBuilder content, String clazz, String[] signature, boolean shortcut, boolean genReturn) { - line(content, "public static void runner" + clazz + "(ReadGraphImpl graph, " + signature[0] + ", CacheEntry parent, ListenerBase listener, final " + signature[4] + " procedure" + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {"); + line(content, "public static " + (genReturn ? "Object" : "void") + " runner" + clazz + "(ReadGraphImpl graph, " + signature[0] + ", CacheEntry parent, ListenerBase listener, final " + signature[4] + " procedure) throws DatabaseException {"); line(content, " QueryCache cache = graph.processor.cache;"); if(shortcut) { line(content, " if(parent == null && listener == null && !cache.shouldCache(graph.processor, " + signature[1] + ")) {"); - line(content, " " + clazz + ".computeForEach(graph, " + signature[1] + ", null, procedure);"); - line(content, " return;"); - line(content, " }"); - } - line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph.processor, " + signature[1] + (genAsync ? ", isSync" : "") + ");"); - if(genAsync) { - line(content, " if(entry == null) {"); - line(content, " graph.processor.schedule(new SessionTask(false) {"); - line(content, " @Override"); - line(content, " public void run(int thread) {"); - line(content, " try {"); - line(content, " assert(!isSync);"); - line(content, " runner" + clazz + "(graph, r, parent, listener, procedure, isSync);"); - line(content, " } catch (DatabaseException e) {"); - line(content, " Logger.defaultLogError(e);"); - line(content, " }"); - line(content, " }"); - line(content, " });"); - line(content, " return;"); + line(content, " " + (genReturn ? "return " : "") + clazz + ".computeForEach(graph, " + signature[1] + ", null, procedure);"); + if(!genReturn) line(content, " return;"); line(content, " }"); } + line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature[1] + ");"); line(content, " " + signature[4] + " procedure_ = procedure != null ? procedure : emptyProcedure" + clazz + ";"); line(content, " ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false);"); - line(content, " if(entry.isReady()) entry.performFromCache(graph, procedure_);"); + line(content, " if(entry.isReady()) " + (genReturn ? "return " : "") + "entry.performFromCache(graph, procedure_);"); line(content, " else {"); line(content, " assert(entry.isPending());"); - if(shortcut) line(content, " " + clazz + ".computeForEach(graph, " + signature[1] + ", entry, procedure_);"); - else line(content, " entry.compute(graph, procedure_);"); - line(content, " if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());"); + if(shortcut) line(content, " " + (genReturn ? "Object result = " : "") + clazz + ".computeForEach(graph, " + signature[1] + ", entry, procedure_);"); + else line(content, " entry.compute(graph, procedure_);"); + line(content, " if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult());"); + if(genReturn) line(content, " return result;"); line(content, " }"); line(content, "}"); line(content, ""); @@ -95,11 +80,11 @@ public class CodeGen { } - public void generateGetOrCreate(StringBuilder content, String clazz, String[] signature, boolean genAsync) { + public void generateGetOrCreate(StringBuilder content, String clazz, String[] signature) { String lower = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1); - line(content, "" + clazz + " getOrCreate" + clazz + "(QueryProcessor processor, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {"); + line(content, "" + clazz + " getOrCreate" + clazz + "(ReadGraphImpl graph, " + signature[0] + ") throws DatabaseException {"); line(content, " " + clazz + " existing = null;"); line(content, " synchronized(" + lower + "Map) {"); line(content, " existing = (" + clazz + ")" + lower + "Map.get(" + signature[1] + ");"); @@ -116,14 +101,7 @@ public class CodeGen { line(content, " return existing;"); line(content, " }"); line(content, " }"); - if(genAsync) { - line(content, " if(existing.isPending()) {"); - line(content, " if(isSync) waitPending(processor, existing);"); - line(content, " else return null;"); - line(content, " }"); - } else { - line(content, " if(existing.isPending()) waitPending(processor, existing);"); - } + line(content, " if(existing.isPending()) waitPending(graph, existing);"); line(content, " return existing;"); line(content, "}"); line(content, ""); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 7b591abb7..86f759362 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -23,7 +23,7 @@ public class QueryCache extends QueryCacheBase { super(querySupport, threads); } - Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { + Objects getOrCreateObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { Objects existing = null; synchronized(objectsMap) { existing = (Objects)objectsMap.get(r1,r2); @@ -40,7 +40,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -56,18 +56,18 @@ public class QueryCache extends QueryCacheBase { Objects.computeForEach(graph, r1,r2, null, procedure); return; } - Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2); + Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - Objects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Objects.computeForEach(graph, r1,r2, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { + Statements getOrCreateStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { Statements existing = null; synchronized(statementsMap) { existing = (Statements)statementsMap.get(r1,r2); @@ -84,7 +84,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -100,18 +100,18 @@ public class QueryCache extends QueryCacheBase { Statements.computeForEach(graph, r1,r2, null, procedure); return; } - Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2); + Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - Statements.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Statements.computeForEach(graph, r1,r2, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { + DirectObjects getOrCreateDirectObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { DirectObjects existing = null; synchronized(directObjectsMap) { existing = (DirectObjects)directObjectsMap.get(r1,r2); @@ -128,7 +128,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -144,18 +144,18 @@ public class QueryCache extends QueryCacheBase { DirectObjects.computeForEach(graph, r1,r2, null, procedure); return; } - DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2); + DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + DirectObjects.computeForEach(graph, r1,r2, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException { + RelationInfoQuery getOrCreateRelationInfoQuery(ReadGraphImpl graph, int r) throws DatabaseException { RelationInfoQuery existing = null; synchronized(relationInfoQueryMap) { existing = (RelationInfoQuery)relationInfoQueryMap.get(r); @@ -172,7 +172,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -188,18 +188,18 @@ public class QueryCache extends QueryCacheBase { RelationInfoQuery.computeForEach(graph, r, null, procedure); return; } - RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r); + RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - RelationInfoQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + RelationInfoQuery.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException { + URIToResource getOrCreateURIToResource(ReadGraphImpl graph, String id) throws DatabaseException { URIToResource existing = null; synchronized(uRIToResourceMap) { existing = (URIToResource)uRIToResourceMap.get(id); @@ -216,7 +216,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -232,18 +232,18 @@ public class QueryCache extends QueryCacheBase { URIToResource.computeForEach(graph, id, null, procedure); return; } - URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id); + URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureURIToResource; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - URIToResource.computeForEach(graph, id, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + URIToResource.computeForEach(graph, id, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException { + ValueQuery getOrCreateValueQuery(ReadGraphImpl graph, int r) throws DatabaseException { ValueQuery existing = null; synchronized(valueQueryMap) { existing = (ValueQuery)valueQueryMap.get(r); @@ -260,7 +260,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -276,18 +276,18 @@ public class QueryCache extends QueryCacheBase { ValueQuery.computeForEach(graph, r, null, procedure); return; } - ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r); + ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureValueQuery; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - ValueQuery.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + ValueQuery.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException { + OrderedSet getOrCreateOrderedSet(ReadGraphImpl graph, int r) throws DatabaseException { OrderedSet existing = null; synchronized(orderedSetMap) { existing = (OrderedSet)orderedSetMap.get(r); @@ -304,7 +304,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -320,18 +320,18 @@ public class QueryCache extends QueryCacheBase { OrderedSet.computeForEach(graph, r, null, procedure); return; } - OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r); + OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - OrderedSet.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + OrderedSet.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException { + PrincipalTypes getOrCreatePrincipalTypes(ReadGraphImpl graph, int r) throws DatabaseException { PrincipalTypes existing = null; synchronized(principalTypesMap) { existing = (PrincipalTypes)principalTypesMap.get(r); @@ -348,7 +348,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -364,18 +364,18 @@ public class QueryCache extends QueryCacheBase { PrincipalTypes.computeForEach(graph, r, null, procedure); return; } - PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r); + PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - PrincipalTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + PrincipalTypes.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException { + DirectPredicates getOrCreateDirectPredicates(ReadGraphImpl graph, int r) throws DatabaseException { DirectPredicates existing = null; synchronized(directPredicatesMap) { existing = (DirectPredicates)directPredicatesMap.get(r); @@ -392,7 +392,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -408,18 +408,18 @@ public class QueryCache extends QueryCacheBase { DirectPredicates.computeForEach(graph, r, null, procedure); return; } - DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r); + DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - DirectPredicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + DirectPredicates.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException { + Predicates getOrCreatePredicates(ReadGraphImpl graph, int r) throws DatabaseException { Predicates existing = null; synchronized(predicatesMap) { existing = (Predicates)predicatesMap.get(r); @@ -436,7 +436,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -452,18 +452,18 @@ public class QueryCache extends QueryCacheBase { Predicates.computeForEach(graph, r, null, procedure); return; } - Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r); + Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedurePredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - Predicates.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Predicates.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read r, boolean isSync) throws DatabaseException { + ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read r, boolean needsToBlock) throws DatabaseException { ReadEntry existing = null; synchronized(readEntryMap) { existing = (ReadEntry)readEntryMap.get(r); @@ -481,8 +481,8 @@ public class QueryCache extends QueryCacheBase { } } if(existing.isPending()) { - if(isSync) waitPending(processor, existing); - else return null; + if(needsToBlock) waitPending(graph, existing); + else return null; } return existing; } @@ -493,38 +493,37 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { + public static Object runnerReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { - ReadEntry.computeForEach(graph, r, null, procedure); - return; + return ReadEntry.computeForEach(graph, r, null, procedure); } - ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync); + ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(false) { + graph.processor.schedule(new SessionTask(graph) { @Override public void run(int thread) { try { - assert(!isSync); - runnerReadEntry(graph, r, parent, listener, procedure, isSync); + runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock); } catch (DatabaseException e) { Logger.defaultLogError(e); } } }); - return; + return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) return entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - ReadEntry.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Object result = ReadEntry.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + return result; } } - AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead r, boolean isSync) throws DatabaseException { + AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, boolean needsToBlock) throws DatabaseException { AsyncReadEntry existing = null; synchronized(asyncReadEntryMap) { existing = (AsyncReadEntry)asyncReadEntryMap.get(r); @@ -542,8 +541,8 @@ public class QueryCache extends QueryCacheBase { } } if(existing.isPending()) { - if(isSync) waitPending(processor, existing); - else return null; + if(needsToBlock) waitPending(graph, existing); + else return null; } return existing; } @@ -554,38 +553,37 @@ public class QueryCache extends QueryCacheBase { } } - public static void runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean isSync) throws DatabaseException { + public static Object runnerAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, final boolean needsToBlock) throws DatabaseException { QueryCache cache = graph.processor.cache; if(parent == null && listener == null && !cache.shouldCache(graph.processor, r)) { - AsyncReadEntry.computeForEach(graph, r, null, procedure); - return; + return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock); } - AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync); + AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.schedule(new SessionTask(false) { + graph.processor.schedule(new SessionTask(graph) { @Override public void run(int thread) { try { - assert(!isSync); - runnerAsyncReadEntry(graph, r, parent, listener, procedure, isSync); + runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); } catch (DatabaseException e) { Logger.defaultLogError(e); } } }); - return; + return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); - if(entry.isReady()) entry.performFromCache(graph, procedure_); + if(entry.isReady()) return entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - AsyncReadEntry.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Object result = AsyncReadEntry.computeForEach(graph, r, entry, procedure_, needsToBlock); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + return result; } } - Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException { + Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException { Types existing = null; synchronized(typesMap) { existing = (Types)typesMap.get(r); @@ -602,7 +600,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -618,18 +616,18 @@ public class QueryCache extends QueryCacheBase { Types.computeForEach(graph, r, null, procedure); return; } - Types entry = (Types)cache.getOrCreateTypes(graph.processor, r); + Types entry = (Types)cache.getOrCreateTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - Types.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + Types.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException { + ChildMap getOrCreateChildMap(ReadGraphImpl graph, int r) throws DatabaseException { ChildMap existing = null; synchronized(childMapMap) { existing = (ChildMap)childMapMap.get(r); @@ -646,7 +644,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -662,18 +660,18 @@ public class QueryCache extends QueryCacheBase { ChildMap.computeForEach(graph, r, null, procedure); return; } - ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r); + ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r); InternalProcedure> procedure_ = procedure != null ? procedure : emptyProcedureChildMap; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - ChildMap.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + ChildMap.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException { + TypeHierarchy getOrCreateTypeHierarchy(ReadGraphImpl graph, int r) throws DatabaseException { TypeHierarchy existing = null; synchronized(typeHierarchyMap) { existing = (TypeHierarchy)typeHierarchyMap.get(r); @@ -690,7 +688,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -706,18 +704,18 @@ public class QueryCache extends QueryCacheBase { TypeHierarchy.computeForEach(graph, r, null, procedure); return; } - TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r); + TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - TypeHierarchy.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + TypeHierarchy.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException { + SuperTypes getOrCreateSuperTypes(ReadGraphImpl graph, int r) throws DatabaseException { SuperTypes existing = null; synchronized(superTypesMap) { existing = (SuperTypes)superTypesMap.get(r); @@ -734,7 +732,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -750,18 +748,18 @@ public class QueryCache extends QueryCacheBase { SuperTypes.computeForEach(graph, r, null, procedure); return; } - SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r); + SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - SuperTypes.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + SuperTypes.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + SuperRelations getOrCreateSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { SuperRelations existing = null; synchronized(superRelationsMap) { existing = (SuperRelations)superRelationsMap.get(r); @@ -778,7 +776,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -794,18 +792,18 @@ public class QueryCache extends QueryCacheBase { SuperRelations.computeForEach(graph, r, null, procedure); return; } - SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r); + SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - SuperRelations.computeForEach(graph, r, entry, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + SuperRelations.computeForEach(graph, r, entry, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException { + AssertedPredicates getOrCreateAssertedPredicates(ReadGraphImpl graph, int r) throws DatabaseException { AssertedPredicates existing = null; synchronized(assertedPredicatesMap) { existing = (AssertedPredicates)assertedPredicatesMap.get(r); @@ -822,7 +820,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -834,18 +832,18 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r); + AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { + AssertedStatements getOrCreateAssertedStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { AssertedStatements existing = null; synchronized(assertedStatementsMap) { existing = (AssertedStatements)assertedStatementsMap.get(r1,r2); @@ -862,7 +860,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -874,18 +872,18 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2); + AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException { + DirectSuperRelations getOrCreateDirectSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { DirectSuperRelations existing = null; synchronized(directSuperRelationsMap) { existing = (DirectSuperRelations)directSuperRelationsMap.get(r); @@ -902,7 +900,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -914,18 +912,18 @@ public class QueryCache extends QueryCacheBase { public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r); + DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead r) throws DatabaseException { + MultiReadEntry getOrCreateMultiReadEntry(ReadGraphImpl graph, MultiRead r) throws DatabaseException { MultiReadEntry existing = null; synchronized(multiReadEntryMap) { existing = (MultiReadEntry)multiReadEntryMap.get(r); @@ -942,7 +940,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -954,18 +952,18 @@ public class QueryCache extends QueryCacheBase { public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead r, CacheEntry parent, ListenerBase listener, final SyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r); + MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r); SyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead r) throws DatabaseException { + AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r) throws DatabaseException { AsyncMultiReadEntry existing = null; synchronized(asyncMultiReadEntryMap) { existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r); @@ -982,7 +980,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -994,18 +992,18 @@ public class QueryCache extends QueryCacheBase { public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r); + AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r); AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } - ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead r) throws DatabaseException { + ExternalReadEntry getOrCreateExternalReadEntry(ReadGraphImpl graph, ExternalRead r) throws DatabaseException { ExternalReadEntry existing = null; synchronized(externalReadEntryMap) { existing = (ExternalReadEntry)externalReadEntryMap.get(r); @@ -1022,7 +1020,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(processor, existing); + if(existing.isPending()) waitPending(graph, existing); return existing; } @@ -1034,14 +1032,14 @@ public class QueryCache extends QueryCacheBase { public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r); + ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r); AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); else { assert(entry.isPending()); - entry.compute(graph, procedure_); - if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); + entry.compute(graph, procedure_); + if(listenerEntry != null) cache.primeListenerEntry(listenerEntry, entry.getResult()); } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index b3d1c5f88..d7a235fc8 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -637,18 +637,16 @@ public class QueryCacheBase { } } - public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException { + public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException { int counter = 0; while(entry.isPending()) { try { - SessionTask task = null;//processor.getOwnTask(processor.thread.get()); - if(task != null) { - task.run(processor.thread.get()); - } else { + boolean performed = graph.performPending(); + if(!performed) { Thread.sleep(1); counter++; - if(counter > 5000) { + if(counter > 30000) { CacheEntryBase base = ((CacheEntryBase)entry); // if(base.created != null) { // System.err.println("created:"); @@ -662,8 +660,8 @@ public class QueryCacheBase { // System.err.println("ready:"); // base.ready.printStackTrace(); // } - new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace(); - throw new DatabaseException("Timeout waiting for request to complete."); + new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest()).printStackTrace(); + throw new DatabaseException("Timeout waiting for request to complete." + entry.getOriginalRequest()); //System.err.println("asd"); //base.getQuery().recompute(null, null, entry); } @@ -1129,15 +1127,11 @@ public class QueryCacheBase { } public static T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { - AsyncProcedureWrapper wrap = new AsyncProcedureWrapper<>(procedure); - QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true); - return wrap.get(); + return (T)QueryCache.runnerReadEntry(graph, r, parent, listener, procedure, true); } public static T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { - AsyncProcedureWrapper wrap = new AsyncProcedureWrapper<>(procedure); - QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true); - return wrap.get(); + return (T)QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, procedure, true); } public static byte[] resultValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 9c60691fa..65b277e6c 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -171,23 +171,25 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - SessionTask getOwnTask(int thread) { + SessionTask getOwnTask(ReadGraphImpl impl) { + Set ancestors = impl.ancestorSet(); synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.thread == thread && !task.systemCall) + if(task.hasCommonParent(ancestors)) { return freeScheduling.remove(index); + } index++; } } return null; } - - public boolean performPending(int thread) { - SessionTask task = getOwnTask(thread); + + public boolean performPending(ReadGraphImpl graph) { + SessionTask task = getOwnTask(graph); if(task != null) { - task.run(thread); + task.run(QueryProcessor.thread.get()); return true; } else { return false; @@ -200,10 +202,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public void schedule(SessionTask request) { - int performer = request.thread; + //int performer = request.thread; - if(DebugPolicy.SCHEDULE) - System.out.println("schedule " + request + " " + " -> " + performer); +// if(DebugPolicy.SCHEDULE) +// System.out.println("schedule " + request + " " + " -> " + performer); //assert(performer >= 0); @@ -256,34 +258,29 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final int THREADS; final public int THREAD_MASK; - - final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); + + final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); public static abstract class SessionTask { - final public int thread; - final public boolean systemCall; -// final public int syncCaller; - //final public Object object; + public final ReadGraphImpl graph; + private Set ancestors; - public SessionTask(boolean systemCall) { - this.thread = QueryProcessor.thread.get(); - this.systemCall = systemCall; -// this.syncCaller = -1; - //this.object = object; + public SessionTask(ReadGraphImpl graph) { + this.graph = graph; } -// public SessionTask(Object object, int syncCaller) { -// this.thread = QueryProcessor.thread.get(); -// this.syncCaller = syncCaller; -// this.object = object; -// } + public boolean hasCommonParent(Set otherAncestors) { + if(graph == null) return false; + if(ancestors == null) ancestors = graph.ancestorSet(); + return !Collections.disjoint(ancestors, otherAncestors); + } public abstract void run(int thread); @Override public String toString() { - return "SessionTask[" + super.toString() + "]"; + return "SessionTask[" + graph.parent + "]"; } } @@ -294,7 +291,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public DataContainer throwable; public SessionRead(DataContainer throwable, Semaphore notify) { - super(true); + super(null); this.throwable = throwable; this.notify = notify; } @@ -1593,7 +1590,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); - ReadGraphImpl parentGraph = graph.withParent(entry); + ReadGraphImpl parentGraph = graph.forRecompute(entry); query.recompute(parentGraph); @@ -1765,6 +1762,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } else { // If not changed, keep the old value immediate.setResult(oldValue); + immediate.setReady(); listenersUnknown = true; } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java index c95242def..61e934fe9 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ReadEntry.java @@ -95,56 +95,68 @@ public final class ReadEntry extends CacheEntryBase> implem } - public static void computeForEach(ReadGraphImpl graph, Read request, ReadEntry entry, + public static T computeForEach(ReadGraphImpl graph, Read request, ReadEntry entry, AsyncProcedure procedure_) throws DatabaseException { AsyncProcedure procedure = entry != null ? entry : procedure_; - ReadGraphImpl queryGraph = entry != null ? graph.withParent(entry) : graph; + ReadGraphImpl queryGraph = graph.withParent(entry); try { T result = request.perform(queryGraph); - procedure.execute(graph, result); + if(procedure != null) procedure.execute(graph, result); + return (T)result; } catch (DatabaseException e) { - procedure.exception(graph, e); + if(procedure != null) procedure.exception(graph, e); + throw e; } catch (Throwable t) { DatabaseException dbe = new DatabaseException(t); - procedure.exception(graph, dbe); + if(procedure != null) procedure.exception(graph, dbe); + throw dbe; - } + } finally { + + if (entry != null) + entry.performFromCache(queryGraph, procedure_); - if (entry != null) - entry.performFromCache(queryGraph, procedure_); + } } - public Object performFromCache(ReadGraphImpl graph, AsyncProcedure procedure) { + public Object performFromCache(ReadGraphImpl graph, AsyncProcedure procedure) throws DatabaseException { AsyncProcedure proc = (AsyncProcedure) procedure; - if (proc != null) { - if (isExcepted()) { - try { - proc.exception(graph, (Throwable) getResult()); - } catch (Throwable t) { - LOGGER.error("performFromCache proc.exception failed", t); + if (isExcepted()) { + if(proc != null) { + try { + proc.exception(graph, (Throwable) getResult()); + } catch (Throwable t) { + LOGGER.error("performFromCache proc.exception failed", t); + } } - } else { - try { - proc.execute(graph, (T) getResult()); - } catch (Throwable t) { - LOGGER.error("performFromCache proc.execute failed", t); + Throwable t = (Throwable) getResult(); + if(t instanceof DatabaseException) { + throw (DatabaseException)t; + } else { + throw new DatabaseException(t); } - } + } else { + if(proc != null) { + try { + proc.execute(graph, (T) getResult()); + } catch (Throwable t) { + LOGGER.error("performFromCache proc.execute failed", t); + } + } + return (T)getResult(); } - return (T) getResult(); - } @Override diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/RelationInfoQuery.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/RelationInfoQuery.java index 0b60fdd0c..cffd2984a 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/RelationInfoQuery.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/RelationInfoQuery.java @@ -95,12 +95,6 @@ public final class RelationInfoQuery extends UnaryQueryP { return "RelationInfoQuery[" + id + "]"; } - @Override - public void setResult(Object result) { - super.setResult(result); - setReady(); - } - @Override public int type() { return RequestFlags.IMMEDIATE_UPDATE; diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 30e0dde52..8a634d1cf 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -414,9 +414,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { @@ -552,9 +550,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { @@ -638,9 +634,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { @@ -1377,9 +1371,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { @@ -1477,9 +1469,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java index 2902274b9..c11242163 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionRequestManager.java @@ -118,7 +118,7 @@ public class SessionRequestManager { public synchronized void startRead(int thread, final SessionRead task) { - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { @@ -142,7 +142,7 @@ public class SessionRequestManager { public synchronized void startReadUpdate(int thread) { - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { @@ -163,7 +163,7 @@ public class SessionRequestManager { public synchronized void startWrite(int thread, final SessionTask task) { - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { @@ -184,7 +184,7 @@ public class SessionRequestManager { public synchronized void startWriteUpdate(int thread) { - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { @@ -271,7 +271,7 @@ public class SessionRequestManager { if (!reads.isEmpty()) { final SessionRead read = reads.poll(); - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { @@ -312,7 +312,7 @@ public class SessionRequestManager { assert(State.INIT != state); if(State.READ == state) { - session.queryProvider2.schedule(new SessionTask(true) { + session.queryProvider2.schedule(new SessionTask(null) { @Override public void run(int thread) { diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java index 9a0097b11..ba0ce846b 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/UndoRedoSupportImpl.java @@ -44,7 +44,7 @@ public class UndoRedoSupportImpl implements UndoRedoSupport { final Operation fop = (Operation)ops.toArray()[0]; final DataContainer id = new DataContainer(0L); final TaskHelper th = new TaskHelper("Undo"); - session.requestManager.scheduleWrite(new SessionTask(true) { + session.requestManager.scheduleWrite(new SessionTask(null) { @Override public void run(int thread) { session.flushCounter = 0; diff --git a/bundles/org.simantics.db.services/src/org/simantics/db/services/adaption/reflection/ReflectionAdapter2.java b/bundles/org.simantics.db.services/src/org/simantics/db/services/adaption/reflection/ReflectionAdapter2.java index ed750f538..187fc5219 100644 --- a/bundles/org.simantics.db.services/src/org/simantics/db/services/adaption/reflection/ReflectionAdapter2.java +++ b/bundles/org.simantics.db.services/src/org/simantics/db/services/adaption/reflection/ReflectionAdapter2.java @@ -17,9 +17,8 @@ import java.util.Arrays; import org.simantics.db.AsyncReadGraph; import org.simantics.db.Resource; -import org.simantics.db.ReadGraph; import org.simantics.db.adaption.Adapter; -import org.simantics.db.common.request.ReadRequest; +import org.simantics.db.common.request.AsyncReadRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.AsyncProcedure; @@ -79,16 +78,16 @@ public class ReflectionAdapter2 implements Adapter { } else { - g.asyncRequest(new ReadRequest() { + g.asyncRequest(new AsyncReadRequest() { @Override - public void run(ReadGraph graph) throws DatabaseException { + public void run(AsyncReadGraph graph) { Object[] args = new Object[parameters.length]; try { for(int i=0;i