From: Antti Villberg Date: Wed, 11 Jul 2018 05:57:39 +0000 (+0300) Subject: Fixed asynchronous recompute problems X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=5d26dc0201bd7fafbaafa4f74ff1a10929a14e5b;p=simantics%2Fplatform.git Fixed asynchronous recompute problems gitlab #5 Change-Id: Ia80fec89736dbef1f6bd44e730d6c4186921836d --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index 6998ee55e..3a735e6c2 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -5401,11 +5401,6 @@ public class ReadGraphImpl implements ReadGraph { return processor.getSession(); } -// @Override -// final public Builtins getBuiltins() { -// return processor.getSession().getBuiltins(); -// } - @Override public void asyncRequest(final Read request) { @@ -5471,10 +5466,6 @@ public class ReadGraphImpl implements ReadGraph { return new ReadGraphImpl(null, support); } - public static ReadGraphImpl forRecompute(CacheEntry entry, QueryProcessor support) { - return new ReadGraphImpl(entry, support); - } - @Override public void asyncRequest(Read request, SyncProcedure procedure) { asyncRequest(request, new SyncToAsyncProcedure(procedure)); @@ -6015,53 +6006,17 @@ public class ReadGraphImpl implements ReadGraph { return thread == Integer.MIN_VALUE; } -// final private boolean isSync(int thread) { -// return thread < -1 && thread > Integer.MIN_VALUE; -// } - ReadGraphImpl(ReadGraphImpl graph) { this(graph.parent, graph.processor); } ReadGraphImpl(CacheEntry parent, QueryProcessor support) { -// this.state = new ReadGraphState(barrier, support); this.parent = parent; this.processor = support; } - ReadGraphImpl(final QueryProcessor support) { - -// this.state = state; - this.processor = support; - this.parent = null; - - } - -// public static ReadGraphImpl createSync(int syncThread, Object syncParent, -// ReadGraphSupportImpl support) { -// return new ReadGraphImpl(syncThread, syncThread, syncParent, null, -// support, new AsyncBarrierImpl(null)); -// } - public static ReadGraphImpl create(QueryProcessor support) { - return new ReadGraphImpl(support); - } - -// public ReadGraphImpl newAsync() { -// return this; -//// if(!state.synchronizedExecution) { -//// return this; -//// } else { -//// return new ReadGraphImpl(false, parent, state.support, state.barrier); -//// } -// } - -// public ReadGraphImpl newSync() { -// return new ReadGraphImpl(parent, processor); -// } - - public ReadGraphImpl newSync(CacheEntry parentEntry) { - return new ReadGraphImpl(parentEntry, processor); + return new ReadGraphImpl(null, support); } public ReadGraphImpl newRestart(ReadGraphImpl impl) { @@ -6069,41 +6024,10 @@ public class ReadGraphImpl implements ReadGraph { WriteGraphImpl write = processor.getSession().getService( WriteGraphImpl.class); -// if (write.callerThread != impl.callerThread) -// return new WriteGraphImpl(impl.callerThread, parent, state.support, write.writeSupport, write.provider, write.state.barrier); return write; } -// public ReadGraphImpl newSync(Object parentRequest) { -// return new ReadGraphImpl(callerThread, state.parent, state.support, new AsyncBarrierImpl(state.barrier)); -// } - -// public ReadGraphImpl newSync(final int callerThread, Object parentRequest) { -// assert (state.syncThread == callerThread || (state.syncThread == Integer.MIN_VALUE && callerThread != Integer.MIN_VALUE)); -// return new ReadGraphImpl(callerThread, callerThread, parentRequest, -// state.parent, state.support, new AsyncBarrierImpl(state.barrier)); -// } -// -// public ReadGraphImpl newSyncAsync(Object parentRequest) { -//// assert (callerThread < 0); -// return new ReadGraphImpl(callerThread, state.syncThread, parentRequest, -// state.parent, state.support, new AsyncBarrierImpl(state.barrier)); -// } -// -// public ReadGraphImpl newSyncAsync(final int callerThread, -// Object parentRequest) { -//// assert (callerThread < 0); -// // assert(state.syncThread == callerThread || (state.syncThread == Integer.MIN_VALUE -// // && callerThread != Integer.MIN_VALUE) ); -// return new ReadGraphImpl(callerThread, callerThread, parentRequest, -// state.parent, state.support, new AsyncBarrierImpl(state.barrier)); -// } - - public ReadGraphImpl withAsyncParent(CacheEntry parent) { - return new ReadGraphImpl(parent, processor); - } - public ReadGraphImpl withParent(CacheEntry parent) { if(parent == this.parent) return this; else return new ReadGraphImpl(parent, processor); @@ -6120,56 +6044,13 @@ public class ReadGraphImpl implements ReadGraph { assert(procedure.done()); -// while (!procedure.done()) { -// -// boolean executed = processor.resumeTasks(callerThread, null, null); -// if (!executed) { -// try { -// Thread.sleep(1); -// // sema.tryAcquire(1, TimeUnit.MILLISECONDS); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// -// } - } public void waitAsyncProcedure(AsyncReadProcedure procedure) { assert(procedure.done()); -// while (!procedure.done()) { -// -// boolean executed = processor.processor.resume(this); -// if (!executed) { -// try { -// Thread.sleep(1); -// // sema.tryAcquire(1, TimeUnit.MILLISECONDS); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// -// } - } - -// public void waitAsync(Object request) { -// try { -// state.barrier.waitBarrier(request, this); -// } catch (Throwable t) { -// t.printStackTrace(); -// processor.scanPending(); -// processor.querySupport.checkTasks(); -// throw new RuntimeDatabaseException(t); -// } -// } - -// public void restart() { -// state.barrier.restart(); -// } public boolean resumeTasks() { return processor.resumeTasks(this); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java index 8462433f1..410a8fe69 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/WriteGraphImpl.java @@ -128,11 +128,6 @@ final public class WriteGraphImpl extends ReadGraphImpl implements WriteGraph { return new WriteGraphImpl(parent, processor, writeSupport, provider); } - @Override - final public ReadGraphImpl withAsyncParent(CacheEntry parent2) { - return new WriteGraphImpl(parent2, processor, writeSupport, provider); - } - @Override public ReadGraphImpl newRestart(ReadGraphImpl impl) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java index 6f718f1fe..1791c50ac 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java @@ -14,6 +14,7 @@ package org.simantics.db.impl.query; import java.util.ArrayList; import org.simantics.db.AsyncReadGraph; +import org.simantics.db.common.GraphSemaphore; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.procedure.AsyncMultiProcedure; @@ -88,23 +89,37 @@ final public class AsyncMultiReadEntry extends CacheEntryBase() { + try { - @Override - public void execute(AsyncReadGraph graph, T result) { - addOrSet(result); - } + GraphSemaphore s = new GraphSemaphore(graph, 0); - public void finished(AsyncReadGraph graph) { - finish(graph); - }; + request.perform(graph , new AsyncMultiProcedure() { - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - except(t); - } + @Override + public void execute(AsyncReadGraph graph, T result) { + addOrSet(result); + } - }); + public void finished(AsyncReadGraph graph) { + finish(graph); + s.release(); + }; + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + except(t); + s.release(); + } + + }); + + s.waitFor(1); + + } catch (Throwable t) { + + except(t); + + } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java index 08f0ad8b5..75169d02e 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java @@ -12,6 +12,7 @@ package org.simantics.db.impl.query; import org.simantics.db.AsyncReadGraph; +import org.simantics.db.common.GraphSemaphore; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.graph.ReadGraphImpl; @@ -78,20 +79,26 @@ final public class AsyncReadEntry extends CacheEntryBase> { try { + GraphSemaphore s = new GraphSemaphore(graph, 0); + request.perform(graph , new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T result) { addOrSet(graph, result); + s.release(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { except(t); + s.release(); } }); + s.waitFor(1); + } catch (Throwable t) { except(t); } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java index cd540eaad..deecbabbc 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/CodeGen.java @@ -54,7 +54,7 @@ public class CodeGen { line(content, " return;"); line(content, " }"); } - line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph, " + signature[1] + (genAsync ? ", isSync" : "") + ");"); + line(content, " " + clazz + " entry = (" + clazz + ")cache.getOrCreate" + clazz + "(graph.processor, " + signature[1] + (genAsync ? ", isSync" : "") + ");"); if(genAsync) { line(content, " if(entry == null) {"); line(content, " graph.processor.schedule(new SessionTask(false) {"); @@ -102,7 +102,7 @@ public class CodeGen { String lower = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1); - line(content, "" + clazz + " getOrCreate" + clazz + "(ReadGraphImpl graph, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {"); + line(content, "" + clazz + " getOrCreate" + clazz + "(QueryProcessor processor, " + signature[0] + (genAsync ? ", boolean isSync" : "") + ") throws DatabaseException {"); line(content, " " + clazz + " existing = null;"); line(content, " synchronized(" + lower + "Map) {"); line(content, " existing = (" + clazz + ")" + lower + "Map.get(" + signature[1] + ");"); @@ -121,11 +121,11 @@ public class CodeGen { line(content, " }"); if(genAsync) { line(content, " if(existing.isPending()) {"); - line(content, " if(isSync) waitPending(graph, existing);"); + line(content, " if(isSync) waitPending(processor, existing);"); line(content, " else return null;"); line(content, " }"); } else { - line(content, " if(existing.isPending()) waitPending(graph, existing);"); + line(content, " if(existing.isPending()) waitPending(processor, existing);"); } line(content, " return existing;"); line(content, "}"); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index 5a6f5d0ba..38ed21026 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -24,7 +24,7 @@ public class QueryCache extends QueryCacheBase { super(querySupport, threads); } - Objects getOrCreateObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { + Objects getOrCreateObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { Objects existing = null; synchronized(objectsMap) { existing = (Objects)objectsMap.get(r1,r2); @@ -41,7 +41,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -57,7 +57,7 @@ public class QueryCache extends QueryCacheBase { Objects.computeForEach(graph, r1,r2, null, procedure); return; } - Objects entry = (Objects)cache.getOrCreateObjects(graph, r1,r2); + Objects entry = (Objects)cache.getOrCreateObjects(graph.processor, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureObjects; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -68,7 +68,7 @@ public class QueryCache extends QueryCacheBase { } } - Statements getOrCreateStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { + Statements getOrCreateStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { Statements existing = null; synchronized(statementsMap) { existing = (Statements)statementsMap.get(r1,r2); @@ -85,7 +85,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -101,7 +101,7 @@ public class QueryCache extends QueryCacheBase { Statements.computeForEach(graph, r1,r2, null, procedure); return; } - Statements entry = (Statements)cache.getOrCreateStatements(graph, r1,r2); + Statements entry = (Statements)cache.getOrCreateStatements(graph.processor, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureStatements; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -112,7 +112,7 @@ public class QueryCache extends QueryCacheBase { } } - DirectObjects getOrCreateDirectObjects(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { + DirectObjects getOrCreateDirectObjects(QueryProcessor processor, int r1, int r2) throws DatabaseException { DirectObjects existing = null; synchronized(directObjectsMap) { existing = (DirectObjects)directObjectsMap.get(r1,r2); @@ -129,7 +129,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -145,7 +145,7 @@ public class QueryCache extends QueryCacheBase { DirectObjects.computeForEach(graph, r1,r2, null, procedure); return; } - DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph, r1,r2); + DirectObjects entry = (DirectObjects)cache.getOrCreateDirectObjects(graph.processor, r1,r2); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectObjects; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -156,7 +156,7 @@ public class QueryCache extends QueryCacheBase { } } - RelationInfoQuery getOrCreateRelationInfoQuery(ReadGraphImpl graph, int r) throws DatabaseException { + RelationInfoQuery getOrCreateRelationInfoQuery(QueryProcessor processor, int r) throws DatabaseException { RelationInfoQuery existing = null; synchronized(relationInfoQueryMap) { existing = (RelationInfoQuery)relationInfoQueryMap.get(r); @@ -173,7 +173,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -189,7 +189,7 @@ public class QueryCache extends QueryCacheBase { RelationInfoQuery.computeForEach(graph, r, null, procedure); return; } - RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph, r); + RelationInfoQuery entry = (RelationInfoQuery)cache.getOrCreateRelationInfoQuery(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureRelationInfoQuery; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -200,7 +200,7 @@ public class QueryCache extends QueryCacheBase { } } - URIToResource getOrCreateURIToResource(ReadGraphImpl graph, String id) throws DatabaseException { + URIToResource getOrCreateURIToResource(QueryProcessor processor, String id) throws DatabaseException { URIToResource existing = null; synchronized(uRIToResourceMap) { existing = (URIToResource)uRIToResourceMap.get(id); @@ -217,7 +217,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -233,7 +233,7 @@ public class QueryCache extends QueryCacheBase { URIToResource.computeForEach(graph, id, null, procedure); return; } - URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph, id); + URIToResource entry = (URIToResource)cache.getOrCreateURIToResource(graph.processor, id); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureURIToResource; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -244,7 +244,7 @@ public class QueryCache extends QueryCacheBase { } } - ValueQuery getOrCreateValueQuery(ReadGraphImpl graph, int r) throws DatabaseException { + ValueQuery getOrCreateValueQuery(QueryProcessor processor, int r) throws DatabaseException { ValueQuery existing = null; synchronized(valueQueryMap) { existing = (ValueQuery)valueQueryMap.get(r); @@ -261,7 +261,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -277,7 +277,7 @@ public class QueryCache extends QueryCacheBase { ValueQuery.computeForEach(graph, r, null, procedure); return; } - ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph, r); + ValueQuery entry = (ValueQuery)cache.getOrCreateValueQuery(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureValueQuery; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -288,7 +288,7 @@ public class QueryCache extends QueryCacheBase { } } - OrderedSet getOrCreateOrderedSet(ReadGraphImpl graph, int r) throws DatabaseException { + OrderedSet getOrCreateOrderedSet(QueryProcessor processor, int r) throws DatabaseException { OrderedSet existing = null; synchronized(orderedSetMap) { existing = (OrderedSet)orderedSetMap.get(r); @@ -305,7 +305,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -321,7 +321,7 @@ public class QueryCache extends QueryCacheBase { OrderedSet.computeForEach(graph, r, null, procedure); return; } - OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph, r); + OrderedSet entry = (OrderedSet)cache.getOrCreateOrderedSet(graph.processor, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureOrderedSet; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -332,7 +332,7 @@ public class QueryCache extends QueryCacheBase { } } - PrincipalTypes getOrCreatePrincipalTypes(ReadGraphImpl graph, int r) throws DatabaseException { + PrincipalTypes getOrCreatePrincipalTypes(QueryProcessor processor, int r) throws DatabaseException { PrincipalTypes existing = null; synchronized(principalTypesMap) { existing = (PrincipalTypes)principalTypesMap.get(r); @@ -349,7 +349,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -365,7 +365,7 @@ public class QueryCache extends QueryCacheBase { PrincipalTypes.computeForEach(graph, r, null, procedure); return; } - PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph, r); + PrincipalTypes entry = (PrincipalTypes)cache.getOrCreatePrincipalTypes(graph.processor, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedurePrincipalTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -376,7 +376,7 @@ public class QueryCache extends QueryCacheBase { } } - DirectPredicates getOrCreateDirectPredicates(ReadGraphImpl graph, int r) throws DatabaseException { + DirectPredicates getOrCreateDirectPredicates(QueryProcessor processor, int r) throws DatabaseException { DirectPredicates existing = null; synchronized(directPredicatesMap) { existing = (DirectPredicates)directPredicatesMap.get(r); @@ -393,7 +393,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -409,7 +409,7 @@ public class QueryCache extends QueryCacheBase { DirectPredicates.computeForEach(graph, r, null, procedure); return; } - DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph, r); + DirectPredicates entry = (DirectPredicates)cache.getOrCreateDirectPredicates(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectPredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -420,7 +420,7 @@ public class QueryCache extends QueryCacheBase { } } - Predicates getOrCreatePredicates(ReadGraphImpl graph, int r) throws DatabaseException { + Predicates getOrCreatePredicates(QueryProcessor processor, int r) throws DatabaseException { Predicates existing = null; synchronized(predicatesMap) { existing = (Predicates)predicatesMap.get(r); @@ -437,7 +437,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -453,7 +453,7 @@ public class QueryCache extends QueryCacheBase { Predicates.computeForEach(graph, r, null, procedure); return; } - Predicates entry = (Predicates)cache.getOrCreatePredicates(graph, r); + Predicates entry = (Predicates)cache.getOrCreatePredicates(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedurePredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -464,7 +464,7 @@ public class QueryCache extends QueryCacheBase { } } - ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read r, boolean isSync) throws DatabaseException { + ReadEntry getOrCreateReadEntry(QueryProcessor processor, Read r, boolean isSync) throws DatabaseException { ReadEntry existing = null; synchronized(readEntryMap) { existing = (ReadEntry)readEntryMap.get(r); @@ -482,7 +482,7 @@ public class QueryCache extends QueryCacheBase { } } if(existing.isPending()) { - if(isSync) waitPending(graph, existing); + if(isSync) waitPending(processor, existing); else return null; } return existing; @@ -500,7 +500,7 @@ public class QueryCache extends QueryCacheBase { ReadEntry.computeForEach(graph, r, null, procedure); return; } - ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, isSync); + ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph.processor, r, isSync); if(entry == null) { graph.processor.schedule(new SessionTask(false) { @Override @@ -525,7 +525,7 @@ public class QueryCache extends QueryCacheBase { } } - AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, boolean isSync) throws DatabaseException { + AsyncReadEntry getOrCreateAsyncReadEntry(QueryProcessor processor, AsyncRead r, boolean isSync) throws DatabaseException { AsyncReadEntry existing = null; synchronized(asyncReadEntryMap) { existing = (AsyncReadEntry)asyncReadEntryMap.get(r); @@ -543,7 +543,7 @@ public class QueryCache extends QueryCacheBase { } } if(existing.isPending()) { - if(isSync) waitPending(graph, existing); + if(isSync) waitPending(processor, existing); else return null; } return existing; @@ -561,7 +561,7 @@ public class QueryCache extends QueryCacheBase { AsyncReadEntry.computeForEach(graph, r, null, procedure); return; } - AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, isSync); + AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph.processor, r, isSync); if(entry == null) { graph.processor.schedule(new SessionTask(false) { @Override @@ -586,7 +586,7 @@ public class QueryCache extends QueryCacheBase { } } - Types getOrCreateTypes(ReadGraphImpl graph, int r) throws DatabaseException { + Types getOrCreateTypes(QueryProcessor processor, int r) throws DatabaseException { Types existing = null; synchronized(typesMap) { existing = (Types)typesMap.get(r); @@ -603,7 +603,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -619,7 +619,7 @@ public class QueryCache extends QueryCacheBase { Types.computeForEach(graph, r, null, procedure); return; } - Types entry = (Types)cache.getOrCreateTypes(graph, r); + Types entry = (Types)cache.getOrCreateTypes(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -630,7 +630,7 @@ public class QueryCache extends QueryCacheBase { } } - ChildMap getOrCreateChildMap(ReadGraphImpl graph, int r) throws DatabaseException { + ChildMap getOrCreateChildMap(QueryProcessor processor, int r) throws DatabaseException { ChildMap existing = null; synchronized(childMapMap) { existing = (ChildMap)childMapMap.get(r); @@ -647,7 +647,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -663,7 +663,7 @@ public class QueryCache extends QueryCacheBase { ChildMap.computeForEach(graph, r, null, procedure); return; } - ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph, r); + ChildMap entry = (ChildMap)cache.getOrCreateChildMap(graph.processor, r); InternalProcedure> procedure_ = procedure != null ? procedure : emptyProcedureChildMap; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -674,7 +674,7 @@ public class QueryCache extends QueryCacheBase { } } - AssertedStatements getOrCreateAssertedStatements(ReadGraphImpl graph, int r1, int r2) throws DatabaseException { + AssertedStatements getOrCreateAssertedStatements(QueryProcessor processor, int r1, int r2) throws DatabaseException { AssertedStatements existing = null; synchronized(assertedStatementsMap) { existing = (AssertedStatements)assertedStatementsMap.get(r1,r2); @@ -691,7 +691,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -703,7 +703,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, final TripleIntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph, r1,r2); + AssertedStatements entry = (AssertedStatements)cache.getOrCreateAssertedStatements(graph.processor, r1,r2); TripleIntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedStatements; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -714,7 +714,7 @@ public class QueryCache extends QueryCacheBase { } } - AssertedPredicates getOrCreateAssertedPredicates(ReadGraphImpl graph, int r) throws DatabaseException { + AssertedPredicates getOrCreateAssertedPredicates(QueryProcessor processor, int r) throws DatabaseException { AssertedPredicates existing = null; synchronized(assertedPredicatesMap) { existing = (AssertedPredicates)assertedPredicatesMap.get(r); @@ -731,7 +731,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -743,7 +743,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerAssertedPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph, r); + AssertedPredicates entry = (AssertedPredicates)cache.getOrCreateAssertedPredicates(graph.processor, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureAssertedPredicates; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -754,7 +754,7 @@ public class QueryCache extends QueryCacheBase { } } - DirectSuperRelations getOrCreateDirectSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { + DirectSuperRelations getOrCreateDirectSuperRelations(QueryProcessor processor, int r) throws DatabaseException { DirectSuperRelations existing = null; synchronized(directSuperRelationsMap) { existing = (DirectSuperRelations)directSuperRelationsMap.get(r); @@ -771,7 +771,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -783,7 +783,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerDirectSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final IntProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph, r); + DirectSuperRelations entry = (DirectSuperRelations)cache.getOrCreateDirectSuperRelations(graph.processor, r); IntProcedure procedure_ = procedure != null ? procedure : emptyProcedureDirectSuperRelations; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -794,7 +794,7 @@ public class QueryCache extends QueryCacheBase { } } - SuperTypes getOrCreateSuperTypes(ReadGraphImpl graph, int r) throws DatabaseException { + SuperTypes getOrCreateSuperTypes(QueryProcessor processor, int r) throws DatabaseException { SuperTypes existing = null; synchronized(superTypesMap) { existing = (SuperTypes)superTypesMap.get(r); @@ -811,7 +811,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -823,7 +823,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph, r); + SuperTypes entry = (SuperTypes)cache.getOrCreateSuperTypes(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperTypes; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -834,7 +834,7 @@ public class QueryCache extends QueryCacheBase { } } - TypeHierarchy getOrCreateTypeHierarchy(ReadGraphImpl graph, int r) throws DatabaseException { + TypeHierarchy getOrCreateTypeHierarchy(QueryProcessor processor, int r) throws DatabaseException { TypeHierarchy existing = null; synchronized(typeHierarchyMap) { existing = (TypeHierarchy)typeHierarchyMap.get(r); @@ -851,7 +851,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -863,7 +863,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerTypeHierarchy(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph, r); + TypeHierarchy entry = (TypeHierarchy)cache.getOrCreateTypeHierarchy(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureTypeHierarchy; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -874,7 +874,7 @@ public class QueryCache extends QueryCacheBase { } } - SuperRelations getOrCreateSuperRelations(ReadGraphImpl graph, int r) throws DatabaseException { + SuperRelations getOrCreateSuperRelations(QueryProcessor processor, int r) throws DatabaseException { SuperRelations existing = null; synchronized(superRelationsMap) { existing = (SuperRelations)superRelationsMap.get(r); @@ -891,7 +891,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -903,7 +903,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, final InternalProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph, r); + SuperRelations entry = (SuperRelations)cache.getOrCreateSuperRelations(graph.processor, r); InternalProcedure procedure_ = procedure != null ? procedure : emptyProcedureSuperRelations; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -914,7 +914,7 @@ public class QueryCache extends QueryCacheBase { } } - MultiReadEntry getOrCreateMultiReadEntry(ReadGraphImpl graph, MultiRead r) throws DatabaseException { + MultiReadEntry getOrCreateMultiReadEntry(QueryProcessor processor, MultiRead r) throws DatabaseException { MultiReadEntry existing = null; synchronized(multiReadEntryMap) { existing = (MultiReadEntry)multiReadEntryMap.get(r); @@ -931,7 +931,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -943,7 +943,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerMultiReadEntry(ReadGraphImpl graph, MultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph, r); + MultiReadEntry entry = (MultiReadEntry)cache.getOrCreateMultiReadEntry(graph.processor, r); AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureMultiReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -954,7 +954,7 @@ public class QueryCache extends QueryCacheBase { } } - AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r) throws DatabaseException { + AsyncMultiReadEntry getOrCreateAsyncMultiReadEntry(QueryProcessor processor, AsyncMultiRead r) throws DatabaseException { AsyncMultiReadEntry existing = null; synchronized(asyncMultiReadEntryMap) { existing = (AsyncMultiReadEntry)asyncMultiReadEntryMap.get(r); @@ -971,7 +971,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -983,7 +983,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerAsyncMultiReadEntry(ReadGraphImpl graph, AsyncMultiRead r, CacheEntry parent, ListenerBase listener, final AsyncMultiProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph, r); + AsyncMultiReadEntry entry = (AsyncMultiReadEntry)cache.getOrCreateAsyncMultiReadEntry(graph.processor, r); AsyncMultiProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncMultiReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); @@ -994,7 +994,7 @@ public class QueryCache extends QueryCacheBase { } } - ExternalReadEntry getOrCreateExternalReadEntry(ReadGraphImpl graph, ExternalRead r) throws DatabaseException { + ExternalReadEntry getOrCreateExternalReadEntry(QueryProcessor processor, ExternalRead r) throws DatabaseException { ExternalReadEntry existing = null; synchronized(externalReadEntryMap) { existing = (ExternalReadEntry)externalReadEntryMap.get(r); @@ -1011,7 +1011,7 @@ public class QueryCache extends QueryCacheBase { return existing; } } - if(existing.isPending()) waitPending(graph, existing); + if(existing.isPending()) waitPending(processor, existing); return existing; } @@ -1023,7 +1023,7 @@ public class QueryCache extends QueryCacheBase { public static void runnerExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { QueryCache cache = graph.processor.cache; - ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph, r); + ExternalReadEntry entry = (ExternalReadEntry)cache.getOrCreateExternalReadEntry(graph.processor, r); AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureExternalReadEntry; ListenerEntry listenerEntry = cache.registerDependencies(graph, entry, parent, listener, procedure_, false); if(entry.isReady()) entry.performFromCache(graph, procedure_); diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index a75d69010..322e820a2 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -635,9 +635,7 @@ public class QueryCacheBase { } } - public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException { - - QueryProcessor processor = graph.processor; + public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException { int counter = 0; while(entry.isPending()) {