import java.io.PrintStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
-import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import org.simantics.databoard.util.binary.BinaryFile;
import org.simantics.databoard.util.binary.RandomAccessBinary;
import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.ComputationalValue;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.ExternalValueSupport;
import org.simantics.db.ReadGraph;
import org.simantics.db.common.primitiverequest.ValueImplied;
import org.simantics.db.common.primitiverequest.VariantValueImplied;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.AsyncProcedureAdapter;
import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
import org.simantics.db.exception.ResourceNotFoundException;
import org.simantics.db.exception.ServiceException;
import org.simantics.db.exception.ValidationException;
-import org.simantics.db.impl.BlockingAsyncProcedure;
import org.simantics.db.impl.RelationContextImpl;
import org.simantics.db.impl.ResourceImpl;
import org.simantics.db.impl.internal.RandomAccessValueSupport;
final static boolean EMPTY_RESOURCE_CHECK = false;
final public CacheEntry parent;
+ public final ReadGraphImpl parentGraph;
final public QueryProcessor processor;
- public AsyncBarrierImpl asyncBarrier = null;
+ public final AsyncBarrierImpl asyncBarrier;
final static Binding DATA_TYPE_BINDING_INTERNAL = Bindings.getBindingUnchecked(Datatype.class);
final static Serializer DATA_TYPE_SERIALIZER = Bindings.getSerializerUnchecked(DATA_TYPE_BINDING_INTERNAL);
assert (subject != null);
+ byte[] bytes = null;
try {
- byte[] bytes = processor.getValue(this, subject);
+ bytes = processor.getValue(this, subject);
if (bytes == null) throw new DoesNotContainValueException("No value for resource " + subject);
Serializer serializer = getSerializer(binding);
throw new DoesNotContainValueException(e);
- } catch (IOException e) {
-
- throw new ServiceException(e);
-
- } catch (DatabaseException e) {
-
- throw new ServiceException(e);
-
- } catch (BufferUnderflowException e) {
- // This is sometimes thrown when deserialize fails because wrong format.
- // For callers of this method this is just an service exception.
- throw new ServiceException(e);
- }
-
+ } catch (Throwable t) {
+ throw new ServiceException("Could not getValue for subject " + debugString(subject) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(bytes), t);
+ }
}
@Override
try {
- int result = processor.getSingleObject(this, subject, relation);
- if(result == 0) return null;
-
- return processor.querySupport.getResource(result);
+ int result = processor.getSingleObject(this, subject, relation);
+ if(result == 0) return null;
+
+ return processor.querySupport.getResource(result);
- } catch (ManyObjectsForFunctionalRelationException e) {
+ } catch (ManyObjectsForFunctionalRelationException e) {
+
+ throw new ManyObjectsForFunctionalRelationException("Many objects in " + subject + " for functional relation " + relation);
- throw new ManyObjectsForFunctionalRelationException("subject=" + subject + ", relation=" + relation, e);
-
} catch (DatabaseException e) {
throw new ServiceException(e);
* Implementation of the interface RequestProcessor
*/
- @Override
- public <T> T syncRequest(final Read<T> request) throws DatabaseException {
-
- assert (request != null);
-
- return QueryCache.resultReadEntry(this, request, parent, null, null);
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request) throws DatabaseException {
+ assert (request != null);
+ return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+ }
@Override
public <T> T syncRequest(Read<T> request, SyncListener<T> procedure)
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
-
- return QueryCache.resultReadEntry(this, request, parent, listener, procedure);
-
- }
+ @Override
+ public <T> T syncRequest(final Read<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
+ return (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(final Read<T> request,
throws DatabaseException {
assert (request != null);
- asyncBarrier = new AsyncBarrierImpl(null);
- BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, null, request);
- syncRequest(request, ap);
- return ap.get();
+ return syncRequest(request, new AsyncProcedureAdapter<>() );
}
return syncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> T syncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = getListenerBase(procedure);
-
- BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
-
- QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
-
- return ap.get();
-
- }
-
- final private <T> void syncRequest(final AsyncRead<T> request, final AsyncReadProcedure<T> procedure) throws DatabaseException {
-
- assert (request != null);
-
- ListenerBase listener = getListenerBase(procedure);
- assert(listener == null);
-
- BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
-
- QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
-
- ap.get();
-
- }
+ @Override
+ final public <T> T syncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) throws DatabaseException {
+ assert (request != null);
+ ListenerBase listener = getListenerBase(procedure);
+ return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+ }
@Override
public <T> T syncRequest(AsyncRead<T> request,
// else
procedure.execute(graph, (T) obj);
- } catch (IOException e) {
- procedure.exception(graph, e);
- } catch (BufferUnderflowException e) {
- procedure.exception(graph, e);
} catch (Throwable t) {
- procedure.exception(graph, t);
+ procedure.exception(graph, new ServiceException("Could not forValue for subject " + debugString(resource) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(result), t));
}
-
}
@Override
});
}
+
+ private static String safeArrayToString(byte[] a) {
+ if (a == null)
+ return "null";
+ int iMax = a.length - 1;
+ if (iMax == -1)
+ return "[]";
+
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; i < 100; i++) { // limit to first 100 items
+ b.append(a[i]);
+ if (i == iMax)
+ return b.append(']').toString();
+ b.append(", ");
+ }
+ return b.append(", ... (" + a.length + ")]").toString();
+ }
@Override
public <T> void forValue(Resource subject, Binding binding,
else
procedure.execute(graph, (T) obj);
- } catch (IOException e) {
- procedure.exception(graph, e);
- } catch (BufferUnderflowException e) {
- procedure.exception(graph, e);
} catch (Throwable t) {
- procedure.exception(graph, t);
+ procedure.exception(graph, new ServiceException("Could not forValue for subject " + debugString(resource) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(result), t));
}
-
}
@Override
assert (request != null);
assert (procedure != null);
-
- processor.schedule(new SessionTask(false) {
+
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
+
+ processor.scheduleNow(new SessionTask(this) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
try {
final ListenerBase listener = getListenerBase(procedure);
QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
}
}
}
public static ReadGraphImpl createAsync(QueryProcessor support) {
- return new ReadGraphImpl(null, support);
+ return new ReadGraphImpl(null, null, support);
}
@Override
asyncRequest(request, new NoneToAsyncListener<T>(procedure));
}
- @Override
- final public <T> void asyncRequest(final AsyncRead<T> request,
- final AsyncProcedure<T> procedure) {
+ @Override
+ final public <T> void asyncRequest(final AsyncRead<T> request,
+ final AsyncProcedure<T> procedure) {
- assert (request != null);
- assert (procedure != null);
+ assert (request != null);
+ assert (procedure != null);
- processor.schedule(new SessionTask(false) {
+ AsyncBarrierImpl barrier = asyncBarrier;
+ if(barrier != null)
+ barrier.inc();
- @Override
- public void run(int thread) {
- try {
- final ListenerBase listener = getListenerBase(procedure);
- QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- }
-
- });
+ processor.scheduleNow(new SessionTask(this) {
- }
+ @Override
+ public void run0(int thread) {
+
+ if(barrier != null)
+ barrier.inc();
+
+ try {
+ final ListenerBase listener = getListenerBase(procedure);
+ QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ procedure.execute(graph, result);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
+ if(barrier != null)
+ barrier.dec();
+ }
+
+ }, false);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while executing async request", e);
+ } finally {
+ if(barrier != null)
+ barrier.dec();
+ }
+ }
+
+ });
+
+ }
@Override
public <T> void asyncRequest(AsyncRead<T> request,
* Internal routines
*/
- protected static String INTERNAL_ERROR_STRING = "Transaction aborted due to internal client error. Contact application support.";
+ protected static String INTERNAL_ERROR_STRING = "Transaction aborted due to internal client error.";
/*
* callerThread is the currently running thread state.syncThread is blocking for
* this execution state.syncParent is the blocking request
*/
- final private boolean isExternal(int thread) {
- return thread == Integer.MIN_VALUE;
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support) {
+ this.parentGraph = parentGraph;
+ this.parent = parent;
+ this.processor = support;
+ this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false);
}
- ReadGraphImpl(ReadGraphImpl graph) {
- this(graph.parent, graph.processor);
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) {
+ this.parentGraph = parentGraph;
+ this.parent = parent;
+ this.processor = support;
+ this.asyncBarrier = asyncBarrier;
+ }
+
+ ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) {
+ this(graph, parent, graph.processor);
}
- ReadGraphImpl(CacheEntry parent, QueryProcessor support) {
- this.parent = parent;
- this.processor = support;
+ ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock));
+ }
+
+ static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock);
+ }
+
+ ReadGraphImpl(ReadGraphImpl graph) {
+ this(graph, graph.parent);
}
- public static ReadGraphImpl create(QueryProcessor support) {
- return new ReadGraphImpl(null, support);
+ public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) {
+ return new ReadGraphImpl(this, parent, callback, needsToBlock);
}
- public static ReadGraphImpl newAsync(ReadGraphImpl parent) {
- ReadGraphImpl result = new ReadGraphImpl(parent);
- result.asyncBarrier = new AsyncBarrierImpl(parent.asyncBarrier);
- return result;
+ public ReadGraphImpl forRecompute(CacheEntry parent) {
+ return new ReadGraphImpl(null, parent, processor);
}
+ public static ReadGraphImpl create(QueryProcessor support) {
+ ReadGraphImpl result = new ReadGraphImpl(null, null, support);
+ return result;
+ }
+
public ReadGraphImpl newRestart(ReadGraphImpl impl) {
WriteGraphImpl write = processor.getSession().getService(
}
- public ReadGraphImpl withParent(CacheEntry parent) {
- if(parent == this.parent) return this;
- else return new ReadGraphImpl(parent, processor);
- }
-
final private ListenerBase getListenerBase(final Object procedure) {
if (procedure instanceof ListenerBase)
return (ListenerBase) procedure;
}
Serializer serializer = binding.serializer();
byte[] bytes = serializer.serialize(initialValue);
+ // In case the file has been previously accessed and was larger we set the correct size now
+ rd.binaryFile.setLength(bytes.length);
rd.binaryFile.write(bytes);
ravs.put(resource, rd);
return rd;
return getValue(r, binding);
}
} else if(types.contains(L0.ExternalValue)) {
- try {
- return (T)ReflectionUtils.getValue(getURI(r)).getValue();
- } catch(ValueNotFoundException e) {
- throw new DatabaseException(e);
- } catch(ClassCastException e) {
- throw new DatabaseException(e);
+ ComputationalValue cv = syncRequest(new PossibleAdapter<ComputationalValue>(r, ComputationalValue.class), TransientCacheAsyncListener.instance());
+ if(cv != null) {
+ return cv.getValue(this, r);
+ } else {
+ // This should not even be possible since L0 defines an adapter for all values
+ try {
+ return (T)ReflectionUtils.getValue(getURI(r)).getValue();
+ } catch(ValueNotFoundException e) {
+ throw new DatabaseException(e);
+ } catch(ClassCastException e) {
+ throw new DatabaseException(e);
+ }
}
}
else {
@Override
public <T> T getRelatedValue2(Resource subject, Resource relation, Object context) throws DatabaseException {
if(Development.DEVELOPMENT) {
- String error = L0Validations.checkValueType(this, subject, relation);
- if(error != null) {
- Logger.defaultLogError(new ValidationException(error));
- //throw new ValidationException(error);
- new ValidationException(error).printStackTrace();
+ if(Development.<Boolean>getProperty(DevelopmentKeys.L0_VALIDATION, Bindings.BOOLEAN)) {
+ String error = L0Validations.checkValueType(this, subject, relation);
+ if(error != null) {
+ Logger.defaultLogError(new ValidationException(error));
+ throw new ValidationException(error);
+ }
}
}
return getValue2(getSingleObject(subject, relation), context);
@Override
public Variant getRelatedVariantValue2(Resource subject, Resource relation, Object context) throws DatabaseException {
if(Development.DEVELOPMENT) {
- String error = L0Validations.checkValueType(this, subject, relation);
- if(error != null) {
- Logger.defaultLogError(new ValidationException(error));
- //throw new ValidationException(error);
- new ValidationException(error).printStackTrace();
- }
+ if(Development.<Boolean>getProperty(DevelopmentKeys.L0_VALIDATION, Bindings.BOOLEAN)) {
+ String error = L0Validations.checkValueType(this, subject, relation);
+ if(error != null) {
+ Logger.defaultLogError(new ValidationException(error));
+ throw new ValidationException(error);
+ }
+ }
}
return getVariantValue2(getSingleObject(subject, relation), context);
}
@Override
public boolean performPending() {
- return processor.performPending(processor.thread.get());
+ return processor.performPending(this);
+ }
+
+ public Set<ReadGraphImpl> ancestorSet() {
+ HashSet<ReadGraphImpl> result = new HashSet<>();
+ ReadGraphImpl g = this;
+ while(g != null) {
+ result.add(g);
+ g = g.parentGraph;
+ }
+ return result;
+ }
+
+ public int getLevel() {
+ return getLevelStatic(this);
+ }
+
+ private static int getLevelStatic(ReadGraphImpl impl) {
+ if(impl == null) return 0;
+ else return 1 + getLevelStatic(impl.parentGraph);
+ }
+
+ public boolean isParent(ReadGraphImpl impl) {
+ if(impl == null) return false;
+ if(this == impl) return true;
+ return isParent(impl.parentGraph);
+ }
+
+ public ReadGraphImpl getTopLevelGraph() {
+ return getTopLevelGraphStatic(this);
+ }
+
+ private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) {
+ if(impl.parentGraph == null) return impl;
+ else return getTopLevelGraphStatic(impl.parentGraph);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T l0() {
+ return (T) processor.getL0();
}
}