import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
-import org.simantics.utils.threads.logger.ITask;
-import org.simantics.utils.threads.logger.ThreadLogger;
import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TObjectIntHashMap;
Statement stm = getSingleStatement(resource, relation);
- return adaptContextual(stm.getObject(), new RelationContextImpl(resource, stm), RelationContext.class, clazz);
+ Object o = adaptContextual(stm.getObject(), new RelationContextImpl(resource, stm), RelationContext.class, clazz);
+ if (clazz.isInstance(o))
+ return (T)o;
+ throw new AdaptionException("Returned value is not expected class , got " + o.getClass().getName()+ " , expected " + clazz.getName());
}
try {
- int result = processor.getSingleObject(this, subject, relation);
- if(result == 0) return null;
-
- return processor.querySupport.getResource(result);
+ int result = processor.getSingleObject(this, subject, relation);
+ if(result == 0) return null;
+
+ return processor.querySupport.getResource(result);
+
+ } catch (ManyObjectsForFunctionalRelationException e) {
- } catch (ManyObjectsForFunctionalRelationException e) {
+ throw new ManyObjectsForFunctionalRelationException("Many objects in " + subject + " for functional relation " + relation);
- throw new ManyObjectsForFunctionalRelationException("subject=" + subject + ", relation=" + relation, e);
-
} catch (DatabaseException e) {
throw new ServiceException(e);
* Implementation of the interface RequestProcessor
*/
- @Override
- public <T> T syncRequest(final Read<T> request) throws DatabaseException {
- assert (request != null);
-
- ITask task = ThreadLogger.task(request);
- T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
- task.finish();
- return result;
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request) throws DatabaseException {
+ assert (request != null);
+ return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+ }
@Override
public <T> T syncRequest(Read<T> request, SyncListener<T> procedure)
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ITask task = ThreadLogger.task(request);
- ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
- T result = QueryCache.resultReadEntry(this, request, parent, listener, procedure);
- task.finish();
- return result;
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
+ return (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(final Read<T> request,
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> T syncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ITask task = ThreadLogger.task(request);
- ListenerBase listener = getListenerBase(procedure);
- T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
- task.finish();
- return result;
-
- }
+ @Override
+ final public <T> T syncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = getListenerBase(procedure);
+ return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(AsyncRead<T> request,
assert (request != null);
assert (procedure != null);
-
+
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
+
processor.scheduleNow(new SessionTask(this) {
@Override
QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
}
}
asyncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> void asyncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) {
+ @Override
+ final public <T> void asyncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) {
- assert (request != null);
- assert (procedure != null);
+ assert (request != null);
+ assert (procedure != null);
- ITask task = ThreadLogger.task(request);
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
- processor.scheduleNow(new SessionTask(this) {
+ processor.scheduleNow(new SessionTask(this) {
- @Override
- public void run0(int thread) {
- try {
- final ListenerBase listener = getListenerBase(procedure);
- QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+ @Override
+ public void run0(int thread) {
- @Override
- public void execute(AsyncReadGraph graph, T result) {
- task.finish();
- procedure.execute(graph, result);
- }
+ if(barrier != null)
+ barrier.inc();
- @Override
- public void exception(AsyncReadGraph graph, Throwable throwable) {
- task.finish();
- procedure.exception(graph, throwable);
- }
-
- }, false);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
-
- });
+ try {
+ final ListenerBase listener = getListenerBase(procedure);
+ QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ procedure.execute(graph, result);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ }, false);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while executing async request", e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
+ }
+ }
- }
+ });
+
+ }
@Override
public <T> void asyncRequest(AsyncRead<T> request,
this.parentGraph = parentGraph;
this.parent = parent;
this.processor = support;
- this.asyncBarrier = new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent);
+ this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
+ this.parentGraph = parentGraph;
+ this.parent = parent;
+ this.processor = support;
+ this.asyncBarrier = asyncBarrier;
+ }
+
ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
this(graph, parent, graph.processor);
}
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock));
+ }
+
+ static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock);
+ }
+
ReadGraphImpl(ReadGraphImpl graph) {
this(graph, graph.parent);
}
- public ReadGraphImpl withParent(CacheEntry parent) {
- return new ReadGraphImpl(this, parent);
+ public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new ReadGraphImpl(this, parent, callback, needsToBlock);
}
public ReadGraphImpl forRecompute(CacheEntry parent) {
return new ReadGraphImpl(null, parent, processor);
}
- public static ReadGraphImpl create(QueryProcessor support) {
- return new ReadGraphImpl(null, null, support);
- }
+ public static ReadGraphImpl create(QueryProcessor support) {
+ ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+ return result;
+ }
public ReadGraphImpl newRestart(ReadGraphImpl impl) {
@Override
public String toString() {
- return "ReadGraphImpl[thread=" + Thread.currentThread() + "]";
+ return "ReadGraphImpl[thread=" + Thread.currentThread() + "] [parent=" + String.valueOf(parent) + "] [asyncBarrier=" + String.valueOf(asyncBarrier)+ "]";
}
@Override
else return 1 + getLevelStatic(impl.parentGraph);
}
+ public boolean isParent(ReadGraphImpl impl) {
+ if(impl == null) return false;
+ if(this == impl) return true;
+ return isParent(impl.parentGraph);
+ }
+
public ReadGraphImpl getTopLevelGraph() {
return getTopLevelGraphStatic(this);
}
else return getTopLevelGraphStatic(impl.parentGraph);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T l0() {
+ return (T) processor.getL0();
+ }
+
}