import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TObjectIntHashMap;
@Override
public <T> T syncRequest(final Read<T> request) throws DatabaseException {
assert (request != null);
- return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+
+ T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+ return result;
+
}
@Override
assert (request != null);
ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
-
- return QueryCache.resultReadEntry(this, request, parent, listener, procedure);
+ T result = (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
+ return result;
}
assert (request != null);
ListenerBase listener = getListenerBase(procedure);
-
-// BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
- return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
-// return ap.get();
+ T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+ return result;
}
assert (request != null);
assert (procedure != null);
-
- processor.schedule(new SessionTask(this) {
+
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
+
+ processor.scheduleNow(new SessionTask(this) {
@Override
public void run0(int thread) {
QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
}
}
assert (request != null);
assert (procedure != null);
- processor.schedule(new SessionTask(this) {
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
+
+ processor.scheduleNow(new SessionTask(this) {
@Override
public void run0(int thread) {
- try {
+
+ if(barrier != null)
+ barrier.inc();
+
+ try {
final ListenerBase listener = getListenerBase(procedure);
- QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
+ QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ procedure.execute(graph, result);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ }, false);
+ if(barrier != null)
+ barrier.dec();
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
this.parentGraph = parentGraph;
this.parent = parent;
this.processor = support;
- this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
+ this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
+ this.parentGraph = parentGraph;
+ this.parent = parent;
+ this.processor = support;
+ this.asyncBarrier = asyncBarrier;
+ }
+
ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
this(graph, parent, graph.processor);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock));
+ }
+
+ static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock);
+ }
+
ReadGraphImpl(ReadGraphImpl graph) {
this(graph, graph.parent);
}
- public ReadGraphImpl withParent(CacheEntry parent) {
- return new ReadGraphImpl(this, parent);
+ public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new ReadGraphImpl(this, parent, callback, needsToBlock);
}
+// public ReadGraphImpl withParent(CacheEntry parent) {
+// return withParent(parent, null);
+// }
+
+ public ReadGraphImpl syncWithParent(CacheEntry parent) {
+ return new ReadGraphImpl(this, parent, processor, null);
+ }
+
public ReadGraphImpl forRecompute(CacheEntry parent) {
return new ReadGraphImpl(null, parent, processor);
}
public static ReadGraphImpl create(QueryProcessor support) {
- return new ReadGraphImpl(null, null, support);
+ ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+ return result;
}
public ReadGraphImpl newRestart(ReadGraphImpl impl) {
public boolean performPending() {
return processor.performPending(this);
}
-
+
public Set<ReadGraphImpl> ancestorSet() {
HashSet<ReadGraphImpl> result = new HashSet<>();
ReadGraphImpl g = this;
}
return result;
}
+
+ public int getLevel() {
+ return getLevelStatic(this);
+ }
+
+ private static int getLevelStatic(ReadGraphImpl impl) {
+ if(impl == null) return 0;
+ else return 1 + getLevelStatic(impl.parentGraph);
+ }
+
+ public boolean isParent(ReadGraphImpl impl) {
+ if(impl == null) return false;
+ if(this == impl) return true;
+ return isParent(impl.parentGraph);
+ }
+
+ public ReadGraphImpl getTopLevelGraph() {
+ return getTopLevelGraphStatic(this);
+ }
+
+ private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) {
+ if(impl.parentGraph == null) return impl;
+ else return getTopLevelGraphStatic(impl.parentGraph);
+ }
}