From: Antti Villberg Date: Thu, 23 Jan 2020 12:17:13 +0000 (+0200) Subject: Multiple readers in db client X-Git-Tag: v1.43.0~112^2 X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F11%2F3811%2F4;p=simantics%2Fplatform.git Multiple readers in db client gitlab #5 Change-Id: I5a44b7d19f6529f1ceceab1875e227db326e1c24 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java index 2d3e2804c..c491fb377 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/BlockingAsyncProcedure.java @@ -12,64 +12,79 @@ package org.simantics.db.impl; import org.simantics.db.AsyncReadGraph; -import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; -import org.simantics.db.impl.graph.AsyncBarrierImpl; +import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.query.AsyncReadEntry; +import org.simantics.db.impl.query.PendingTaskSupport; import org.simantics.db.procedure.AsyncProcedure; public class BlockingAsyncProcedure implements AsyncProcedure { private static final Object NO_RESULT = new Object(); - private final Object key; - private final AsyncBarrierImpl barrier; - private final ReadGraphImpl procedureGraph; - private final AsyncProcedure procedure; + public final Object key; + public final ReadGraphImpl queryGraph; + public final ReadGraphImpl callerGraph; + public final AsyncProcedure procedure; + public PendingTaskSupport pendingTaskSupport; + public Object result = NO_RESULT; + public Throwable exception = null; - private Object result = NO_RESULT; - private Throwable exception = null; + private ReadGraphImpl queryGraph() { + return queryGraph; + } + + public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry entry, AsyncProcedure procedure, Object key, boolean needsToBlock) { + + // A new graph for evaluating the query with correct parent and asyncBarrier + queryGraph = callerGraph.withParent(entry, () -> { + + dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock); + + }); + + queryGraph.asyncBarrier.inc(); - public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure procedure, Object key) { this.procedure = procedure; this.key = key; - this.barrier = barrier; - this.barrier.inc(); - this.procedureGraph = procedureGraph; + this.queryGraph.asyncBarrier.inc(); + this.callerGraph = callerGraph; + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.registerBAP(this); + } } @Override public void execute(AsyncReadGraph graph_, Result result) { + this.result = result; - try { - if(procedure != null) procedure.execute(procedureGraph, result); - } catch (Throwable throwable) { - Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable); - } finally { - barrier.dec(); - } + queryGraph.asyncBarrier.dec(); + } @Override public void exception(AsyncReadGraph graph_, Throwable t) { + this.exception = t; - try { - if(procedure != null) procedure.exception(procedureGraph, t); - } catch (Throwable throwable) { - Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable); - } finally { - barrier.dec(); - } + queryGraph.asyncBarrier.dec(); + } - + public void waitBarrier() { - barrier.waitBarrier(key, procedureGraph); + queryGraph.asyncBarrier.waitBarrier(key, queryGraph); + } + + public void dec() { + + queryGraph.asyncBarrier.dec(); + } @SuppressWarnings("unchecked") public Result get() throws DatabaseException { - barrier.waitBarrier(key, procedureGraph); + queryGraph.asyncBarrier.waitBarrier(key, queryGraph); if(exception != null) { if(exception instanceof DatabaseException) throw (DatabaseException)exception; @@ -80,10 +95,6 @@ public class BlockingAsyncProcedure implements AsyncProcedure { } - public boolean isDone() { - return barrier.get() == 0; - } - @SuppressWarnings("unchecked") public Result getResult() { return (Result)result; @@ -97,5 +108,41 @@ public class BlockingAsyncProcedure implements AsyncProcedure { public String toString() { return "." + procedure; } + + private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry entry, AsyncProcedure procedure_, boolean needsToBlock) { + + AsyncProcedure procedure = entry != null ? entry : procedure_; + + ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent); + executeGraph.asyncBarrier.inc(); + try { + if(procedure != null) { + procedure.execute(executeGraph, get()); + } + } catch (DatabaseException e) { + if(procedure != null) procedure.exception(executeGraph, e); + exception = e; + } catch (Throwable t) { + DatabaseException dbe = new DatabaseException(t); + if(procedure != null) procedure.exception(executeGraph, dbe); + exception = dbe; + } finally { + + if (entry != null) { + assert(entry.isReady()); + // This does not throw + entry.performFromCache(executeGraph, procedure_); + } + + executeGraph.asyncBarrier.dec(); + if(needsToBlock) + executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); + } + + if (BarrierTracing.BOOKKEEPING) { + BarrierTracing.unregisterBAP(this); + } + + } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java index b94ec6f68..060fc22ae 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java @@ -18,20 +18,24 @@ import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.impl.query.CacheEntry; import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier { private static final long serialVersionUID = 4724463372850048672L; - static final int WAIT_TIME = 600; + static final int WAIT_TIME = 60000; public static final boolean PRINT = false; final public AsyncBarrierImpl caller; + + final public Runnable callback; - public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry) { + public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry entry, Runnable callback) { super(0); this.caller = caller; + this.callback = callback; if (BarrierTracing.BOOKKEEPING) { BarrierTracing.trace(this, entry); } @@ -92,6 +96,10 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie new Exception()); } assert (count >= 0); + + if(callback != null) + callback.run(); + } } @@ -142,7 +150,7 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie e.printStackTrace(); } } - if(waitCount > WAIT_TIME*1000) { + if(waitCount > WAIT_TIME) { System.err.println("AsyncBarrierImpl.waitBarrier(" + request @@ -153,6 +161,11 @@ final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrie synchronized (BarrierTracing.reverseLookup) { printReverse(this, 0); } + BarrierTracing.printBAPS(); + } + + for(SessionTask t : impl.processor.freeScheduling) { + System.err.println("Pending task:" + t); } // if(Development.DEVELOPMENT) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java index 3ff77f6bc..2433b897f 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/BarrierTracing.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.query.CacheEntry; import org.simantics.db.impl.query.QueryProcessor.SessionTask; @@ -19,7 +20,31 @@ public class BarrierTracing { public static final HashMap> entryMap = new HashMap<>(); public static final HashMap restartMap = new HashMap<>(); public static final HashMap startMap = new HashMap<>(); + public static final HashMap baps = new HashMap<>(); + synchronized public static void registerBAP(BlockingAsyncProcedure bap) { + baps.put(bap, new Exception()); + } + + synchronized public static void unregisterBAP(BlockingAsyncProcedure bap) { + baps.remove(bap); + } + + synchronized public static void printBAPS() { + for(BlockingAsyncProcedure bap : baps.keySet()) { + Throwable e = baps.get(bap); + System.err.println("BlockingAsyncProcedure"); + System.err.println("-key: " + bap.key); + System.err.println("-queryGraph: " + bap.queryGraph); + System.err.println("-callerGraph: " + bap.callerGraph); + System.err.println("-procedure: " + bap.procedure); + System.err.println("-pendingTaskSupport: " + bap.pendingTaskSupport); + System.err.println("-result: " + bap.result); + System.err.println("-exception: " + bap.exception); + e.printStackTrace(); + } + } + public static void trace(AsyncBarrierImpl barrier, CacheEntry entry) { if (RESTART_GUARD) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index a693b2922..2eced2b7c 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -1926,9 +1926,7 @@ public class ReadGraphImpl implements AsyncReadGraph { public T syncRequest(final Read request) throws DatabaseException { assert (request != null); - ITask task = ThreadLogger.task(request); T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true); - task.finish(); return result; } @@ -1950,10 +1948,8 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); - ITask task = ThreadLogger.task(request); ListenerBase listener = procedure != null ? getListenerBase(procedure) : null; - T result = QueryCache.resultReadEntry(this, request, parent, listener, procedure); - task.finish(); + T result = (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true); return result; } @@ -2038,10 +2034,8 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); - ITask task = ThreadLogger.task(request); ListenerBase listener = getListenerBase(procedure); T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true); - task.finish(); return result; } @@ -5160,7 +5154,11 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - + + AsyncBarrierImpl barrier = asyncBarrier; + if(barrier != null) + barrier.inc(); + processor.scheduleNow(new SessionTask(this) { @Override @@ -5170,6 +5168,9 @@ public class ReadGraphImpl implements AsyncReadGraph { QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false); } catch (DatabaseException e) { Logger.defaultLogError(e); + } finally { + if(barrier != null) + barrier.dec(); } } @@ -5241,29 +5242,39 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - ITask task = ThreadLogger.task(request); + AsyncBarrierImpl barrier = asyncBarrier; + if(barrier != null) + barrier.inc(); processor.scheduleNow(new SessionTask(this) { @Override public void run0(int thread) { - try { + + if(barrier != null) + barrier.inc(); + + try { final ListenerBase listener = getListenerBase(procedure); QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T result) { - task.finish(); procedure.execute(graph, result); + if(barrier != null) + barrier.dec(); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { - task.finish(); procedure.exception(graph, throwable); + if(barrier != null) + barrier.dec(); } }, false); + if(barrier != null) + barrier.dec(); } catch (DatabaseException e) { Logger.defaultLogError(e); } @@ -5651,27 +5662,47 @@ public class ReadGraphImpl implements AsyncReadGraph { this.parentGraph = parentGraph; this.parent = parent; this.processor = support; - this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent); + this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null); } + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) { + this.parentGraph = parentGraph; + this.parent = parent; + this.processor = support; + this.asyncBarrier = asyncBarrier; + } + ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) { this(graph, parent, graph.processor); } + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) { + this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback)); + } + ReadGraphImpl(ReadGraphImpl graph) { this(graph, graph.parent); } - public ReadGraphImpl withParent(CacheEntry parent) { - return new ReadGraphImpl(this, parent); + public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) { + return new ReadGraphImpl(this, parent, callback); } + public ReadGraphImpl withParent(CacheEntry parent) { + return withParent(parent, null); + } + + public ReadGraphImpl syncWithParent(CacheEntry parent) { + return new ReadGraphImpl(this, parent, processor, null); + } + public ReadGraphImpl forRecompute(CacheEntry parent) { return new ReadGraphImpl(null, parent, processor); } public static ReadGraphImpl create(QueryProcessor support) { - return new ReadGraphImpl(null, null, support); + ReadGraphImpl result = new ReadGraphImpl(null, null, support); + return result; } public ReadGraphImpl newRestart(ReadGraphImpl impl) { @@ -6357,6 +6388,12 @@ public class ReadGraphImpl implements AsyncReadGraph { else return 1 + getLevelStatic(impl.parentGraph); } + public boolean isParent(ReadGraphImpl impl) { + if(impl == null) return false; + if(this == impl) return true; + return isParent(impl.parentGraph); + } + public ReadGraphImpl getTopLevelGraph() { return getTopLevelGraphStatic(this); } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java index e13ecab72..0dd5730a5 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java @@ -11,13 +11,13 @@ *******************************************************************************/ package org.simantics.db.impl.query; +import java.util.Collection; + import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.DevelopmentKeys; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.BlockingAsyncProcedure; -import org.simantics.db.impl.graph.AsyncBarrierImpl; -import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncProcedure; @@ -26,11 +26,12 @@ import org.simantics.utils.Development; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final public class AsyncReadEntry extends CacheEntryBase> implements AsyncProcedure { +final public class AsyncReadEntry extends CacheEntryBase> implements AsyncProcedure, IPending { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class); protected AsyncRead id; + protected PendingTaskSupport pendingTaskSupport; AsyncReadEntry(AsyncRead request) { this.id = request; @@ -77,7 +78,7 @@ final public class AsyncReadEntry extends CacheEntryBase> i try { - BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure() { + BlockingAsyncProcedure proc = new BlockingAsyncProcedure(graph, AsyncReadEntry.this, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T result) { @@ -90,10 +91,11 @@ final public class AsyncReadEntry extends CacheEntryBase> i except(t); } - }, id); - - id.perform(graph, proc); + }, id, true); + id.perform(proc.queryGraph, proc); + + proc.dec(); proc.get(); } catch (Throwable t) { @@ -152,103 +154,24 @@ final public class AsyncReadEntry extends CacheEntryBase> i } - public static T computeForEach(ReadGraphImpl graph, AsyncRead request, AsyncReadEntry entry, + public static T computeForEach(ReadGraphImpl callerGraph, AsyncRead request, AsyncReadEntry entry, AsyncProcedure procedure_, boolean needsToBlock) throws DatabaseException { - AsyncProcedure procedure = entry != null ? entry : procedure_; - - ReadGraphImpl queryGraph = graph.withParent(entry); - queryGraph.asyncBarrier.inc(); - - BlockingAsyncProcedure proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request); - - class AsyncTask extends SessionTask { - - int counter = 0; - T result; - DatabaseException exception; - - public AsyncTask(ReadGraphImpl graph) { - this(graph, 1); - } - - public AsyncTask(ReadGraphImpl graph, int pos) { - super(graph); - this.position = pos; - if(this.position < 1024) - this.position *= 2; - } - - @Override - public void run0(int thread) { - if(needsToBlock) proc.waitBarrier(); - if(proc.isDone()) { - ReadGraphImpl executeGraph = graph.withParent(graph.parent); - executeGraph.asyncBarrier.inc(); - try { - result = (T)proc.get(); - if(procedure != null) { - procedure.execute(executeGraph, result); - } - } catch (DatabaseException e) { - if(procedure != null) procedure.exception(executeGraph, e); - exception = e; - } catch (Throwable t) { - DatabaseException dbe = new DatabaseException(t); - if(procedure != null) procedure.exception(executeGraph, dbe); - exception = dbe; - } finally { - if (entry != null) { - // This does not throw - entry.performFromCache(executeGraph, procedure_); - } - executeGraph.asyncBarrier.dec(); - executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); - } - } else { - if(counter++ > 10000) { - if(BarrierTracing.BOOKKEEPING) { - AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2); - AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller; - while(caller != null) { - System.err.println("called by " + AsyncBarrierImpl.report(caller)); - caller = caller.caller; - } - for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) { - AsyncBarrierImpl.printReverse(ab, 2); - } - } - throw new IllegalStateException("Eternal loop in queries."); - } - graph.processor.scheduleLater(new AsyncTask(graph, position)); - } - } - - @Override - public boolean maybeReady() { - return proc.isDone(); - } - - } + BlockingAsyncProcedure proc = new BlockingAsyncProcedure(callerGraph, entry, procedure_, request, needsToBlock); try { - request.perform(queryGraph, proc); + request.perform(proc.queryGraph, proc); } finally { - queryGraph.asyncBarrier.dec(); + proc.queryGraph.asyncBarrier.dec(); } - AsyncTask task = new AsyncTask(graph); - - if(needsToBlock) task.run(0); - else if (proc.isDone()) task.run(0); - else { - graph.processor.scheduleLater(task); + if(needsToBlock) { + proc.waitBarrier(); + return proc.get(); + } else { return null; } - if(task.exception != null) throw task.exception; - else return task.result; - } @Override @@ -263,13 +186,41 @@ final public class AsyncReadEntry extends CacheEntryBase> i @Override public void execute(AsyncReadGraph graph, T result) { - setResult(result); - setReady(); + Collection tasks = null; + synchronized(this) { + setResult(result); + setReady(); + if(pendingTaskSupport != null) + tasks = pendingTaskSupport.executePending(); + } + if(tasks != null) + for(SessionTask task : tasks) + ((ReadGraphImpl)graph).processor.scheduleNow(task); } @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - except(throwable); + public synchronized void exception(AsyncReadGraph graph, Throwable throwable) { + Collection tasks = null; + synchronized(this) { + except(throwable); + if(pendingTaskSupport != null) + tasks = pendingTaskSupport.executePending(); + } + if(tasks != null) + for(SessionTask task : tasks) + ((ReadGraphImpl)graph).processor.scheduleNow(task); + } + + public void executeWhenResultIsAvailable(QueryProcessor processor, SessionTask task) { + boolean ready = false; + synchronized(this) { + if(pendingTaskSupport == null) + pendingTaskSupport = new PendingTaskSupport(this); + ready = pendingTaskSupport.executeWhenResultIsAvailable(task); + } + if(ready) { + processor.scheduleNow(task); + } } } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java new file mode 100644 index 000000000..c7381353b --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/IPending.java @@ -0,0 +1,7 @@ +package org.simantics.db.impl.query; + +public interface IPending { + + boolean isPending(); + +} diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java new file mode 100644 index 000000000..f63bad581 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/PendingTaskSupport.java @@ -0,0 +1,45 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; +import java.util.Collection; + +import org.simantics.db.impl.query.QueryProcessor.SessionTask; + +/* + * Support class for queuing pending tasks to be executed when result gets ready + */ +public class PendingTaskSupport { + + private ArrayList pendingTasks; + private IPending pending; + + public PendingTaskSupport(IPending pending) { + this.pending = pending; + } + + /* + * We assume here that the associated IPending performs this atomically + * The caller is responsible for execution of the returned task after the critical section + */ + public boolean executeWhenResultIsAvailable(SessionTask task) { + if(pending.isPending()) { + if(pendingTasks == null) + pendingTasks = new ArrayList(); + pendingTasks.add(task); + return false; + } else { + return true; + } + } + + /* + * We assume here that the associated IPending performs this atomically after changing the pending result + * The caller is responsible for execution of the returned task after the critical section + */ + public Collection executePending() { + ArrayList ret = pendingTasks; + pendingTasks = null; + return ret; + } + +} diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java index cc0ca919b..35ebdbc65 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCache.java @@ -625,7 +625,7 @@ public class QueryCache extends QueryCacheBase { } } - ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read r, boolean needsToBlock) throws DatabaseException { + private final ReadEntry getOrCreateReadEntry(ReadGraphImpl graph, Read r, boolean needsToBlock) throws DatabaseException { ReadEntry existing = null; synchronized(readEntryMap) { existing = (ReadEntry)readEntryMap.get(r); @@ -670,11 +670,13 @@ public class QueryCache extends QueryCacheBase { } ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock); if(entry == null) { - graph.processor.scheduleNow(new SessionTask(graph) { + graph.asyncBarrier.inc(); + graph.processor.scheduleNow(new SessionTask() { @Override public void run0(int thread) { try { runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock); + graph.asyncBarrier.dec(); } catch (DatabaseException e) { Logger.defaultLogError(e); } @@ -704,7 +706,7 @@ public class QueryCache extends QueryCacheBase { } } - AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, boolean needsToBlock) throws DatabaseException { + AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException { AsyncReadEntry existing = null; synchronized(asyncReadEntryMap) { existing = (AsyncReadEntry)asyncReadEntryMap.get(r); @@ -721,11 +723,21 @@ public class QueryCache extends QueryCacheBase { } } if(existing.isPending()) { - if(needsToBlock) - waitPending(graph, existing); - else { - return null; - } + if(needsToBlock) + waitPending(graph, existing); + else { + existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) { + @Override + public void run0(int thread) { + try { + runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + }); + return null; + } } return existing; } @@ -747,19 +759,10 @@ public class QueryCache extends QueryCacheBase { } return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock); } - AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock); + AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); if(entry == null) { - graph.processor.scheduleNow(new SessionTask(graph) { - @Override - public void run0(int thread) { - try { - runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - } - }); - return null; + // Entry was pending and this request has been queued + return null; } AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry; if(entry.isReady()) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index 54efda839..deeb65fdd 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -421,7 +421,7 @@ public class QueryCacheBase { int counter = 0; while(entry.isPending()) { try { - boolean performed = graph.performPending(); + boolean performed = false;//graph.performPending(); if(!performed) { Thread.sleep(1); counter++; @@ -905,10 +905,6 @@ public class QueryCacheBase { return wrap.get(); } - public static T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { - return (T)QueryCache.runnerReadEntry(graph, r, parent, listener, procedure, true); - } - public static T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { return (T)QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, procedure, true); } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index ee4b11175..eaaa9b373 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -143,6 +143,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap QueryThread[] executors; public LinkedList freeScheduling = new LinkedList(); + + public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -159,13 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getSubTask(ReadGraphImpl impl) { + public SessionTask getSubTask(ReadGraphImpl parent) { synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) { - queueLength.decrementAndGet(); + if(task.isSubtask(parent) && task.maybeReady()) { return freeScheduling.remove(index); } index++; @@ -178,82 +179,46 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap * We are running errands while waiting for requests to complete. * We can only run work that is part of the current root request to avoid any deadlocks */ - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getSubTask(graph); + public boolean performPending(ReadGraphImpl under) { + SessionTask task = getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; } return false; } - + final public void scheduleNow(SessionTask request) { - schedule(request, false); + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); } - final public void scheduleLater(SessionTask request) { - schedule(request, true); - } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - AtomicInteger queueLength = new AtomicInteger(0); - - final public void schedule(SessionTask request, boolean late) { - - int queueLengthEstimate = queueLength.get(); - if(!late && queueLengthEstimate > 80) { - request.run(thread.get()); - return; - } - - assert(request != null); - - synchronized(querySupportLock) { + assert(request != null); - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } + synchronized(querySupportLock) { - if(late) { - int pos = request.position - 1; - if(pos < freeScheduling.size()) { - freeScheduling.add(pos, request); - queueLength.incrementAndGet(); - requests.release(); - } else { - freeScheduling.addLast(request); - queueLength.incrementAndGet(); - requests.release(); - } - } - else { - if(request.getLevel() < 4) { - if(freeScheduling.size() < 100) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } else { - if(freeScheduling.size() < 20) { - freeScheduling.addFirst(request); - queueLength.incrementAndGet(); - requests.release(); - } else { - request.run(thread.get()); - } - } - } + LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + queue.addFirst(request); + requests.release(); - } + } - } + return null; + + } final int THREADS; @@ -263,20 +228,22 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; + final protected ReadGraphImpl rootGraph; private int counter = 0; protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); + public SessionTask() { + this(null); } - - public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) { - if(r1 == null || r2 == null) return false; - return r1.getTopLevelGraph() == r2.getTopLevelGraph(); - } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } public abstract void run0(int thread); @@ -292,7 +259,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); } public boolean maybeReady() { @@ -301,15 +267,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override public String toString() { - if(graph == null) + if(rootGraph == null) return "SessionTask[no graph]"; else - return "SessionTask[" + graph.parent + "]"; - } - - public int getLevel() { - if(graph == null) return 0; - else return graph.getLevel(); + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -327,37 +288,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; @@ -368,31 +305,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index 5510944dc..cfa088a2d 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -98,11 +98,16 @@ class QueryThread extends Thread implements SessionThread { } private boolean pumpTask() { - if(!processor.freeScheduling.isEmpty()) { - tasks.add(processor.freeScheduling.removeFirst()); - processor.queueLength.decrementAndGet(); - return true; - } + // First finish existing executions + if(!processor.freeScheduling.isEmpty()) { + tasks.add(processor.freeScheduling.removeFirst()); + return true; + } + // Check for new tasks + if(!processor.topLevelTasks.isEmpty()) { + tasks.add(processor.topLevelTasks.removeFirst()); + return true; + } return false; } @@ -226,8 +231,6 @@ class QueryThread extends Thread implements SessionThread { @Override public void run() { - processor.thread.set(index); - QuerySupport support = this.querySupport; try { @@ -242,23 +245,10 @@ class QueryThread extends Thread implements SessionThread { while(!tasks.isEmpty()) { SessionTask task = tasks.remove(tasks.size()-1); -// System.err.println("QT " + index + " runs " + task); - task.run(index); + task.run(0); } -// for(int performer=0;performer extends CacheEntryBase> implem } finally { executeGraph.asyncBarrier.dec(); - executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); + if(needsToBlock) + executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph); } diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 8220ba743..030d512eb 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -1639,7 +1639,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } else { - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph.asyncBarrier, newGraph, procedure, request) { + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { public void execute(AsyncReadGraph graph_, T result) { task.finish(); @@ -1656,6 +1656,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { request.perform(newGraph, wrap); + wrap.dec(); wrap.get(); } catch (DatabaseException e) { diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java new file mode 100644 index 000000000..36a01b54c --- /dev/null +++ b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/ConnectionRequest2.java @@ -0,0 +1,101 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.diagram.adapter; + +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.common.primitiverequest.Adapter; +import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.diagram.synchronization.ErrorHandler; +import org.simantics.g2d.canvas.ICanvasContext; +import org.simantics.g2d.diagram.IDiagram; +import org.simantics.g2d.element.ElementClass; +import org.simantics.g2d.element.IElement; +import org.simantics.scl.runtime.tuple.Tuple3; + +/** + * @author Antti Villberg + */ +public class ConnectionRequest2 extends BaseRequest2 { + + final IDiagram diagram; + final ErrorHandler errorHandler; + + public ConnectionRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource, ErrorHandler errorHandler) { + super(canvas, resource); + this.diagram = diagram; + this.errorHandler = errorHandler; + } + + @Override + public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { + + graph.forHasStatement(data, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + procedure.exception(graph, throwable); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean result) { + + if (!result) { + procedure.execute(graph, null); + return; + } + + graph.asyncRequest(new Adapter(data, ElementFactory.class), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + errorHandler.error("Unexpected ElementFactory adaption failure", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, final ElementFactory factory) { + + graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + errorHandler.error("Unexpected ElementClass creation failure", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, final ElementClass ec) { + + graph.asyncRequest(new SpawnRequest(canvas, ec, data), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + errorHandler.error("Unexpected SpawnRequest failure", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, IElement element) { + procedure.execute(graph, new Tuple3(element, ec, factory)); + } + }); + } + }); + } + }); + } + }); + } + +} diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java index 226f47daa..37471eb3a 100644 --- a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java +++ b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/GraphToDiagramSynchronizer.java @@ -138,6 +138,7 @@ import org.simantics.scenegraph.INode; import org.simantics.scenegraph.profile.DataNodeConstants; import org.simantics.scenegraph.profile.DataNodeMap; import org.simantics.scenegraph.profile.common.ProfileObserver; +import org.simantics.scl.runtime.tuple.Tuple3; import org.simantics.structural2.modelingRules.IModelingRules; import org.simantics.utils.datastructures.ArrayMap; import org.simantics.utils.datastructures.MapSet; @@ -1649,6 +1650,90 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID this.removedRouteGraphConnections.clear(); } + class LoadNodeListener extends DisposableListener { + + final Resource element; + public IElement lastLoaded; + + public LoadNodeListener(ListenerSupport support, Resource element) { + super(support); + this.element = element; + } + + @Override + public String toString() { + return "Node load listener for " + element; + } + + public void applyFirst(IElement loaded) { + + Object data = loaded.getHint(ElementHints.KEY_OBJECT); + + if (addedElementMap.containsKey(data)) { + // This element was just loaded, in + // which case its hints need to + // uploaded to the real mapped + // element immediately. + IElement mappedElement = getMappedElement(data); + if (DebugPolicy.DEBUG_NODE_LISTENER) + System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement); + if (mappedElement != null && (mappedElement instanceof Element)) { + if (DebugPolicy.DEBUG_NODE_LISTENER) { + System.out.println(" mapped hints: " + mappedElement.getHints()); + System.out.println(" loaded hints: " + loaded.getHints()); + } + updateMappedElement((Element) mappedElement, loaded); + } + } + + } + + @Override + public void execute(IElement loaded) { + + // Invoked when the element has been loaded. + if (DebugPolicy.DEBUG_NODE_LISTENER) + System.out.println("NODE LoadListener for " + loaded); + + if (loaded == null) { + disposeListener(); + return; + } + + + boolean first = lastLoaded == null; + + lastLoaded = loaded; + + /* + * The first invocation is postponed + */ + if(first) { + applyFirst(loaded); + return; + } + + Object data = loaded.getHint(ElementHints.KEY_OBJECT); + + // Logic for disposing listener + if (!previousContent.nodeSet.contains(data)) { + if (DebugPolicy.DEBUG_NODE_LISTENER) + System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing."); + disposeListener(); + return; + } + + // This element was already loaded. + // Just schedule an update some time + // in the future. + if (DebugPolicy.DEBUG_NODE_LISTENER) + System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future"); + offerGraphUpdate( nodeUpdater(element, loaded) ); + + } + + } + void processNodes(AsyncReadGraph graph) throws DatabaseException { for (Map.Entry entry : changes.elements.entrySet()) { @@ -1745,6 +1830,13 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID @Override public void execute(AsyncReadGraph graph, final IElement e) { + mapElement(element, e); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(e); + addedElementMap.put(element, e); + addedConnectionMap.put(element, e); + } + // Read connection type graph.forSingleType(element, br.DIA.Connection, new Procedure() { @Override @@ -1778,87 +1870,33 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID }); } else if (content.nodeSet.contains(element)) { - Listener loadListener = new DisposableListener(canvasListenerSupport) { - - boolean firstTime = true; + graph.asyncRequest(new ReadRequest() { @Override - public String toString() { - return "Node load listener for " + element; - } - @Override - public void execute(IElement loaded) { - // Invoked when the element has been loaded. - if (DebugPolicy.DEBUG_NODE_LISTENER) - System.out.println("NODE LoadListener for " + loaded); - - if (loaded == null) { - disposeListener(); - return; - } - - if (firstTime) { - - // This is invoked before the element is actually loaded. - //System.out.println("NodeRequestProcedure " + e); - if (DebugPolicy.DEBUG_NODE_LOAD) - System.out.println("MAPPING ADDED NODE: " + element + " -> " + loaded); - mapElement(element, loaded); - synchronized (GraphToDiagramUpdater.this) { - addedElements.add(loaded); - addedElementMap.put(element, loaded); - } - - firstTime = false; - - } - - Object data = loaded.getHint(ElementHints.KEY_OBJECT); + public void run(ReadGraph graph) throws DatabaseException { - // Logic for disposing listener - if (!previousContent.nodeSet.contains(data)) { - if (DebugPolicy.DEBUG_NODE_LISTENER) - System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing."); - disposeListener(); + LoadNodeListener loadListener = new LoadNodeListener(canvasListenerSupport, element); + Tuple3 t = graph.syncRequest(new NodeRequest2(canvas, diagram, element)); + IElement e = (IElement)t.c0; + ElementClass ec = (ElementClass)t.c1; + org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2; + if (e == null) return; + + // This is invoked before the element is actually loaded. + //System.out.println("NodeRequestProcedure " + e); + if (DebugPolicy.DEBUG_NODE_LOAD) + System.out.println("MAPPING ADDED NODE: " + element + " -> " + e); + mapElement(element, e); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(e); + addedElementMap.put(element, e); } - if (addedElementMap.containsKey(data)) { - // This element was just loaded, in - // which case its hints need to - // uploaded to the real mapped - // element immediately. - IElement mappedElement = getMappedElement(data); - if (DebugPolicy.DEBUG_NODE_LISTENER) - System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement); - if (mappedElement != null && (mappedElement instanceof Element)) { - if (DebugPolicy.DEBUG_NODE_LISTENER) { - System.out.println(" mapped hints: " + mappedElement.getHints()); - System.out.println(" loaded hints: " + loaded.getHints()); - } - updateMappedElement((Element) mappedElement, loaded); - } - } else { - // This element was already loaded. - // Just schedule an update some time - // in the future. - if (DebugPolicy.DEBUG_NODE_LISTENER) - System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future"); - offerGraphUpdate( nodeUpdater(element, loaded) ); - } - } - }; - - //System.out.println("NODE REQUEST: " + element); - graph.asyncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure() { - @Override - public void execute(AsyncReadGraph graph, IElement e) { - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - error(throwable); + graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, element), loadListener); + } + }); } else { @@ -1980,8 +2018,121 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID } } } + + class LoadRouteGraphConnectionListener extends DisposableListener { + + final Resource connection; + public IElement lastLoaded; + + public LoadRouteGraphConnectionListener(ListenerSupport support, Resource connection) { + super(support); + this.connection = connection; + } + + @Override + public String toString() { + return "processRouteGraphConnections " + connection; + } + + public void applyFirst(IElement loaded) { + + Object data = loaded.getHint(ElementHints.KEY_OBJECT); + if (addedElementMap.containsKey(data)) { + // This element was just loaded, in + // which case its hints need to + // uploaded to the real mapped + // element immediately. + IElement mappedElement = getMappedElement(data); + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement); + if (mappedElement instanceof Element) { + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) { + System.out.println(" mapped hints: " + mappedElement.getHints()); + System.out.println(" loaded hints: " + loaded.getHints()); + } + updateMappedElement((Element) mappedElement, loaded); + } + } + + } + + @Override + public void execute(IElement loaded) { + + // Invoked when the element has been loaded. + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded); + + if (loaded == null) { + disposeListener(); + return; + } + + boolean first = lastLoaded == null; + + lastLoaded = loaded; + + /* + * The first invocation is postponed + */ + if(first) { + applyFirst(loaded); + return; + } + + Object data = loaded.getHint(ElementHints.KEY_OBJECT); + + // Logic for disposing listener + if (!previousContent.routeGraphConnectionSet.contains(data)) { + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing."); + disposeListener(); + return; + } + + if (addedElementMap.containsKey(data)) { + // This element was just loaded, in + // which case its hints need to + // uploaded to the real mapped + // element immediately. + IElement mappedElement = getMappedElement(data); + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement); + if (mappedElement instanceof Element) { + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) { + System.out.println(" mapped hints: " + mappedElement.getHints()); + System.out.println(" loaded hints: " + loaded.getHints()); + } + updateMappedElement((Element) mappedElement, loaded); + } + } else { + // This element was already loaded. + // Just schedule an update some time + // in the future. + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection); + + Set dirtyNodes = new THashSet(4); + IElement mappedElement = getMappedElement(connection); + ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY); + if (ce != null) { + for (Connection conn : ce.getTerminalConnections(null)) { + Object o = conn.node.getHint(ElementHints.KEY_OBJECT); + if (o != null) { + dirtyNodes.add(o); + if (DebugPolicy.DEBUG_CONNECTION_LISTENER) + System.out.println("Marked connectivity dirty for node: " + conn.node); + } + } + } + + offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) ); + } + } + }; - void processRouteGraphConnections(ReadGraph graph) throws DatabaseException { + + void processRouteGraphConnections(AsyncReadGraph graph) throws DatabaseException { for (Map.Entry entry : changes.routeGraphConnections.entrySet()) { final Resource connection = entry.getKey(); @@ -1992,97 +2143,39 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID if (mappedElement != null) continue; - Listener loadListener = new DisposableListener(canvasListenerSupport) { - - boolean firstTime = true; + + graph.asyncRequest(new ReadRequest() { @Override - public String toString() { - return "processRouteGraphConnections " + connection; - } - @Override - public void execute(IElement loaded) { - // Invoked when the element has been loaded. - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) - System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded); + public void run(ReadGraph graph) throws DatabaseException { - if (loaded == null) { - disposeListener(); - return; - } - - if(firstTime) { - if (DebugPolicy.DEBUG_NODE_LOAD) - System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + loaded); - mapElement(connection, loaded); - synchronized (GraphToDiagramUpdater.this) { - addedElements.add(loaded); - addedElementMap.put(connection, loaded); - addedRouteGraphConnectionMap.put(connection, loaded); - } - firstTime = false; - } + LoadRouteGraphConnectionListener loadListener = new LoadRouteGraphConnectionListener(canvasListenerSupport, connection); - Object data = loaded.getHint(ElementHints.KEY_OBJECT); + Tuple3 t = graph.syncRequest(new ConnectionRequest2(canvas, diagram, connection, errorHandler)); + IElement e = (IElement)t.c0; + ElementClass ec = (ElementClass)t.c1; + org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2; - // Logic for disposing listener - if (!previousContent.routeGraphConnectionSet.contains(data)) { - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) - System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing."); - disposeListener(); + if (e == null) return; + + //System.out.println("ConnectionRequestProcedure " + e); + if (DebugPolicy.DEBUG_NODE_LOAD) + System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + e); + mapElement(connection, e); + synchronized (GraphToDiagramUpdater.this) { + addedElements.add(e); + addedElementMap.put(connection, e); + addedRouteGraphConnectionMap.put(connection, e); } - if (addedElementMap.containsKey(data)) { - // This element was just loaded, in - // which case its hints need to - // uploaded to the real mapped - // element immediately. - IElement mappedElement = getMappedElement(data); - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) - System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement); - if (mappedElement instanceof Element) { - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) { - System.out.println(" mapped hints: " + mappedElement.getHints()); - System.out.println(" loaded hints: " + loaded.getHints()); - } - updateMappedElement((Element) mappedElement, loaded); - } - } else { - // This element was already loaded. - // Just schedule an update some time - // in the future. - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) - System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection); - - Set dirtyNodes = new THashSet(4); - IElement mappedElement = getMappedElement(connection); - ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY); - if (ce != null) { - for (Connection conn : ce.getTerminalConnections(null)) { - Object o = conn.node.getHint(ElementHints.KEY_OBJECT); - if (o != null) { - dirtyNodes.add(o); - if (DebugPolicy.DEBUG_CONNECTION_LISTENER) - System.out.println("Marked connectivity dirty for node: " + conn.node); - } - } - } + graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, connection), loadListener); - offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) ); - } } - }; - graph.syncRequest(new ConnectionRequest(canvas, diagram, connection, errorHandler, loadListener), new Procedure() { - @Override - public void execute(final IElement e) { - } - @Override - public void exception(Throwable throwable) { - error(throwable); - } }); + + break; } case REMOVED: { @@ -2127,8 +2220,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID Listener loadListener = new DisposableListener(canvasListenerSupport) { - boolean firstTime = true; - @Override public String toString() { return "processBranchPoints for " + element; @@ -2144,21 +2235,6 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return; } - if (firstTime) { - - mapElement(element, loaded); - synchronized (GraphToDiagramUpdater.this) { - addedBranchPoints.add(loaded); - addedElementMap.put(element, loaded); - ConnectionEntityImpl ce = getConnectionEntity(element); - loaded.setHint(ElementHints.KEY_CONNECTION_ENTITY, ce); - loaded.setHint(ElementHints.KEY_PARENT_ELEMENT, ce.getConnectionElement()); - } - - firstTime = false; - - } - Object data = loaded.getHint(ElementHints.KEY_OBJECT); if (addedElementMap.containsKey(data)) { // This element was just loaded, in @@ -2360,6 +2436,7 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID return "processBranchPoints"; } }); + } //System.out.println("---- PROCESS BRANCH POINTS END"); @@ -2394,9 +2471,9 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID task = Timing.BEGIN("processRouteGraphConnections"); if (!changes.routeGraphConnections.isEmpty()) { - graph.syncRequest(new ReadRequest() { + graph.syncRequest(new AsyncReadRequest() { @Override - public void run(ReadGraph graph) throws DatabaseException { + public void run(AsyncReadGraph graph) throws DatabaseException { processRouteGraphConnections(graph); } @Override @@ -2724,8 +2801,11 @@ public class GraphToDiagramSynchronizer extends AbstractDisposable implements ID diagramUpdateLock.lock(); try { - if (DebugPolicy.DEBUG_DIAGRAM_UPDATE) + if (DebugPolicy.DEBUG_DIAGRAM_UPDATE) { System.out.println("In diagramGraphUpdater:"); + System.out.println("-content = " + content); + System.out.println("-previousContent = " + previousContent); + } // Find out what has changed since the last query. Object task = Timing.BEGIN("diagramContentDifference"); diff --git a/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java new file mode 100644 index 000000000..ddc2cc256 --- /dev/null +++ b/bundles/org.simantics.diagram/src/org/simantics/diagram/adapter/NodeRequest2.java @@ -0,0 +1,128 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.diagram.adapter; + +import java.util.List; + +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.common.primitiverequest.Adapter; +import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.diagram.synchronization.ErrorHandler; +import org.simantics.g2d.canvas.ICanvasContext; +import org.simantics.g2d.diagram.IDiagram; +import org.simantics.g2d.diagram.handler.SubstituteElementClass; +import org.simantics.g2d.element.ElementClass; +import org.simantics.g2d.element.IElement; +import org.simantics.scl.runtime.tuple.Tuple3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Antti Villberg + */ +public class NodeRequest2 extends BaseRequest2 { + + private static final Logger LOGGER = LoggerFactory.getLogger(NodeRequest2.class); + + final IDiagram diagram; + + public NodeRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource) { + super(canvas, resource); + this.diagram = diagram; + } + + @Override + public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { + // Keep this code from crashing the whole DB client by unexpected + // throwing of NPE's somewhere in the following code that leads to + // procedure not getting called properly. + if (diagram == null) { + procedure.exception(graph, new NullPointerException("null diagram specified for resource " + data)); + return; + } + + final ErrorHandler eh = ElementFactoryUtil.getErrorHandler(diagram); + + graph.forHasStatement(data, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + eh.error("NodeRequest.forHasStatement failed", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, Boolean result) { + + if(!result) { + procedure.execute(graph, null); + return; + } + + graph.asyncRequest(new Adapter(data, ElementFactory.class), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + eh.error("NodeRequest.asyncRequest(Adapter) failed", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, final ElementFactory factory) { + + graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + LOGGER.error("Unexpected error in GetElementClassRequest", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, ElementClass mutableClazz) { + List substitutes = diagram.getDiagramClass().getItemsByClass(SubstituteElementClass.class); + for (SubstituteElementClass subs : substitutes) { + mutableClazz = subs.substitute(diagram, mutableClazz); + } + final ElementClass clazz = mutableClazz; + graph.asyncRequest(new SpawnRequest(canvas, clazz, data), new TransientCacheAsyncListener() { + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + LOGGER.error("Unexpected error in SpawnRequest", throwable); + procedure.execute(graph, null); + } + + @Override + public void execute(AsyncReadGraph graph, IElement element) { + procedure.execute(graph, new Tuple3(element, clazz, factory)); + } + + }); + + } + + }); + + } + + }); + + } + + }); + + } + +} diff --git a/bundles/org.simantics.document.server/src/org/simantics/document/server/request/DocumentRequest.java b/bundles/org.simantics.document.server/src/org/simantics/document/server/request/DocumentRequest.java index 9a53060be..8acc2f0d1 100644 --- a/bundles/org.simantics.document.server/src/org/simantics/document/server/request/DocumentRequest.java +++ b/bundles/org.simantics.document.server/src/org/simantics/document/server/request/DocumentRequest.java @@ -7,11 +7,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.simantics.db.AsyncReadGraph; import org.simantics.db.ReadGraph; import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; +import org.simantics.db.common.request.AsyncReadRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.layer0.request.VariableRead; import org.simantics.db.layer0.variable.Variable; +import org.simantics.db.procedure.AsyncProcedure; import org.simantics.document.server.JSONObject; public class DocumentRequest extends VariableRead> { @@ -35,20 +38,43 @@ public class DocumentRequest extends VariableRead> { if(nodes.isEmpty()) { return Collections.emptyList(); } - - - /*TreeMap nodeMap = new TreeMap(); - - for (Variable node : nodes) { - nodeMap.put(node.getURI(graph), node); + + if(PROFILE) { + long dura = System.nanoTime()-s; + System.err.println("DocumentRequest1 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph)); } - System.out.println("*************************************************************************"); - for (Variable node : nodeMap.values()) { - System.out.println(" " + node.getURI(graph)); - }*/ - - for(Variable node : nodes) { - rs.add(graph.syncRequest(new NodeRequest(node), TransientCacheAsyncListener.instance())); + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) throws DatabaseException { + + for(Variable node : nodes) { + graph.asyncRequest(new NodeRequest(node), new AsyncProcedure () { + + @Override + public void execute(AsyncReadGraph graph, JSONObject result) { + synchronized (rs) { + rs.add(result); + } + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + } + + }); + + } + + } + + }); + + + if(PROFILE) { + long dura = System.nanoTime()-s; + System.err.println("DocumentRequest2 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph)); } ArrayList result = new ArrayList(rs); @@ -63,7 +89,7 @@ public class DocumentRequest extends VariableRead> { if(PROFILE) { long dura = System.nanoTime()-s; - System.err.println("DocumentRequest " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph)); + System.err.println("DocumentRequest3 " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph)); } return result; diff --git a/bundles/org.simantics.document.server/src/org/simantics/document/server/request/NodesRequest.java b/bundles/org.simantics.document.server/src/org/simantics/document/server/request/NodesRequest.java index f3f1177fc..99526368e 100644 --- a/bundles/org.simantics.document.server/src/org/simantics/document/server/request/NodesRequest.java +++ b/bundles/org.simantics.document.server/src/org/simantics/document/server/request/NodesRequest.java @@ -1,47 +1,72 @@ package org.simantics.document.server.request; -import gnu.trove.set.hash.THashSet; - import java.util.Collection; import java.util.Collections; import java.util.Set; +import org.simantics.db.AsyncReadGraph; import org.simantics.db.ReadGraph; +import org.simantics.db.common.request.AsyncReadRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.layer0.request.VariableChildren; import org.simantics.db.layer0.request.VariableRead; import org.simantics.db.layer0.variable.Variable; +import org.simantics.db.procedure.AsyncProcedure; import org.simantics.structural.stubs.StructuralResource2; +import org.simantics.utils.threads.logger.ITask; +import org.simantics.utils.threads.logger.ThreadLogger; + +import gnu.trove.set.hash.THashSet; public class NodesRequest extends VariableRead> { public NodesRequest(Variable var) { super(var); - } - - @Override - public Set perform(ReadGraph graph) throws DatabaseException { - - long s = System.nanoTime(); - - StructuralResource2.getInstance(graph); - if(variable == null) - return Collections.emptySet(); - - Set nodes = new THashSet(); - Collection children = graph.syncRequest(new VariableChildren(variable)); - for(Variable child : children) { - Set childNodes = graph.syncRequest(new NodesRequest2(child)); - nodes.addAll(childNodes); - } - - if(DocumentRequest.PROFILE) { - long dura = System.nanoTime()-s; - System.err.println("NodesRequest " + System.identityHashCode(this) + " in " + 1e-6*dura + "ms. " + variable.getURI(graph)); - } - - return nodes; - - } + } + + @Override + public Set perform(ReadGraph graph) throws DatabaseException { + + ITask task = DocumentRequest.PROFILE ? ThreadLogger.task(this) : null; + + StructuralResource2.getInstance(graph); + if(variable == null) + return Collections.emptySet(); + + Set nodes = new THashSet(); + + Collection children = graph.syncRequest(new VariableChildren(variable)); + + graph.syncRequest(new AsyncReadRequest() { + + @Override + public void run(AsyncReadGraph graph) throws DatabaseException { + + for(Variable child : children) { + graph.asyncRequest(new NodesRequest2(child), new AsyncProcedure>() { + + @Override + public void execute(AsyncReadGraph graph, Set result) { + synchronized(nodes) { + nodes.addAll(result); + } + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + } + + }); + } + + } + + }); + + if(DocumentRequest.PROFILE) task.finish(); + + return nodes; + + } } \ No newline at end of file diff --git a/bundles/org.simantics.issues.common/src/org/simantics/issues/common/ChildMaxIssueSeverity.java b/bundles/org.simantics.issues.common/src/org/simantics/issues/common/ChildMaxIssueSeverity.java index 1103b5f26..80f22b785 100644 --- a/bundles/org.simantics.issues.common/src/org/simantics/issues/common/ChildMaxIssueSeverity.java +++ b/bundles/org.simantics.issues.common/src/org/simantics/issues/common/ChildMaxIssueSeverity.java @@ -11,87 +11,106 @@ *******************************************************************************/ package org.simantics.issues.common; +import java.util.Collection; import java.util.Collections; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.simantics.db.AsyncReadGraph; import org.simantics.db.Resource; import org.simantics.db.common.request.TernaryAsyncRead; -import org.simantics.db.procedure.AsyncMultiProcedure; +import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.issues.Severity; /** * @author Tuukka Lehtonen */ -public class ChildMaxIssueSeverity extends TernaryAsyncRead, Severity>{ +public class ChildMaxIssueSeverity extends TernaryAsyncRead, Severity> { + static class AsyncReadResult { + private AtomicReference resultRef; + private Throwable throwable; + private AtomicInteger counter = new AtomicInteger(1); + private AsyncProcedure procedure; + AsyncReadResult(AsyncProcedure procedure, AtomicReference resultRef) { + this.procedure = procedure; + this.resultRef = resultRef; + } + void except(AsyncReadGraph graph, Throwable throwable) { + this.throwable = throwable; + dec(graph); + } + void set(AsyncReadGraph graph, T result) { + resultRef.set(result); + dec(graph); + } + void inc() { + counter.incrementAndGet(); + } + void dec(AsyncReadGraph graph) { + if(counter.decrementAndGet() == 0) { + if(throwable != null) + procedure.exception(graph, throwable); + else + procedure.execute(graph, resultRef.get()); + } + } + + } + public ChildMaxIssueSeverity(Resource resource, Resource childRelation, Set typesToRecurse) { super(resource, childRelation, typesToRecurse); } -// @Override -// public Severity perform(ReadGraph graph) throws DatabaseException { -// Severity maxSeverity = null; -// //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource)); -// for (Resource child : graph.getObjects(resource, resource2)) { -// Severity s = graph.syncRequest(new MaxIssueSeverityRecursive(child)); -// maxSeverity = Severity.moreSevere(maxSeverity, s); -// } -// //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource) + " : " + maxSeverity); -// return maxSeverity; -// } - @Override public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { - //System.out.println(getClass().getSimpleName() + ": " + parameter); - graph.forTypes(parameter, new AsyncProcedure>() { - @Override - public void execute(AsyncReadGraph graph, Set result) { - if (!Collections.disjoint(parameter3, result)) { - checkChildren(graph, procedure); - } else { - procedure.execute(graph, null); - } - } - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - procedure.exception(graph, throwable); + try { + Set types = graph.getTypes(parameter); + if (!Collections.disjoint(parameter3, types)) { + checkChildren(graph, procedure); + } else { + procedure.execute(graph, null); } - }); + } catch (DatabaseException e) { + procedure.exception(graph, e); + } + } protected void checkChildren(AsyncReadGraph graph, final AsyncProcedure procedure) { - graph.forEachObject(parameter, parameter2, new AsyncMultiProcedure() { - AtomicReference maxSeverity = new AtomicReference(); - @Override - public void execute(AsyncReadGraph graph, Resource child) { + + AsyncReadResult maxSeverity = new AsyncReadResult(procedure, new AtomicReference()); + + try { + Collection children = graph.getObjects(parameter, parameter2); + for(Resource child : children) { + maxSeverity.inc(); graph.asyncRequest(new MaxIssueSeverityRecursive(child, parameter2, parameter3), new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, Severity severity) { if (severity != null) { synchronized (maxSeverity) { - maxSeverity.set(Severity.moreSevere(maxSeverity.get(), severity)); + maxSeverity.set(graph, Severity.moreSevere(maxSeverity.resultRef.get(), severity)); } + } else { + maxSeverity.dec(graph); } } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { - procedure.exception(graph, throwable); + maxSeverity.except(graph, throwable); } }); } - @Override - public void finished(AsyncReadGraph graph) { - procedure.execute(graph, maxSeverity.get()); - } - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - procedure.exception(graph, throwable); - } - }); + maxSeverity.dec(graph); + } catch (DatabaseException e) { + maxSeverity.except(graph, e); + return; + } + } } diff --git a/bundles/org.simantics.issues.common/src/org/simantics/issues/common/MaxIssueSeverityRecursive.java b/bundles/org.simantics.issues.common/src/org/simantics/issues/common/MaxIssueSeverityRecursive.java index 7646271e3..ed3d6c2b9 100644 --- a/bundles/org.simantics.issues.common/src/org/simantics/issues/common/MaxIssueSeverityRecursive.java +++ b/bundles/org.simantics.issues.common/src/org/simantics/issues/common/MaxIssueSeverityRecursive.java @@ -11,91 +11,42 @@ *******************************************************************************/ package org.simantics.issues.common; +import java.util.Collection; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.simantics.db.AsyncReadGraph; +import org.simantics.db.ReadGraph; import org.simantics.db.Resource; -import org.simantics.db.common.request.TernaryAsyncRead; -import org.simantics.db.procedure.AsyncMultiProcedure; -import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.common.request.TernaryRead; +import org.simantics.db.exception.DatabaseException; import org.simantics.issues.Severity; import org.simantics.issues.ontology.IssueResource; /** * @author Tuukka Lehtonen */ -public class MaxIssueSeverityRecursive extends TernaryAsyncRead, Severity> { +public class MaxIssueSeverityRecursive extends TernaryRead, Severity> { public MaxIssueSeverityRecursive(Resource resource, Resource childRelation, Set typesToRecurse) { super(resource, childRelation, typesToRecurse); } @Override - public void perform(AsyncReadGraph graph, final AsyncProcedure procedure) { + public Severity perform(ReadGraph graph) throws DatabaseException { IssueResource ISSUE = graph.getService(IssueResource.class); - AtomicInteger issues = new AtomicInteger(); - AtomicBoolean excepted = new AtomicBoolean(false); - - graph.forEachObject(parameter, ISSUE.Issue_HasContext_Inverse, new AsyncMultiProcedure() { - @Override - public void execute(AsyncReadGraph graph, Resource result) { - issues.incrementAndGet(); - } - @Override - public void finished(AsyncReadGraph graph) { - - } - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - if(excepted.compareAndSet(false, true)) - procedure.exception(graph, throwable); - } - }); - - graph.forEachObject(parameter, ISSUE.Issue_ContextList_Element_Inverse, new AsyncMultiProcedure() { - @Override - public void execute(AsyncReadGraph graph, Resource result) { - issues.incrementAndGet(); - } - @Override - public void finished(AsyncReadGraph graph) { - - } - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - if(excepted.compareAndSet(false, true)) - procedure.exception(graph, throwable); - } - }); - - if(excepted.get()) return; - - if (issues.get() == 0) { - // This content does not have directly attached issues, try to look - // for some in the child components. - graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure); - } else { - // Try local issues first - graph.asyncRequest(new MaxIssueSeveritySingle(parameter), new AsyncProcedure() { - @Override - public void execute(AsyncReadGraph graph, Severity maxSeverity) { - if (maxSeverity == null) - // No severity for local issues, try children next. - graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure); - else - procedure.execute(graph, maxSeverity); - } - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - if(excepted.compareAndSet(false, true)) - procedure.exception(graph, throwable); - } - }); + Collection inv = graph.getObjects(parameter, ISSUE.Issue_HasContext_Inverse); + Collection inv2 = graph.getObjects(parameter, ISSUE.Issue_ContextList_Element_Inverse); + if(inv.isEmpty() && inv2.isEmpty()) { + return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3)); } + + Severity max = graph.syncRequest(new MaxIssueSeveritySingle(parameter)); + if(max != null) + return max; + else + return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3)); + } } diff --git a/bundles/org.simantics.modeling.ui/src/org/simantics/modeling/ui/diagram/monitor/MonitorClassFactory2.java b/bundles/org.simantics.modeling.ui/src/org/simantics/modeling/ui/diagram/monitor/MonitorClassFactory2.java index c7a2b6385..75d3050b9 100644 --- a/bundles/org.simantics.modeling.ui/src/org/simantics/modeling/ui/diagram/monitor/MonitorClassFactory2.java +++ b/bundles/org.simantics.modeling.ui/src/org/simantics/modeling/ui/diagram/monitor/MonitorClassFactory2.java @@ -209,7 +209,7 @@ public class MonitorClassFactory2 extends SyncElementFactory { }); } - IElement mappedElement = diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element); + IElement mappedElement = e;//diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element); MonitorListener monitorListener = new MonitorListener(element, canvas, diagram, substitutions); if (mappedElement != null) { MonitorListener oldListener = mappedElement.getHint(KEY_VARIABLE_LISTENER);