abstract public class TransientResourceRead<R> extends BinaryRead<Object,Resource,R> implements ReadExt {
- protected static final Object WITH_PARENT = new Object();
-
- public TransientResourceRead(ReadGraph graph, Resource parameter) throws DatabaseException {
- this(graph, graph.getService(QueryControl.class), parameter);
- }
-
- public TransientResourceRead(ReadGraph graph, QueryControl qc, Resource parameter) throws DatabaseException {
- super(resolveFirstParameter(graph, qc), parameter);
- }
-
- final private static Object resolveFirstParameter(ReadGraph graph, QueryControl qc) throws DatabaseException {
- if(qc.hasParentRequest(graph)) return WITH_PARENT;
- else return graph.getModificationCounter();
- }
-
- @Override
- final public R perform(ReadGraph _graph) throws DatabaseException {
- if(parameter == WITH_PARENT) {
- return perform(_graph, parameter2);
- } else {
- QueryControl qc = _graph.getService(QueryControl.class);
- ReadGraph graph = qc.getIndependentGraph(_graph);
- return perform(graph, parameter2);
- }
- }
-
- abstract public R perform(ReadGraph graph, Resource parameter) throws DatabaseException;
-
- @Override
- public boolean isImmutable(ReadGraph graph) throws DatabaseException {
- return graph.isImmutable(parameter2);
- }
-
+ protected static final Object WITH_PARENT = new Object();
+
+ public TransientResourceRead(ReadGraph graph, Resource parameter) throws DatabaseException {
+ this(graph, graph.getService(QueryControl.class), parameter);
+ }
+
+ public TransientResourceRead(ReadGraph graph, QueryControl qc, Resource parameter) throws DatabaseException {
+ super(resolveFirstParameter(graph, qc), parameter);
+ }
+
+ final private static Object resolveFirstParameter(ReadGraph graph, QueryControl qc) throws DatabaseException {
+ if(qc.hasParentRequest(graph)) return WITH_PARENT;
+ else return graph.getModificationCounter();
+ }
+
+ @Override
+ final public R perform(ReadGraph _graph) throws DatabaseException {
+ if(parameter == WITH_PARENT) {
+ return perform(_graph, parameter2);
+ } else {
+ QueryControl qc = _graph.getService(QueryControl.class);
+ return qc.syncRequestIndependent(_graph, new UniqueRead<R>() {
+ @Override
+ public R perform(ReadGraph graph) throws DatabaseException {
+ return TransientResourceRead.this.perform(graph, parameter2);
+ }
+ });
+ }
+ }
+
+ abstract public R perform(ReadGraph graph, Resource parameter) throws DatabaseException;
+
+ @Override
+ public boolean isImmutable(ReadGraph graph) throws DatabaseException {
+ return graph.isImmutable(parameter2);
+ }
+
}
import org.simantics.db.impl.query.PendingTaskSupport;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.request.AsyncRead;
-public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result> {
+
+public class BlockingAsyncProcedure<Result> implements AsyncProcedure<Result>, Runnable {
private static final Object NO_RESULT = new Object();
private final Object key;
private final ReadGraphImpl queryGraph;
private final ReadGraphImpl callerGraph;
+ private final AsyncReadEntry<Result> entry;
private final AsyncProcedure<Result> procedure;
private PendingTaskSupport pendingTaskSupport;
private final boolean needsToBlock;
private Object result = NO_RESULT;
private Throwable exception = null;
-
- private ReadGraphImpl queryGraph() {
- return queryGraph;
- }
public BlockingAsyncProcedure(ReadGraphImpl callerGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure, Object key, boolean needsToBlock) {
// A new graph for evaluating the query with correct parent and asyncBarrier
- queryGraph = callerGraph.withParent(entry, () -> {
-
- dispatchProcedure(queryGraph(), callerGraph, entry, procedure, needsToBlock);
-
- }, needsToBlock);
-
+ queryGraph = callerGraph.withParent(entry, this, needsToBlock);
queryGraph.asyncBarrier.inc();
+
+ // This makes sure that caller does not quit before dispatch
+ callerGraph.asyncBarrier.inc();
+ this.entry = entry;
this.procedure = procedure;
this.key = key;
this.queryGraph.asyncBarrier.inc();
return "." + procedure;
}
- private void dispatchProcedure(ReadGraphImpl queryGraph, ReadGraphImpl parentGraph, AsyncReadEntry<Result> entry, AsyncProcedure<Result> procedure_, boolean needsToBlock) {
+ @Override
+ public void run() {
- AsyncProcedure<Result> procedure = entry != null ? entry : procedure_;
+ AsyncProcedure<Result> procedure__ = entry != null ? entry : procedure;
- ReadGraphImpl executeGraph = parentGraph.withParent(parentGraph.parent, null, needsToBlock);
+ ReadGraphImpl executeGraph = callerGraph.withParent(callerGraph.parent, null, needsToBlock);
executeGraph.asyncBarrier.inc();
+
+ // This counters the inc in the constructor
+ callerGraph.asyncBarrier.dec();
+
try {
- if(procedure != null) {
- procedure.execute(executeGraph, get());
+ if(procedure__ != null) {
+ procedure__.execute(executeGraph, get());
}
} catch (DatabaseException e) {
- if(procedure != null) procedure.exception(executeGraph, e);
+ if(procedure__ != null) procedure__.exception(executeGraph, e);
exception = e;
} catch (Throwable t) {
DatabaseException dbe = new DatabaseException(t);
- if(procedure != null) procedure.exception(executeGraph, dbe);
+ if(procedure__ != null) procedure__.exception(executeGraph, dbe);
exception = dbe;
} finally {
if (entry != null) {
assert(entry.isReady());
// This does not throw
- entry.performFromCache(executeGraph, procedure_);
+ entry.performFromCache(executeGraph, procedure);
}
executeGraph.asyncBarrier.dec();
if(needsToBlock)
- executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
+ executeGraph.asyncBarrier.waitBarrier(procedure__, executeGraph);
}
if (BarrierTracing.BOOKKEEPING) {
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.impl.query.CacheEntry;
import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
+import org.slf4j.LoggerFactory;
public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncBarrierImpl.class);
+
private static final long serialVersionUID = 4724463372850048672L;
static final int WAIT_TIME = 60000;
}
}
if (count < 0) {
- Logger.defaultLogError(
+ LOGGER.error(
"Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
new Exception());
}
import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
-import org.simantics.utils.threads.logger.ITask;
-import org.simantics.utils.threads.logger.ThreadLogger;
import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TObjectIntHashMap;
* Implementation of the interface RequestProcessor
*/
- @Override
- public <T> T syncRequest(final Read<T> request) throws DatabaseException {
- assert (request != null);
-
- T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
- return result;
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request) throws DatabaseException {
+ assert (request != null);
+ return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+ }
@Override
public <T> T syncRequest(Read<T> request, SyncListener<T> procedure)
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
- T result = (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
- return result;
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
+ return (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(final Read<T> request,
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> T syncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = getListenerBase(procedure);
- T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
- return result;
-
- }
+ @Override
+ final public <T> T syncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = getListenerBase(procedure);
+ return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(AsyncRead<T> request,
asyncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> void asyncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) {
+ @Override
+ final public <T> void asyncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) {
- assert (request != null);
- assert (procedure != null);
+ assert (request != null);
+ assert (procedure != null);
AsyncBarrierImpl barrier = asyncBarrier;
if(barrier != null)
barrier.inc();
- processor.scheduleNow(new SessionTask(this) {
+ processor.scheduleNow(new SessionTask(this) {
- @Override
- public void run0(int thread) {
+ @Override
+ public void run0(int thread) {
- if(barrier != null)
- barrier.inc();
+ if(barrier != null)
+ barrier.inc();
- try {
- final ListenerBase listener = getListenerBase(procedure);
- QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+ try {
+ final ListenerBase listener = getListenerBase(procedure);
+ QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
- @Override
- public void execute(AsyncReadGraph graph, T result) {
- procedure.execute(graph, result);
- if(barrier != null)
- barrier.dec();
- }
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ procedure.execute(graph, result);
+ if(barrier != null)
+ barrier.dec();
+ }
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- procedure.exception(graph, throwable);
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
if(barrier != null)
barrier.dec();
- }
-
- }, false);
- if(barrier != null)
- barrier.dec();
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
-
- });
+ }
+
+ }, false);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while executing async request", e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
+ }
+ }
- }
+ });
+
+ }
@Override
public <T> void asyncRequest(AsyncRead<T> request,
}
- public void fireListeners(WriteGraphImpl graph) {
+ public void fireListeners(ReadGraphImpl graph) {
assert (!processor.updating);
assert (!processor.cache.collecting);
entry.prepareRecompute(querySupport);
ReadGraphImpl parentGraph = graph.forRecompute(entry);
-
+ parentGraph.asyncBarrier.inc();
query.recompute(parentGraph);
+ parentGraph.asyncBarrier.dec();
if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
-import org.simantics.db.common.utils.Logger;
+import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.layer0.adapter.GenericRelationIndex;
import org.simantics.db.service.QueryControl;
import org.simantics.operation.Layer0X;
import org.simantics.scl.runtime.function.FunctionImpl4;
import org.simantics.scl.runtime.function.UnsaturatedFunction2;
+import org.slf4j.LoggerFactory;
/**
* dependencies:
*/
public class Dependencies extends FunctionImpl4<ReadGraph, Resource, String, Integer, Object> {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Dependencies.class);
+
public static final String FIELD_MODEL = "Model";
public static final String FIELD_PARENT = "Parent";
public static final String FIELD_RESOURCE = "Resource";
final int maxResults = _maxResults != null ? _maxResults : Integer.MAX_VALUE;
QueryControl qc = graph.getService(QueryControl.class);
- return index.query(qc.getIndependentGraph(graph), query, getBindingPattern(), new Object[] { model }, maxResults);
+ return qc.syncRequestIndependent(graph, new UniqueRead<Object>() {
+ @Override
+ public Object perform(ReadGraph graph) throws DatabaseException {
+ return index.query(graph, query, getBindingPattern(), new Object[] { model }, maxResults);
+ }
+ });
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("Error while performing index query", e);
return null;
}
}
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
-import org.simantics.db.common.utils.Logger;
+import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.layer0.adapter.GenericRelationIndex;
import org.simantics.db.service.QueryControl;
import org.simantics.operation.Layer0X;
import org.simantics.scl.runtime.function.FunctionImpl4;
import org.simantics.scl.runtime.function.UnsaturatedFunction2;
+import org.slf4j.LoggerFactory;
/**
* dependencyResources:
*/
public class DependencyResources extends FunctionImpl4<ReadGraph, Resource, String, Integer, Object> {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(DependencyResources.class);
+
protected Resource getIndexRelation(ReadGraph graph) {
return Layer0X.getInstance(graph).DependenciesRelation;
}
final int maxResults = _maxResults != null ? _maxResults : Integer.MAX_VALUE;
QueryControl qc = graph.getService(QueryControl.class);
- return index.queryResources(qc.getIndependentGraph(graph), query, getBindingPattern(), new Object[] { model }, maxResults);
+ return qc.syncRequestIndependent(graph, new UniqueRead<Object>() {
+ @Override
+ public Object perform(ReadGraph graph) throws DatabaseException {
+ return index.queryResources(graph, query, getBindingPattern(), new Object[] { model }, maxResults);
+ }
+ });
} catch (DatabaseException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("Error while performing index query", e);
return null;
}
}
import org.simantics.db.procedure.AsyncContextMultiProcedure;
import org.simantics.db.procedure.AsyncMultiProcedure;
import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.Read;
import org.simantics.db.service.QueryControl;
import org.simantics.utils.DataContainer;
import org.slf4j.Logger;
ReadGraphImpl impl = (ReadGraphImpl)graph;
return impl.withParent(null, null, false);
}
+
+ @Override
+ public <T> T syncRequestIndependent(ReadGraph graph, Read<T> request) throws DatabaseException {
+ ReadGraphImpl independent = ((ReadGraphImpl)graph).withParent(null, null, false);
+ independent.asyncBarrier.inc();
+ try {
+ return independent.syncRequest(request);
+ } finally {
+ independent.asyncBarrier.dec();
+ }
+ }
@Override
public boolean hasParentRequest(ReadGraph graph) {
});
assert (null != writer);
-// writer.state.barrier.inc();
+ writer.asyncBarrier.inc();
try {
request.perform(writer);
Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
writeState.except(t);
} finally {
-// writer.state.barrier.dec();
+ writer.asyncBarrier.dec();
// writer.waitAsync(request);
}
VirtualGraph vg = getProvider(request.getProvider());
WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
- try {
- WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
- writeState = writeStateT;
+ WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+ writeState = writeStateT;
+ assert (null != writer);
- assert (null != writer);
-// writer.state.barrier.inc();
+ try {
+ writer.asyncBarrier.inc();
writeStateT.setResult(request.perform(writer));
assert (null != writer);
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
} catch (Throwable e) {
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
writeState.except(e);
-
-// state.stopWriteTransaction(clusterStream);
-//
-// } catch (Throwable e) {
-// // Log it first, just to be safe that the error is always logged.
-// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
-//
-// try {
-// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
-// // All we can do here is to log those, can't really pass them anywhere.
-// if (procedure != null) {
-// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
-// else procedure.exception(new DatabaseException(e));
-// }
-// } catch (Throwable e2) {
-// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
-// }
-//
-// clientChanges = new ClientChangesImpl(SessionImplSocket.this);
-//
-// state.stopWriteTransaction(clusterStream);
-
} finally {
+ writer.asyncBarrier.dec();
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
}
-// if(notify != null) notify.release();
-
task.finish();
}
ITask task = ThreadLogger.task(request);
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ newGraph.asyncBarrier.inc();
try {
} finally {
+ newGraph.asyncBarrier.dec();
+
fireSessionVariableChange(SessionVariables.QUEUED_READS);
}
return;
WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+ reactionGraph.asyncBarrier.inc();
try {
} finally {
+ reactionGraph.asyncBarrier.dec();
+
}
} catch (Throwable t) {
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.InternalException;
import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.impl.query.QueryProcessor;
import org.simantics.db.request.WriteOnly;
// start = System.nanoTime();
queryProvider.propagateChangesInQueryCache(graph);
- queryProvider.listening.fireListeners(graph);
+ ReadGraphImpl listenerGraph = graph.forRecompute(null);
+ listenerGraph.asyncBarrier.inc();
+ queryProvider.listening.fireListeners(listenerGraph);
+ listenerGraph.asyncBarrier.dec();
// duration = System.nanoTime() - start;
// System.out.println("performScheduledUpdates " + 1e-9*duration + "s. ");
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.WriteGraph;
+import org.simantics.db.exception.DatabaseException;
import org.simantics.db.procedure.AsyncContextMultiProcedure;
import org.simantics.db.procedure.AsyncMultiProcedure;
import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.Read;
/**
* performed with the specified ReadGraph. DB listeners are therefore not
* triggered by anything that is performed with the returned ReadGraph.
*
+ * @Deprecated In favor of syncRequestIndependent
* @param graph read transaction handle to clone for listener-independent
* use
* @return read transaction handle that is independent of the requests
* performed with the parameter
*/
ReadGraph getIndependentGraph(ReadGraph graph);
+
+ /**
+ * Performs the given request without accumulating query dependencies.
+ * DB listeners are therefore not triggered by anything that is performed within the request.
+ *
+ * @param graph read transaction handle to clone for listener-independent
+ * use
+ * @param request the request to perform
+ * @return the result of the request evaluation
+ */
+ <T> T syncRequestIndependent(ReadGraph graph, Read<T> request) throws DatabaseException;
+
boolean hasParentRequest(ReadGraph graph);
}