package org.simantics.db.impl;
import org.simantics.db.AsyncReadGraph;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
-import org.simantics.db.impl.graph.AsyncBarrierImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.AsyncReadEntry;
+import org.simantics.db.impl.query.PendingTaskSupport;
import org.simantics.db.procedure.AsyncProcedure;
public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
private static final Object NO_RESULT = new Object();
- private final Object key;
- private final AsyncBarrierImpl barrier;
- private final ReadGraphImpl procedureGraph;
- private final AsyncProcedure<Result> procedure;
+ public final Object key;
+ public final ReadGraphImpl queryGraph;
+ public final ReadGraphImpl callerGraph;
+ public final AsyncProcedure<Result> procedure;
+ public PendingTaskSupport pendingTaskSupport;
+ public Object result = NO_RESULT;
+ public Throwable exception = null;
- private Object result = NO_RESULT;
- private Throwable exception = null;
+ private ReadGraphImpl queryGraph() {
+ return queryGraph;
+ }
+
+ public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure, Object key, boolean needsToBlock) {
+
+ // A new graph for evaluating the query with correct parent and asyncBarrier
+ queryGraph = callerGraph.withParent(entry, () -> {
+
+ dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock);
+
+ });
+
+ queryGraph.asyncBarrier.inc();
- public BlockingAsyncProcedure(AsyncBarrierImpl barrier, ReadGraphImpl procedureGraph, AsyncProcedure<Result> procedure, Object key) {
this.procedure = procedure;
this.key = key;
- this.barrier = barrier;
- this.barrier.inc();
- this.procedureGraph = procedureGraph;
+ this.queryGraph.asyncBarrier.inc();
+ this.callerGraph = callerGraph;
+ if (BarrierTracing.BOOKKEEPING) {
+ BarrierTracing.registerBAP(this);
+ }
}
@Override
public void execute(AsyncReadGraph graph_, Result result) {
+
this.result = result;
- try {
- if(procedure != null) procedure.execute(procedureGraph, result);
- } catch (Throwable throwable) {
- Logger.defaultLogError("AsyncProcedure.execute threw for " + procedure, throwable);
- } finally {
- barrier.dec();
- }
+ queryGraph.asyncBarrier.dec();
+
}
@Override
public void exception(AsyncReadGraph graph_, Throwable t) {
+
this.exception = t;
- try {
- if(procedure != null) procedure.exception(procedureGraph, t);
- } catch (Throwable throwable) {
- Logger.defaultLogError("AsyncProcedure.exception threw for " + procedure, throwable);
- } finally {
- barrier.dec();
- }
+ queryGraph.asyncBarrier.dec();
+
}
-
+
public void waitBarrier() {
- barrier.waitBarrier(key, procedureGraph);
+ queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
+ }
+
+ public void dec() {
+
+ queryGraph.asyncBarrier.dec();
+
}
@SuppressWarnings("unchecked")
public Result get() throws DatabaseException {
- barrier.waitBarrier(key, procedureGraph);
+ queryGraph.asyncBarrier.waitBarrier(key, queryGraph);
if(exception != null) {
if(exception instanceof DatabaseException) throw (DatabaseException)exception;
}
- public boolean isDone() {
- return barrier.get() == 0;
- }
-
@SuppressWarnings("unchecked")
public Result getResult() {
return (Result)result;
public String toString() {
return "." + procedure;
}
+
+ private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure_, boolean needsToBlock) {
+
+ AsyncProcedure<Result> procedure = entry != null ? entry : procedure_;
+
+ ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent);
+ executeGraph.asyncBarrier.inc();
+ try {
+ if(procedure != null) {
+ procedure.execute(executeGraph, get());
+ }
+ } catch (DatabaseException e) {
+ if(procedure != null) procedure.exception(executeGraph, e);
+ exception = e;
+ } catch (Throwable t) {
+ DatabaseException dbe = new DatabaseException(t);
+ if(procedure != null) procedure.exception(executeGraph, dbe);
+ exception = dbe;
+ } finally {
+
+ if (entry != null) {
+ assert(entry.isReady());
+ // This does not throw
+ entry.performFromCache(executeGraph, procedure_);
+ }
+
+ executeGraph.asyncBarrier.dec();
+ if(needsToBlock)
+ executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
+ }
+
+ if (BarrierTracing.BOOKKEEPING) {
+ BarrierTracing.unregisterBAP(this);
+ }
+
+ }
}
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.query.CacheEntry;
import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
private static final long serialVersionUID = 4724463372850048672L;
- static final int WAIT_TIME = 600;
+ static final int WAIT_TIME = 60000;
public static final boolean PRINT = false;
final public AsyncBarrierImpl caller;
+
+ final public Runnable callback;
- public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
+ public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry, Runnable callback) {
super(0);
this.caller = caller;
+ this.callback = callback;
if (BarrierTracing.BOOKKEEPING) {
BarrierTracing.trace(this, entry);
}
new Exception());
}
assert (count >= 0);
+
+ if(callback != null)
+ callback.run();
+
}
}
e.printStackTrace();
}
}
- if(waitCount > WAIT_TIME*1000) {
+ if(waitCount > WAIT_TIME) {
System.err.println("AsyncBarrierImpl.waitBarrier("
+ request
synchronized (BarrierTracing.reverseLookup) {
printReverse(this, 0);
}
+ BarrierTracing.printBAPS();
+ }
+
+ for(SessionTask t : impl.processor.freeScheduling) {
+ System.err.println("Pending task:" + t);
}
// if(Development.DEVELOPMENT) {
import java.util.HashMap;
import java.util.Map;
+import org.simantics.db.impl.BlockingAsyncProcedure;
import org.simantics.db.impl.query.CacheEntry;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
public class BarrierTracing {
public static final boolean BOOKKEEPING = false;
- static final boolean RESTART_GUARD = BOOKKEEPING && false;
+ static final boolean RESTART_GUARD = BOOKKEEPING && true;
public static Map<SessionTask,Exception> tasks = new HashMap<>();
public static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<>();
public static final HashMap<AsyncBarrierImpl, CacheEntry<?>> entryMap = new HashMap<>();
public static final HashMap<AsyncBarrierImpl, Throwable> restartMap = new HashMap<>();
public static final HashMap<AsyncBarrierImpl, Throwable> startMap = new HashMap<>();
+ public static final HashMap<BlockingAsyncProcedure, Throwable> baps = new HashMap<>();
+ synchronized public static void registerBAP(BlockingAsyncProcedure bap) {
+ baps.put(bap, new Exception());
+ }
+
+ synchronized public static void unregisterBAP(BlockingAsyncProcedure bap) {
+ baps.remove(bap);
+ }
+
+ synchronized public static void printBAPS() {
+ for(BlockingAsyncProcedure bap : baps.keySet()) {
+ Throwable e = baps.get(bap);
+ System.err.println("BlockingAsyncProcedure");
+ System.err.println("-key: " + bap.key);
+ System.err.println("-queryGraph: " + bap.queryGraph);
+ System.err.println("-callerGraph: " + bap.callerGraph);
+ System.err.println("-procedure: " + bap.procedure);
+ System.err.println("-pendingTaskSupport: " + bap.pendingTaskSupport);
+ System.err.println("-result: " + bap.result);
+ System.err.println("-exception: " + bap.exception);
+ e.printStackTrace();
+ }
+ }
+
public static void trace(AsyncBarrierImpl barrier, CacheEntry<?> entry) {
if (RESTART_GUARD) {
assert (request != null);
assert (procedure != null);
-
+
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
+
processor.scheduleNow(new SessionTask(this) {
@Override
QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
}
}
assert (procedure != null);
ITask task = ThreadLogger.task(request);
+
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
processor.scheduleNow(new SessionTask(this) {
@Override
public void run0(int thread) {
- try {
+
+ if(barrier != null)
+ barrier.inc();
+
+ try {
final ListenerBase listener = getListenerBase(procedure);
QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
public void execute(AsyncReadGraph graph, T result) {
task.finish();
procedure.execute(graph, result);
+ if(barrier != null)
+ barrier.dec();
}
@Override
public void exception(AsyncReadGraph graph, Throwable throwable) {
task.finish();
procedure.exception(graph, throwable);
+ if(barrier != null)
+ barrier.dec();
}
}, false);
+ if(barrier != null)
+ barrier.dec();
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
this.parentGraph = parentGraph;
this.parent = parent;
this.processor = support;
- this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
+ this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, null);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
+ this.parentGraph = parentGraph;
+ this.parent = parent;
+ this.processor = support;
+ this.asyncBarrier = asyncBarrier;
+ }
+
ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
this(graph, parent, graph.processor);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback) {
+ this(parentGraph, parent, parentGraph.processor, new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback));
+ }
+
ReadGraphImpl(ReadGraphImpl graph) {
this(graph, graph.parent);
}
- public ReadGraphImpl withParent(CacheEntry parent) {
- return new ReadGraphImpl(this, parent);
+ public ReadGraphImpl withParent(CacheEntry parent, Runnable callback) {
+ return new ReadGraphImpl(this, parent, callback);
}
+ public ReadGraphImpl withParent(CacheEntry parent) {
+ return withParent(parent, null);
+ }
+
+ public ReadGraphImpl syncWithParent(CacheEntry parent) {
+ return new ReadGraphImpl(this, parent, processor, null);
+ }
+
public ReadGraphImpl forRecompute(CacheEntry parent) {
return new ReadGraphImpl(null, parent, processor);
}
public static ReadGraphImpl create(QueryProcessor support) {
- return new ReadGraphImpl(null, null, support);
+ ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+ return result;
}
public ReadGraphImpl newRestart(ReadGraphImpl impl) {
else return 1 + getLevelStatic(impl.parentGraph);
}
+ public boolean isParent(ReadGraphImpl impl) {
+ if(impl == null) return false;
+ if(this == impl) return true;
+ return isParent(impl.parentGraph);
+ }
+
public ReadGraphImpl getTopLevelGraph() {
return getTopLevelGraphStatic(this);
}
*******************************************************************************/
package org.simantics.db.impl.query;
+import java.util.Collection;
+
import org.simantics.databoard.Bindings;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.BlockingAsyncProcedure;
-import org.simantics.db.impl.graph.AsyncBarrierImpl;
-import org.simantics.db.impl.graph.BarrierTracing;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.procedure.AsyncProcedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T> {
+final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T>, IPending {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class);
protected AsyncRead<T> id;
+ protected PendingTaskSupport pendingTaskSupport;
AsyncReadEntry(AsyncRead<T> request) {
this.id = request;
try {
- BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
+ BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(graph, AsyncReadEntry.this, new AsyncProcedure<T>() {
@Override
public void execute(AsyncReadGraph graph, T result) {
except(t);
}
- }, id);
-
- id.perform(graph, proc);
+ }, id, true);
+ id.perform(proc.queryGraph, proc);
+
+ proc.dec();
proc.get();
} catch (Throwable t) {
}
- public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+ public static <T> T computeForEach(ReadGraphImpl callerGraph, AsyncRead<T> request, AsyncReadEntry<T> entry,
AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
- AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
-
- ReadGraphImpl queryGraph = graph.withParent(entry);
- queryGraph.asyncBarrier.inc();
-
- BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
-
- class AsyncTask extends SessionTask {
-
- int counter = 0;
- T result;
- DatabaseException exception;
-
- public AsyncTask(ReadGraphImpl graph) {
- this(graph, 1);
- }
-
- public AsyncTask(ReadGraphImpl graph, int pos) {
- super(graph);
- this.position = pos;
- if(this.position < 1024)
- this.position *= 2;
- }
-
- @Override
- public void run0(int thread) {
- if(needsToBlock) proc.waitBarrier();
- if(proc.isDone()) {
- ReadGraphImpl executeGraph = graph.withParent(graph.parent);
- executeGraph.asyncBarrier.inc();
- try {
- result = (T)proc.get();
- if(procedure != null) {
- procedure.execute(executeGraph, result);
- }
- } catch (DatabaseException e) {
- if(procedure != null) procedure.exception(executeGraph, e);
- exception = e;
- } catch (Throwable t) {
- DatabaseException dbe = new DatabaseException(t);
- if(procedure != null) procedure.exception(executeGraph, dbe);
- exception = dbe;
- } finally {
- if (entry != null) {
- // This does not throw
- entry.performFromCache(executeGraph, procedure_);
- }
- executeGraph.asyncBarrier.dec();
- executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
- }
- } else {
- if(counter++ > 10000) {
- if(BarrierTracing.BOOKKEEPING) {
- AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
- AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
- while(caller != null) {
- System.err.println("called by " + AsyncBarrierImpl.report(caller));
- caller = caller.caller;
- }
- for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) {
- AsyncBarrierImpl.printReverse(ab, 2);
- }
- }
- throw new IllegalStateException("Eternal loop in queries.");
- }
- graph.processor.scheduleLater(new AsyncTask(graph, position));
- }
- }
-
- @Override
- public boolean maybeReady() {
- return proc.isDone();
- }
-
- }
+ BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(callerGraph, entry, procedure_, request, needsToBlock);
try {
- request.perform(queryGraph, proc);
+ request.perform(proc.queryGraph, proc);
} finally {
- queryGraph.asyncBarrier.dec();
+ proc.queryGraph.asyncBarrier.dec();
}
- AsyncTask task = new AsyncTask(graph);
-
- if(needsToBlock) task.run(0);
- else if (proc.isDone()) task.run(0);
- else {
- graph.processor.scheduleLater(task);
+ if(needsToBlock) {
+ proc.waitBarrier();
+ return proc.get();
+ } else {
return null;
}
- if(task.exception != null) throw task.exception;
- else return task.result;
-
}
@Override
@Override
public void execute(AsyncReadGraph graph, T result) {
- setResult(result);
- setReady();
+ Collection<SessionTask> tasks = null;
+ synchronized(this) {
+ setResult(result);
+ setReady();
+ if(pendingTaskSupport != null)
+ tasks = pendingTaskSupport.executePending();
+ }
+ if(tasks != null)
+ for(SessionTask task : tasks)
+ ((ReadGraphImpl)graph).processor.scheduleNow(task);
}
@Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- except(throwable);
+ public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
+ Collection<SessionTask> tasks = null;
+ synchronized(this) {
+ except(throwable);
+ if(pendingTaskSupport != null)
+ tasks = pendingTaskSupport.executePending();
+ }
+ if(tasks != null)
+ for(SessionTask task : tasks)
+ ((ReadGraphImpl)graph).processor.scheduleNow(task);
+ }
+
+ public void executeWhenResultIsAvailable(QueryProcessor processor, SessionTask task) {
+ boolean ready = false;
+ synchronized(this) {
+ if(pendingTaskSupport == null)
+ pendingTaskSupport = new PendingTaskSupport(this);
+ ready = pendingTaskSupport.executeWhenResultIsAvailable(task);
+ }
+ if(ready) {
+ processor.scheduleNow(task);
+ }
}
}
--- /dev/null
+package org.simantics.db.impl.query;
+
+public interface IPending {
+
+ boolean isPending();
+
+}
--- /dev/null
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+
+/*
+ * Support class for queuing pending tasks to be executed when result gets ready
+ */
+public class PendingTaskSupport {
+
+ private ArrayList<SessionTask> pendingTasks;
+ private IPending pending;
+
+ public PendingTaskSupport(IPending pending) {
+ this.pending = pending;
+ }
+
+ /*
+ * We assume here that the associated IPending performs this atomically
+ * The caller is responsible for execution of the returned task after the critical section
+ */
+ public boolean executeWhenResultIsAvailable(SessionTask task) {
+ if(pending.isPending()) {
+ if(pendingTasks == null)
+ pendingTasks = new ArrayList<SessionTask>();
+ pendingTasks.add(task);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /*
+ * We assume here that the associated IPending performs this atomically after changing the pending result
+ * The caller is responsible for execution of the returned task after the critical section
+ */
+ public Collection<SessionTask> executePending() {
+ ArrayList<SessionTask> ret = pendingTasks;
+ pendingTasks = null;
+ return ret;
+ }
+
+}
}
ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.scheduleNow(new SessionTask(graph) {
+ graph.asyncBarrier.inc();
+ graph.processor.scheduleNow(new SessionTask() {
@Override
public void run0(int thread) {
try {
runnerReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ graph.asyncBarrier.dec();
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
}
}
- AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, boolean needsToBlock) throws DatabaseException {
+ AsyncReadEntry getOrCreateAsyncReadEntry(ReadGraphImpl graph, AsyncRead<?> r, CacheEntry parent, ListenerBase listener, final AsyncProcedure procedure, boolean needsToBlock) throws DatabaseException {
AsyncReadEntry existing = null;
synchronized(asyncReadEntryMap) {
existing = (AsyncReadEntry)asyncReadEntryMap.get(r);
}
}
if(existing.isPending()) {
- if(needsToBlock)
- waitPending(graph, existing);
- else {
- return null;
- }
+ if(needsToBlock)
+ waitPending(graph, existing);
+ else {
+ existing.executeWhenResultIsAvailable(graph.processor, new SessionTask(graph) {
+ @Override
+ public void run0(int thread) {
+ try {
+ runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ });
+ return null;
+ }
}
return existing;
}
}
return AsyncReadEntry.computeForEach(graph, r, null, procedure, needsToBlock);
}
- AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
+ AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
if(entry == null) {
- graph.processor.scheduleNow(new SessionTask(graph) {
- @Override
- public void run0(int thread) {
- try {
- runnerAsyncReadEntry(graph, r, parent, listener, procedure, needsToBlock);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
- });
- return null;
+ // Entry was pending and this request has been queued
+ return null;
}
AsyncProcedure procedure_ = procedure != null ? procedure : emptyProcedureAsyncReadEntry;
if(entry.isReady()) {
int counter = 0;
while(entry.isPending()) {
try {
- boolean performed = graph.performPending();
+ boolean performed = false;//graph.performPending();
if(!performed) {
Thread.sleep(1);
counter++;
QueryThread[] executors;
public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
+
+ public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
enum ThreadState {
public void close() {
}
- public SessionTask getSubTask(ReadGraphImpl impl) {
+ public SessionTask getSubTask(ReadGraphImpl parent) {
synchronized(querySupportLock) {
int index = 0;
while(index < freeScheduling.size()) {
SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
- queueLength.decrementAndGet();
+ if(task.isSubtask(parent) && task.maybeReady()) {
return freeScheduling.remove(index);
}
index++;
* We are running errands while waiting for requests to complete.
* We can only run work that is part of the current root request to avoid any deadlocks
*/
- public boolean performPending(ReadGraphImpl graph) {
- SessionTask task = getSubTask(graph);
+ public boolean performPending(ReadGraphImpl under) {
+ SessionTask task = getSubTask(under);
if(task != null) {
- task.run(QueryProcessor.thread.get());
+ task.run(thread.get());
return true;
}
return false;
}
-
+
final public void scheduleNow(SessionTask request) {
- schedule(request, false);
+ SessionTask toExecute = scheduleOrReturnForExecution(request);
+ if(toExecute != null)
+ toExecute.run(thread.get());
}
- final public void scheduleLater(SessionTask request) {
- schedule(request, true);
- }
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
- AtomicInteger queueLength = new AtomicInteger(0);
-
- final public void schedule(SessionTask request, boolean late) {
-
- int queueLengthEstimate = queueLength.get();
- if(!late && queueLengthEstimate > 80) {
- request.run(thread.get());
- return;
- }
-
- assert(request != null);
-
- synchronized(querySupportLock) {
+ assert(request != null);
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
+ synchronized(querySupportLock) {
- if(late) {
- int pos = request.position - 1;
- if(pos < freeScheduling.size()) {
- freeScheduling.add(pos, request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- freeScheduling.addLast(request);
- queueLength.incrementAndGet();
- requests.release();
- }
- }
- else {
- if(request.getLevel() < 4) {
- if(freeScheduling.size() < 100) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- } else {
- if(freeScheduling.size() < 20) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- }
- }
+ LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
+
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
+ queue.addFirst(request);
+ requests.release();
- }
+ }
- }
+ return null;
+
+ }
final int THREADS;
public static abstract class SessionTask {
- public final ReadGraphImpl graph;
+ final protected ReadGraphImpl rootGraph;
private int counter = 0;
protected int position = 1;
private Exception trace;
- public SessionTask(ReadGraphImpl graph) {
- this.graph = graph;
- if(graph != null) graph.asyncBarrier.inc();
+ public SessionTask() {
+ this(null);
}
-
- public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
- if(r1 == null || r2 == null) return false;
- return r1.getTopLevelGraph() == r2.getTopLevelGraph();
- }
+
+ public SessionTask(ReadGraphImpl rootGraph) {
+ this.rootGraph = rootGraph;
+ }
+
+ public boolean isSubtask(ReadGraphImpl graph) {
+ return graph.isParent(rootGraph);
+ }
public abstract void run0(int thread);
trace = new Exception();
}
run0(thread);
- if(graph != null) graph.asyncBarrier.dec();
+ //if(graph != null && graph.asyncBarrier != null) graph.asyncBarrier.dec();
}
public boolean maybeReady() {
@Override
public String toString() {
- if(graph == null)
+// if(graph == null)
return "SessionTask[no graph]";
- else
- return "SessionTask[" + graph.parent + "]";
+// else
+// return "SessionTask[" + graph.parent + "]";
}
- public int getLevel() {
- if(graph == null) return 0;
- else return graph.getLevel();
- }
+// public int getLevel() {
+// if(graph == null) return 0;
+// else return graph.getLevel();
+// }
}
}
private boolean pumpTask() {
- if(!processor.freeScheduling.isEmpty()) {
- tasks.add(processor.freeScheduling.removeFirst());
- processor.queueLength.decrementAndGet();
- return true;
- }
+ // First finish existing executions
+ if(!processor.freeScheduling.isEmpty()) {
+ tasks.add(processor.freeScheduling.removeFirst());
+ return true;
+ }
+ // Check for new tasks
+ if(!processor.topLevelTasks.isEmpty()) {
+ tasks.add(processor.topLevelTasks.removeFirst());
+ return true;
+ }
return false;
}
@Override
public void run() {
- processor.thread.set(index);
-
QuerySupport support = this.querySupport;
try {
while(!tasks.isEmpty()) {
SessionTask task = tasks.remove(tasks.size()-1);
-// System.err.println("QT " + index + " runs " + task);
- task.run(index);
+ task.run(0);
}
-// for(int performer=0;performer<THREADS;performer++) {
-// if(!delayQueues[index * THREADS + performer].isEmpty()) {
-// synchronized(executors[performer]) {
-// threadLocks[performer].lock();
-// queues[performer].addAll(delayQueues[index * THREADS + performer]);
-// delayQueues[index * THREADS + performer].clear();
-// executors[performer].notify();
-// threadLocks[performer].unlock();
-// }
-// }
-// }
-
}
} catch (Throwable t) {
} finally {
executeGraph.asyncBarrier.dec();
- executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
+ if(needsToBlock)
+ executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
}
} else {
- BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request) {
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
public void execute(AsyncReadGraph graph_, T result) {
task.finish();
try {
request.perform(newGraph, wrap);
+ wrap.dec();
wrap.get();
} catch (DatabaseException e) {
public int getAmountOfQueryThreads() {
// This must be a power of two
- return 1;
+ return 4;
// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
}
--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.diagram.adapter;
+
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.common.primitiverequest.Adapter;
+import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.diagram.synchronization.ErrorHandler;
+import org.simantics.g2d.canvas.ICanvasContext;
+import org.simantics.g2d.diagram.IDiagram;
+import org.simantics.g2d.element.ElementClass;
+import org.simantics.g2d.element.IElement;
+import org.simantics.scl.runtime.tuple.Tuple3;
+
+/**
+ * @author Antti Villberg
+ */
+public class ConnectionRequest2 extends BaseRequest2<Resource, Tuple3> {
+
+ final IDiagram diagram;
+ final ErrorHandler errorHandler;
+
+ public ConnectionRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource, ErrorHandler errorHandler) {
+ super(canvas, resource);
+ this.diagram = diagram;
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public void perform(AsyncReadGraph graph, final AsyncProcedure<Tuple3> procedure) {
+
+ graph.forHasStatement(data, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean result) {
+
+ if (!result) {
+ procedure.execute(graph, null);
+ return;
+ }
+
+ graph.asyncRequest(new Adapter<ElementFactory>(data, ElementFactory.class), new TransientCacheAsyncListener<ElementFactory>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ errorHandler.error("Unexpected ElementFactory adaption failure", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final ElementFactory factory) {
+
+ graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener<ElementClass>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ errorHandler.error("Unexpected ElementClass creation failure", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final ElementClass ec) {
+
+ graph.asyncRequest(new SpawnRequest(canvas, ec, data), new TransientCacheAsyncListener<IElement>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ errorHandler.error("Unexpected SpawnRequest failure", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, IElement element) {
+ procedure.execute(graph, new Tuple3(element, ec, factory));
+ }
+ });
+ }
+ });
+ }
+ });
+ }
+ });
+ }
+
+}
import org.simantics.scenegraph.profile.DataNodeConstants;
import org.simantics.scenegraph.profile.DataNodeMap;
import org.simantics.scenegraph.profile.common.ProfileObserver;
+import org.simantics.scl.runtime.tuple.Tuple3;
import org.simantics.structural2.modelingRules.IModelingRules;
import org.simantics.utils.datastructures.ArrayMap;
import org.simantics.utils.datastructures.MapSet;
this.removedRouteGraphConnections.clear();
}
+ class LoadNodeListener extends DisposableListener<IElement> {
+
+ final Resource element;
+ public IElement lastLoaded;
+
+ public LoadNodeListener(ListenerSupport support, Resource element) {
+ super(support);
+ this.element = element;
+ }
+
+ @Override
+ public String toString() {
+ return "Node load listener for " + element;
+ }
+
+ public void applyFirst(IElement loaded) {
+
+ Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+ if (addedElementMap.containsKey(data)) {
+ // This element was just loaded, in
+ // which case its hints need to
+ // uploaded to the real mapped
+ // element immediately.
+ IElement mappedElement = getMappedElement(data);
+ if (DebugPolicy.DEBUG_NODE_LISTENER)
+ System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement);
+ if (mappedElement != null && (mappedElement instanceof Element)) {
+ if (DebugPolicy.DEBUG_NODE_LISTENER) {
+ System.out.println(" mapped hints: " + mappedElement.getHints());
+ System.out.println(" loaded hints: " + loaded.getHints());
+ }
+ updateMappedElement((Element) mappedElement, loaded);
+ }
+ }
+
+ }
+
+ @Override
+ public void execute(IElement loaded) {
+
+ // Invoked when the element has been loaded.
+ if (DebugPolicy.DEBUG_NODE_LISTENER)
+ System.out.println("NODE LoadListener for " + loaded);
+
+ if (loaded == null) {
+ disposeListener();
+ return;
+ }
+
+
+ boolean first = lastLoaded == null;
+
+ lastLoaded = loaded;
+
+ /*
+ * The first invocation is postponed
+ */
+ if(first) {
+ applyFirst(loaded);
+ return;
+ }
+
+ Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+ // Logic for disposing listener
+ if (!previousContent.nodeSet.contains(data)) {
+ if (DebugPolicy.DEBUG_NODE_LISTENER)
+ System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing.");
+ disposeListener();
+ return;
+ }
+
+ // This element was already loaded.
+ // Just schedule an update some time
+ // in the future.
+ if (DebugPolicy.DEBUG_NODE_LISTENER)
+ System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future");
+ offerGraphUpdate( nodeUpdater(element, loaded) );
+
+ }
+
+ }
+
void processNodes(AsyncReadGraph graph) throws DatabaseException {
for (Map.Entry<Resource, Change> entry : changes.elements.entrySet()) {
@Override
public void execute(AsyncReadGraph graph, final IElement e) {
+ mapElement(element, e);
+ synchronized (GraphToDiagramUpdater.this) {
+ addedElements.add(e);
+ addedElementMap.put(element, e);
+ addedConnectionMap.put(element, e);
+ }
+
// Read connection type
graph.forSingleType(element, br.DIA.Connection, new Procedure<Resource>() {
@Override
});
} else if (content.nodeSet.contains(element)) {
- Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
-
- boolean firstTime = true;
+ graph.asyncRequest(new ReadRequest() {
@Override
- public String toString() {
- return "Node load listener for " + element;
- }
- @Override
- public void execute(IElement loaded) {
- // Invoked when the element has been loaded.
- if (DebugPolicy.DEBUG_NODE_LISTENER)
- System.out.println("NODE LoadListener for " + loaded);
-
- if (loaded == null) {
- disposeListener();
- return;
- }
-
- if (firstTime) {
-
- // This is invoked before the element is actually loaded.
- //System.out.println("NodeRequestProcedure " + e);
- if (DebugPolicy.DEBUG_NODE_LOAD)
- System.out.println("MAPPING ADDED NODE: " + element + " -> " + loaded);
- mapElement(element, loaded);
- synchronized (GraphToDiagramUpdater.this) {
- addedElements.add(loaded);
- addedElementMap.put(element, loaded);
- }
-
- firstTime = false;
-
- }
-
- Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+ public void run(ReadGraph graph) throws DatabaseException {
- // Logic for disposing listener
- if (!previousContent.nodeSet.contains(data)) {
- if (DebugPolicy.DEBUG_NODE_LISTENER)
- System.out.println("NODE LoadListener, node not in current content: " + data + ". Disposing.");
- disposeListener();
+ LoadNodeListener loadListener = new LoadNodeListener(canvasListenerSupport, element);
+ Tuple3 t = graph.syncRequest(new NodeRequest2(canvas, diagram, element));
+ IElement e = (IElement)t.c0;
+ ElementClass ec = (ElementClass)t.c1;
+ org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2;
+ if (e == null)
return;
+
+ // This is invoked before the element is actually loaded.
+ //System.out.println("NodeRequestProcedure " + e);
+ if (DebugPolicy.DEBUG_NODE_LOAD)
+ System.out.println("MAPPING ADDED NODE: " + element + " -> " + e);
+ mapElement(element, e);
+ synchronized (GraphToDiagramUpdater.this) {
+ addedElements.add(e);
+ addedElementMap.put(element, e);
}
- if (addedElementMap.containsKey(data)) {
- // This element was just loaded, in
- // which case its hints need to
- // uploaded to the real mapped
- // element immediately.
- IElement mappedElement = getMappedElement(data);
- if (DebugPolicy.DEBUG_NODE_LISTENER)
- System.out.println("LOADED ADDED ELEMENT, currently mapped element: " + mappedElement);
- if (mappedElement != null && (mappedElement instanceof Element)) {
- if (DebugPolicy.DEBUG_NODE_LISTENER) {
- System.out.println(" mapped hints: " + mappedElement.getHints());
- System.out.println(" loaded hints: " + loaded.getHints());
- }
- updateMappedElement((Element) mappedElement, loaded);
- }
- } else {
- // This element was already loaded.
- // Just schedule an update some time
- // in the future.
- if (DebugPolicy.DEBUG_NODE_LISTENER)
- System.out.println("PREVIOUSLY LOADED NODE UPDATED, scheduling update into the future");
- offerGraphUpdate( nodeUpdater(element, loaded) );
- }
- }
- };
-
- //System.out.println("NODE REQUEST: " + element);
- graph.asyncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
- @Override
- public void execute(AsyncReadGraph graph, IElement e) {
- }
-
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- error(throwable);
+ graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, element), loadListener);
+
}
+
});
} else {
}
}
}
+
+ class LoadRouteGraphConnectionListener extends DisposableListener<IElement> {
+
+ final Resource connection;
+ public IElement lastLoaded;
+
+ public LoadRouteGraphConnectionListener(ListenerSupport support, Resource connection) {
+ super(support);
+ this.connection = connection;
+ }
+
+ @Override
+ public String toString() {
+ return "processRouteGraphConnections " + connection;
+ }
+
+ public void applyFirst(IElement loaded) {
+
+ Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+ if (addedElementMap.containsKey(data)) {
+ // This element was just loaded, in
+ // which case its hints need to
+ // uploaded to the real mapped
+ // element immediately.
+ IElement mappedElement = getMappedElement(data);
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
+ if (mappedElement instanceof Element) {
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
+ System.out.println(" mapped hints: " + mappedElement.getHints());
+ System.out.println(" loaded hints: " + loaded.getHints());
+ }
+ updateMappedElement((Element) mappedElement, loaded);
+ }
+ }
+
+ }
+
+ @Override
+ public void execute(IElement loaded) {
+
+ // Invoked when the element has been loaded.
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded);
+
+ if (loaded == null) {
+ disposeListener();
+ return;
+ }
+
+ boolean first = lastLoaded == null;
+
+ lastLoaded = loaded;
+
+ /*
+ * The first invocation is postponed
+ */
+ if(first) {
+ applyFirst(loaded);
+ return;
+ }
+
+ Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+
+ // Logic for disposing listener
+ if (!previousContent.routeGraphConnectionSet.contains(data)) {
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing.");
+ disposeListener();
+ return;
+ }
+
+ if (addedElementMap.containsKey(data)) {
+ // This element was just loaded, in
+ // which case its hints need to
+ // uploaded to the real mapped
+ // element immediately.
+ IElement mappedElement = getMappedElement(data);
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
+ if (mappedElement instanceof Element) {
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
+ System.out.println(" mapped hints: " + mappedElement.getHints());
+ System.out.println(" loaded hints: " + loaded.getHints());
+ }
+ updateMappedElement((Element) mappedElement, loaded);
+ }
+ } else {
+ // This element was already loaded.
+ // Just schedule an update some time
+ // in the future.
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection);
+
+ Set<Object> dirtyNodes = new THashSet<Object>(4);
+ IElement mappedElement = getMappedElement(connection);
+ ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY);
+ if (ce != null) {
+ for (Connection conn : ce.getTerminalConnections(null)) {
+ Object o = conn.node.getHint(ElementHints.KEY_OBJECT);
+ if (o != null) {
+ dirtyNodes.add(o);
+ if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
+ System.out.println("Marked connectivity dirty for node: " + conn.node);
+ }
+ }
+ }
+
+ offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) );
+ }
+ }
+ };
- void processRouteGraphConnections(ReadGraph graph) throws DatabaseException {
+
+ void processRouteGraphConnections(AsyncReadGraph graph) throws DatabaseException {
for (Map.Entry<Resource, Change> entry : changes.routeGraphConnections.entrySet()) {
final Resource connection = entry.getKey();
if (mappedElement != null)
continue;
- Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
-
- boolean firstTime = true;
+
+ graph.asyncRequest(new ReadRequest() {
@Override
- public String toString() {
- return "processRouteGraphConnections " + connection;
- }
- @Override
- public void execute(IElement loaded) {
- // Invoked when the element has been loaded.
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
- System.out.println("ROUTE GRAPH CONNECTION LoadListener for " + loaded);
+ public void run(ReadGraph graph) throws DatabaseException {
- if (loaded == null) {
- disposeListener();
- return;
- }
-
- if(firstTime) {
- if (DebugPolicy.DEBUG_NODE_LOAD)
- System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + loaded);
- mapElement(connection, loaded);
- synchronized (GraphToDiagramUpdater.this) {
- addedElements.add(loaded);
- addedElementMap.put(connection, loaded);
- addedRouteGraphConnectionMap.put(connection, loaded);
- }
- firstTime = false;
- }
+ LoadRouteGraphConnectionListener loadListener = new LoadRouteGraphConnectionListener(canvasListenerSupport, connection);
- Object data = loaded.getHint(ElementHints.KEY_OBJECT);
+ Tuple3 t = graph.syncRequest(new ConnectionRequest2(canvas, diagram, connection, errorHandler));
+ IElement e = (IElement)t.c0;
+ ElementClass ec = (ElementClass)t.c1;
+ org.simantics.diagram.adapter.ElementFactory ef = (org.simantics.diagram.adapter.ElementFactory)t.c2;
- // Logic for disposing listener
- if (!previousContent.routeGraphConnectionSet.contains(data)) {
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
- System.out.println("ROUTE GRAPH CONNECTION LoadListener, connection not in current content: " + data + ". Disposing.");
- disposeListener();
+ if (e == null)
return;
+
+ //System.out.println("ConnectionRequestProcedure " + e);
+ if (DebugPolicy.DEBUG_NODE_LOAD)
+ System.out.println("MAPPING ADDED ROUTE GRAPH CONNECTION: " + connection + " -> " + e);
+ mapElement(connection, e);
+ synchronized (GraphToDiagramUpdater.this) {
+ addedElements.add(e);
+ addedElementMap.put(connection, e);
+ addedRouteGraphConnectionMap.put(connection, e);
}
- if (addedElementMap.containsKey(data)) {
- // This element was just loaded, in
- // which case its hints need to
- // uploaded to the real mapped
- // element immediately.
- IElement mappedElement = getMappedElement(data);
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
- System.out.println("LOADED ADDED ROUTE GRAPH CONNECTION, currently mapped connection: " + mappedElement);
- if (mappedElement instanceof Element) {
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER) {
- System.out.println(" mapped hints: " + mappedElement.getHints());
- System.out.println(" loaded hints: " + loaded.getHints());
- }
- updateMappedElement((Element) mappedElement, loaded);
- }
- } else {
- // This element was already loaded.
- // Just schedule an update some time
- // in the future.
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
- System.out.println("PREVIOUSLY LOADED ROUTE GRAPH CONNECTION UPDATED, scheduling update into the future: " + connection);
-
- Set<Object> dirtyNodes = new THashSet<Object>(4);
- IElement mappedElement = getMappedElement(connection);
- ConnectionEntity ce = mappedElement.getHint(ElementHints.KEY_CONNECTION_ENTITY);
- if (ce != null) {
- for (Connection conn : ce.getTerminalConnections(null)) {
- Object o = conn.node.getHint(ElementHints.KEY_OBJECT);
- if (o != null) {
- dirtyNodes.add(o);
- if (DebugPolicy.DEBUG_CONNECTION_LISTENER)
- System.out.println("Marked connectivity dirty for node: " + conn.node);
- }
- }
- }
+ graph.syncRequest(new LoadRequest(canvas, diagram, ef, ec, connection), loadListener);
- offerGraphUpdate( routeGraphConnectionUpdater(connection, loaded, dirtyNodes) );
- }
}
- };
- graph.syncRequest(new ConnectionRequest(canvas, diagram, connection, errorHandler, loadListener), new Procedure<IElement>() {
- @Override
- public void execute(final IElement e) {
- }
- @Override
- public void exception(Throwable throwable) {
- error(throwable);
- }
});
+
+
break;
}
case REMOVED: {
Listener<IElement> loadListener = new DisposableListener<IElement>(canvasListenerSupport) {
- boolean firstTime = true;
-
@Override
public String toString() {
return "processBranchPoints for " + element;
return;
}
- if (firstTime) {
-
- mapElement(element, loaded);
- synchronized (GraphToDiagramUpdater.this) {
- addedBranchPoints.add(loaded);
- addedElementMap.put(element, loaded);
- ConnectionEntityImpl ce = getConnectionEntity(element);
- loaded.setHint(ElementHints.KEY_CONNECTION_ENTITY, ce);
- loaded.setHint(ElementHints.KEY_PARENT_ELEMENT, ce.getConnectionElement());
- }
-
- firstTime = false;
-
- }
-
Object data = loaded.getHint(ElementHints.KEY_OBJECT);
if (addedElementMap.containsKey(data)) {
// This element was just loaded, in
return "processBranchPoints";
}
});
+
}
//System.out.println("---- PROCESS BRANCH POINTS END");
task = Timing.BEGIN("processRouteGraphConnections");
if (!changes.routeGraphConnections.isEmpty()) {
- graph.syncRequest(new ReadRequest() {
+ graph.syncRequest(new AsyncReadRequest() {
@Override
- public void run(ReadGraph graph) throws DatabaseException {
+ public void run(AsyncReadGraph graph) throws DatabaseException {
processRouteGraphConnections(graph);
}
@Override
diagramUpdateLock.lock();
try {
- if (DebugPolicy.DEBUG_DIAGRAM_UPDATE)
+ if (DebugPolicy.DEBUG_DIAGRAM_UPDATE) {
System.out.println("In diagramGraphUpdater:");
+ System.out.println("-content = " + content);
+ System.out.println("-previousContent = " + previousContent);
+ }
// Find out what has changed since the last query.
Object task = Timing.BEGIN("diagramContentDifference");
--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.diagram.adapter;
+
+import java.util.List;
+
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.common.primitiverequest.Adapter;
+import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
+import org.simantics.diagram.synchronization.ErrorHandler;
+import org.simantics.g2d.canvas.ICanvasContext;
+import org.simantics.g2d.diagram.IDiagram;
+import org.simantics.g2d.diagram.handler.SubstituteElementClass;
+import org.simantics.g2d.element.ElementClass;
+import org.simantics.g2d.element.IElement;
+import org.simantics.scl.runtime.tuple.Tuple3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Antti Villberg
+ */
+public class NodeRequest2 extends BaseRequest2<Resource, Tuple3> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NodeRequest2.class);
+
+ final IDiagram diagram;
+
+ public NodeRequest2(ICanvasContext canvas, IDiagram diagram, Resource resource) {
+ super(canvas, resource);
+ this.diagram = diagram;
+ }
+
+ @Override
+ public void perform(AsyncReadGraph graph, final AsyncProcedure<Tuple3> procedure) {
+ // Keep this code from crashing the whole DB client by unexpected
+ // throwing of NPE's somewhere in the following code that leads to
+ // procedure not getting called properly.
+ if (diagram == null) {
+ procedure.exception(graph, new NullPointerException("null diagram specified for resource " + data));
+ return;
+ }
+
+ final ErrorHandler eh = ElementFactoryUtil.getErrorHandler(diagram);
+
+ graph.forHasStatement(data, new AsyncProcedure<Boolean>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ eh.error("NodeRequest.forHasStatement failed", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, Boolean result) {
+
+ if(!result) {
+ procedure.execute(graph, null);
+ return;
+ }
+
+ graph.asyncRequest(new Adapter<ElementFactory>(data, ElementFactory.class), new TransientCacheAsyncListener<ElementFactory>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ eh.error("NodeRequest.asyncRequest(Adapter<ElementFactory>) failed", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, final ElementFactory factory) {
+
+ graph.asyncRequest(new GetElementClassRequest(factory, data, canvas, diagram), new TransientCacheAsyncListener<ElementClass>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ LOGGER.error("Unexpected error in GetElementClassRequest", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, ElementClass mutableClazz) {
+ List<SubstituteElementClass> substitutes = diagram.getDiagramClass().getItemsByClass(SubstituteElementClass.class);
+ for (SubstituteElementClass subs : substitutes) {
+ mutableClazz = subs.substitute(diagram, mutableClazz);
+ }
+ final ElementClass clazz = mutableClazz;
+ graph.asyncRequest(new SpawnRequest(canvas, clazz, data), new TransientCacheAsyncListener<IElement>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ LOGGER.error("Unexpected error in SpawnRequest", throwable);
+ procedure.execute(graph, null);
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, IElement element) {
+ procedure.execute(graph, new Tuple3(element, clazz, factory));
+ }
+
+ });
+
+ }
+
+ });
+
+ }
+
+ });
+
+ }
+
+ });
+
+ }
+
+}
for(int i=0;i<res.length;i++) {
if(pos > 0) json.append(",");
Object r = res[i];
- if(r instanceof IdentifiedObject) {
- Object id = ((IdentifiedObject) r).getId();
+ if(r instanceof IIdentifiedObject) {
+ Object id = ((IIdentifiedObject) r).getId();
if(id instanceof IAdaptable) {
Object resource = ((IAdaptable) id).getAdapter(Resource.class);
if(resource != null) {
*******************************************************************************/
package org.simantics.issues.common;
+import java.util.Collection;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.common.request.TernaryAsyncRead;
-import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.exception.DatabaseException;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.issues.Severity;
/**
* @author Tuukka Lehtonen
*/
-public class ChildMaxIssueSeverity extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity>{
+public class ChildMaxIssueSeverity extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity> {
+ static class AsyncReadResult<T> {
+ private AtomicReference<T> resultRef;
+ private Throwable throwable;
+ private AtomicInteger counter = new AtomicInteger(1);
+ private AsyncProcedure<T> procedure;
+ AsyncReadResult(AsyncProcedure<T> procedure, AtomicReference<T> resultRef) {
+ this.procedure = procedure;
+ this.resultRef = resultRef;
+ }
+ void except(AsyncReadGraph graph, Throwable throwable) {
+ this.throwable = throwable;
+ dec(graph);
+ }
+ void set(AsyncReadGraph graph, T result) {
+ resultRef.set(result);
+ dec(graph);
+ }
+ void inc() {
+ counter.incrementAndGet();
+ }
+ void dec(AsyncReadGraph graph) {
+ if(counter.decrementAndGet() == 0) {
+ if(throwable != null)
+ procedure.exception(graph, throwable);
+ else
+ procedure.execute(graph, resultRef.get());
+ }
+ }
+
+ }
+
public ChildMaxIssueSeverity(Resource resource, Resource childRelation, Set<Resource> typesToRecurse) {
super(resource, childRelation, typesToRecurse);
}
-// @Override
-// public Severity perform(ReadGraph graph) throws DatabaseException {
-// Severity maxSeverity = null;
-// //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource));
-// for (Resource child : graph.getObjects(resource, resource2)) {
-// Severity s = graph.syncRequest(new MaxIssueSeverityRecursive(child));
-// maxSeverity = Severity.moreSevere(maxSeverity, s);
-// }
-// //System.out.println("severityForChildren: " + NameUtils.getSafeName(graph, resource) + " : " + maxSeverity);
-// return maxSeverity;
-// }
-
@Override
public void perform(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
- //System.out.println(getClass().getSimpleName() + ": " + parameter);
- graph.forTypes(parameter, new AsyncProcedure<Set<Resource>>() {
- @Override
- public void execute(AsyncReadGraph graph, Set<Resource> result) {
- if (!Collections.disjoint(parameter3, result)) {
- checkChildren(graph, procedure);
- } else {
- procedure.execute(graph, null);
- }
- }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- procedure.exception(graph, throwable);
+ try {
+ Set<Resource> types = graph.getTypes(parameter);
+ if (!Collections.disjoint(parameter3, types)) {
+ checkChildren(graph, procedure);
+ } else {
+ procedure.execute(graph, null);
}
- });
+ } catch (DatabaseException e) {
+ procedure.exception(graph, e);
+ }
+
}
protected void checkChildren(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
- graph.forEachObject(parameter, parameter2, new AsyncMultiProcedure<Resource>() {
- AtomicReference<Severity> maxSeverity = new AtomicReference<Severity>();
- @Override
- public void execute(AsyncReadGraph graph, Resource child) {
+
+ AsyncReadResult<Severity> maxSeverity = new AsyncReadResult<Severity>(procedure, new AtomicReference<Severity>());
+
+ try {
+ Collection<Resource> children = graph.getObjects(parameter, parameter2);
+ for(Resource child : children) {
+ maxSeverity.inc();
graph.asyncRequest(new MaxIssueSeverityRecursive(child, parameter2, parameter3), new AsyncProcedure<Severity>() {
@Override
public void execute(AsyncReadGraph graph, Severity severity) {
if (severity != null) {
synchronized (maxSeverity) {
- maxSeverity.set(Severity.moreSevere(maxSeverity.get(), severity));
+ maxSeverity.set(graph, Severity.moreSevere(maxSeverity.resultRef.get(), severity));
}
+ } else {
+ maxSeverity.dec(graph);
}
}
@Override
public void exception(AsyncReadGraph graph, Throwable throwable) {
- procedure.exception(graph, throwable);
+ maxSeverity.except(graph, throwable);
}
});
}
- @Override
- public void finished(AsyncReadGraph graph) {
- procedure.execute(graph, maxSeverity.get());
- }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- procedure.exception(graph, throwable);
- }
- });
+ maxSeverity.dec(graph);
+ } catch (DatabaseException e) {
+ maxSeverity.except(graph, e);
+ return;
+ }
+
}
}
*******************************************************************************/
package org.simantics.issues.common;
+import java.util.Collection;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
-import org.simantics.db.common.request.TernaryAsyncRead;
-import org.simantics.db.procedure.AsyncMultiProcedure;
-import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.common.request.TernaryRead;
+import org.simantics.db.exception.DatabaseException;
import org.simantics.issues.Severity;
import org.simantics.issues.ontology.IssueResource;
/**
* @author Tuukka Lehtonen
*/
-public class MaxIssueSeverityRecursive extends TernaryAsyncRead<Resource, Resource, Set<Resource>, Severity> {
+public class MaxIssueSeverityRecursive extends TernaryRead<Resource, Resource, Set<Resource>, Severity> {
public MaxIssueSeverityRecursive(Resource resource, Resource childRelation, Set<Resource> typesToRecurse) {
super(resource, childRelation, typesToRecurse);
}
@Override
- public void perform(AsyncReadGraph graph, final AsyncProcedure<Severity> procedure) {
+ public Severity perform(ReadGraph graph) throws DatabaseException {
IssueResource ISSUE = graph.getService(IssueResource.class);
- AtomicInteger issues = new AtomicInteger();
- AtomicBoolean excepted = new AtomicBoolean(false);
-
- graph.forEachObject(parameter, ISSUE.Issue_HasContext_Inverse, new AsyncMultiProcedure<Resource>() {
- @Override
- public void execute(AsyncReadGraph graph, Resource result) {
- issues.incrementAndGet();
- }
- @Override
- public void finished(AsyncReadGraph graph) {
-
- }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- if(excepted.compareAndSet(false, true))
- procedure.exception(graph, throwable);
- }
- });
-
- graph.forEachObject(parameter, ISSUE.Issue_ContextList_Element_Inverse, new AsyncMultiProcedure<Resource>() {
- @Override
- public void execute(AsyncReadGraph graph, Resource result) {
- issues.incrementAndGet();
- }
- @Override
- public void finished(AsyncReadGraph graph) {
-
- }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- if(excepted.compareAndSet(false, true))
- procedure.exception(graph, throwable);
- }
- });
-
- if(excepted.get()) return;
-
- if (issues.get() == 0) {
- // This content does not have directly attached issues, try to look
- // for some in the child components.
- graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure);
- } else {
- // Try local issues first
- graph.asyncRequest(new MaxIssueSeveritySingle(parameter), new AsyncProcedure<Severity>() {
- @Override
- public void execute(AsyncReadGraph graph, Severity maxSeverity) {
- if (maxSeverity == null)
- // No severity for local issues, try children next.
- graph.asyncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3), procedure);
- else
- procedure.execute(graph, maxSeverity);
- }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- if(excepted.compareAndSet(false, true))
- procedure.exception(graph, throwable);
- }
- });
+ Collection<Resource> inv = graph.getObjects(parameter, ISSUE.Issue_HasContext_Inverse);
+ Collection<Resource> inv2 = graph.getObjects(parameter, ISSUE.Issue_ContextList_Element_Inverse);
+ if(inv.isEmpty() && inv2.isEmpty()) {
+ return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3));
}
+
+ Severity max = graph.syncRequest(new MaxIssueSeveritySingle(parameter));
+ if(max != null)
+ return max;
+ else
+ return graph.syncRequest(new ChildMaxIssueSeverity(parameter, parameter2, parameter3));
+
}
}
});
}
- IElement mappedElement = diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element);
+ IElement mappedElement = e;//diagram.getDiagramClass().getSingleItem(DataElementMap.class).getElement(diagram, element);
MonitorListener monitorListener = new MonitorListener(element, canvas, diagram, substitutions);
if (mappedElement != null) {
MonitorListener oldListener = mappedElement.getHint(KEY_VARIABLE_LISTENER);