X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fgraph%2FReadGraphImpl.java;h=48662d1903a08267a13dded57e04e04f8b85db6d;hp=b5da0b92f7bae6521487e2fa0907ca2b96a3c09b;hb=472f4b2af82d02fb46f9a929fb6c8090bb4c5301;hpb=0d9b90834ce56b292c00b1a39850ed842c3e4d42 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java index b5da0b92f..48662d190 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java @@ -18,10 +18,10 @@ import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.Array; import java.lang.reflect.InvocationTargetException; -import java.nio.BufferUnderflowException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; @@ -45,6 +45,7 @@ import org.simantics.databoard.type.Datatype; import org.simantics.databoard.util.binary.BinaryFile; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.AsyncReadGraph; +import org.simantics.db.ComputationalValue; import org.simantics.db.DevelopmentKeys; import org.simantics.db.ExternalValueSupport; import org.simantics.db.ReadGraph; @@ -89,6 +90,7 @@ import org.simantics.db.common.primitiverequest.Value; import org.simantics.db.common.primitiverequest.ValueImplied; import org.simantics.db.common.primitiverequest.VariantValueImplied; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; +import org.simantics.db.common.procedure.adapter.AsyncProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener; @@ -133,7 +135,6 @@ import org.simantics.db.exception.NoSingleResultException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ValidationException; -import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.RelationContextImpl; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.internal.RandomAccessValueSupport; @@ -187,6 +188,8 @@ import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; +import org.simantics.utils.threads.logger.ITask; +import org.simantics.utils.threads.logger.ThreadLogger; import org.slf4j.LoggerFactory; import gnu.trove.map.hash.TObjectIntHashMap; @@ -198,9 +201,10 @@ public class ReadGraphImpl implements AsyncReadGraph { final static boolean EMPTY_RESOURCE_CHECK = false; final public CacheEntry parent; + public final ReadGraphImpl parentGraph; final public QueryProcessor processor; - public AsyncBarrierImpl asyncBarrier = null; + public final AsyncBarrierImpl asyncBarrier; final static Binding DATA_TYPE_BINDING_INTERNAL = Bindings.getBindingUnchecked(Datatype.class); final static Serializer DATA_TYPE_SERIALIZER = Bindings.getSerializerUnchecked(DATA_TYPE_BINDING_INTERNAL); @@ -1104,9 +1108,10 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (subject != null); + byte[] bytes = null; try { - byte[] bytes = processor.getValue(this, subject); + bytes = processor.getValue(this, subject); if (bytes == null) throw new DoesNotContainValueException("No value for resource " + subject); Serializer serializer = getSerializer(binding); @@ -1116,20 +1121,9 @@ public class ReadGraphImpl implements AsyncReadGraph { throw new DoesNotContainValueException(e); - } catch (IOException e) { - - throw new ServiceException(e); - - } catch (DatabaseException e) { - - throw new ServiceException(e); - - } catch (BufferUnderflowException e) { - // This is sometimes thrown when deserialize fails because wrong format. - // For callers of this method this is just an service exception. - throw new ServiceException(e); - } - + } catch (Throwable t) { + throw new ServiceException("Could not getValue for subject " + debugString(subject) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(bytes), t); + } } @Override @@ -1930,11 +1924,11 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public T syncRequest(final Read request) throws DatabaseException { - assert (request != null); - return QueryCache.resultReadEntry(this, request, parent, null, null); - + T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true); + return result; + } @Override @@ -1955,8 +1949,8 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); ListenerBase listener = procedure != null ? getListenerBase(procedure) : null; - - return QueryCache.resultReadEntry(this, request, parent, listener, procedure); + T result = (T)QueryCache.runnerReadEntry(this, request, parent, listener, procedure, true); + return result; } @@ -2012,10 +2006,7 @@ public class ReadGraphImpl implements AsyncReadGraph { throws DatabaseException { assert (request != null); - asyncBarrier = new AsyncBarrierImpl(null); - BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, null, request); - syncRequest(request, ap); - return ap.get(); + return syncRequest(request, new AsyncProcedureAdapter<>() ); } @@ -2044,27 +2035,8 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); ListenerBase listener = getListenerBase(procedure); - - BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, procedure, request); - - QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true); - - return ap.get(); - - } - - final private void syncRequest(final AsyncRead request, final AsyncReadProcedure procedure) throws DatabaseException { - - assert (request != null); - - ListenerBase listener = getListenerBase(procedure); - assert(listener == null); - - BlockingAsyncProcedure ap = new BlockingAsyncProcedure<>(this, procedure, request); - - QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true); - - ap.get(); + T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true); + return result; } @@ -3564,14 +3536,9 @@ public class ReadGraphImpl implements AsyncReadGraph { // else procedure.execute(graph, (T) obj); - } catch (IOException e) { - procedure.exception(graph, e); - } catch (BufferUnderflowException e) { - procedure.exception(graph, e); } catch (Throwable t) { - procedure.exception(graph, t); + procedure.exception(graph, new ServiceException("Could not forValue for subject " + debugString(resource) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(result), t)); } - } @Override @@ -3591,6 +3558,24 @@ public class ReadGraphImpl implements AsyncReadGraph { }); } + + private static String safeArrayToString(byte[] a) { + if (a == null) + return "null"; + int iMax = a.length - 1; + if (iMax == -1) + return "[]"; + + StringBuilder b = new StringBuilder(); + b.append('['); + for (int i = 0; i < 100; i++) { // limit to first 100 items + b.append(a[i]); + if (i == iMax) + return b.append(']').toString(); + b.append(", "); + } + return b.append(", ... (" + a.length + ")]").toString(); + } @Override public void forValue(Resource subject, Binding binding, @@ -4285,14 +4270,9 @@ public class ReadGraphImpl implements AsyncReadGraph { else procedure.execute(graph, (T) obj); - } catch (IOException e) { - procedure.exception(graph, e); - } catch (BufferUnderflowException e) { - procedure.exception(graph, e); } catch (Throwable t) { - procedure.exception(graph, t); + procedure.exception(graph, new ServiceException("Could not forValue for subject " + debugString(resource) + " and binding " + String.valueOf(binding) + " with bytes " + safeArrayToString(result), t)); } - } @Override @@ -5174,16 +5154,23 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - - processor.schedule(new SessionTask(false) { + + AsyncBarrierImpl barrier = asyncBarrier; + if(barrier != null) + barrier.inc(); + + processor.scheduleNow(new SessionTask(this) { @Override - public void run(int thread) { + public void run0(int thread) { try { final ListenerBase listener = getListenerBase(procedure); QueryCache.runnerReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false); } catch (DatabaseException e) { Logger.defaultLogError(e); + } finally { + if(barrier != null) + barrier.dec(); } } @@ -5192,7 +5179,7 @@ public class ReadGraphImpl implements AsyncReadGraph { } public static ReadGraphImpl createAsync(QueryProcessor support) { - return new ReadGraphImpl(null, support); + return new ReadGraphImpl(null, null, support); } @Override @@ -5255,13 +5242,39 @@ public class ReadGraphImpl implements AsyncReadGraph { assert (request != null); assert (procedure != null); - processor.schedule(new SessionTask(false) { + AsyncBarrierImpl barrier = asyncBarrier; + if(barrier != null) + barrier.inc(); + + processor.scheduleNow(new SessionTask(this) { @Override - public void run(int thread) { - try { + public void run0(int thread) { + + if(barrier != null) + barrier.inc(); + + try { final ListenerBase listener = getListenerBase(procedure); - QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false); + QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, T result) { + procedure.execute(graph, result); + if(barrier != null) + barrier.dec(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + procedure.exception(graph, throwable); + if(barrier != null) + barrier.dec(); + } + + }, false); + if(barrier != null) + barrier.dec(); } catch (DatabaseException e) { Logger.defaultLogError(e); } @@ -5638,36 +5651,56 @@ public class ReadGraphImpl implements AsyncReadGraph { * Internal routines */ - protected static String INTERNAL_ERROR_STRING = "Transaction aborted due to internal client error. Contact application support."; + protected static String INTERNAL_ERROR_STRING = "Transaction aborted due to internal client error."; /* * callerThread is the currently running thread state.syncThread is blocking for * this execution state.syncParent is the blocking request */ - final private boolean isExternal(int thread) { - return thread == Integer.MIN_VALUE; + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support) { + this.parentGraph = parentGraph; + this.parent = parent; + this.processor = support; + this.asyncBarrier = prepareBarrier(parentGraph, parent, null, false); } - ReadGraphImpl(ReadGraphImpl graph) { - this(graph.parent, graph.processor); + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, QueryProcessor support, AsyncBarrierImpl asyncBarrier) { + this.parentGraph = parentGraph; + this.parent = parent; + this.processor = support; + this.asyncBarrier = asyncBarrier; + } + + ReadGraphImpl(ReadGraphImpl graph, CacheEntry parent) { + this(graph, parent, graph.processor); } - ReadGraphImpl(CacheEntry parent, QueryProcessor support) { - this.parent = parent; - this.processor = support; + ReadGraphImpl(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) { + this(parentGraph, parent, parentGraph.processor, prepareBarrier(parentGraph, parent, callback, needsToBlock)); + } + + static AsyncBarrierImpl prepareBarrier(ReadGraphImpl parentGraph, CacheEntry parent, Runnable callback, boolean needsToBlock) { + return new AsyncBarrierImpl(parentGraph != null ? parentGraph.asyncBarrier : null, parent, callback, needsToBlock); + } + + ReadGraphImpl(ReadGraphImpl graph) { + this(graph, graph.parent); } - public static ReadGraphImpl create(QueryProcessor support) { - return new ReadGraphImpl(null, support); + public ReadGraphImpl withParent(CacheEntry parent, Runnable callback, boolean needsToBlock) { + return new ReadGraphImpl(this, parent, callback, needsToBlock); } - public static ReadGraphImpl newAsync(ReadGraphImpl parent) { - ReadGraphImpl result = new ReadGraphImpl(parent); - result.asyncBarrier = new AsyncBarrierImpl(parent.asyncBarrier); - return result; + public ReadGraphImpl forRecompute(CacheEntry parent) { + return new ReadGraphImpl(null, parent, processor); } + public static ReadGraphImpl create(QueryProcessor support) { + ReadGraphImpl result = new ReadGraphImpl(null, null, support); + return result; + } + public ReadGraphImpl newRestart(ReadGraphImpl impl) { WriteGraphImpl write = processor.getSession().getService( @@ -5677,11 +5710,6 @@ public class ReadGraphImpl implements AsyncReadGraph { } - public ReadGraphImpl withParent(CacheEntry parent) { - if(parent == this.parent) return this; - else return new ReadGraphImpl(parent, processor); - } - final private ListenerBase getListenerBase(final Object procedure) { if (procedure instanceof ListenerBase) return (ListenerBase) procedure; @@ -5938,6 +5966,8 @@ public class ReadGraphImpl implements AsyncReadGraph { } Serializer serializer = binding.serializer(); byte[] bytes = serializer.serialize(initialValue); + // In case the file has been previously accessed and was larger we set the correct size now + rd.binaryFile.setLength(bytes.length); rd.binaryFile.write(bytes); ravs.put(resource, rd); return rd; @@ -6127,12 +6157,18 @@ public class ReadGraphImpl implements AsyncReadGraph { return getValue(r, binding); } } else if(types.contains(L0.ExternalValue)) { - try { - return (T)ReflectionUtils.getValue(getURI(r)).getValue(); - } catch(ValueNotFoundException e) { - throw new DatabaseException(e); - } catch(ClassCastException e) { - throw new DatabaseException(e); + ComputationalValue cv = syncRequest(new PossibleAdapter(r, ComputationalValue.class), TransientCacheAsyncListener.instance()); + if(cv != null) { + return cv.getValue(this, r); + } else { + // This should not even be possible since L0 defines an adapter for all values + try { + return (T)ReflectionUtils.getValue(getURI(r)).getValue(); + } catch(ValueNotFoundException e) { + throw new DatabaseException(e); + } catch(ClassCastException e) { + throw new DatabaseException(e); + } } } else { @@ -6197,11 +6233,12 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public T getRelatedValue2(Resource subject, Resource relation, Object context) throws DatabaseException { if(Development.DEVELOPMENT) { - String error = L0Validations.checkValueType(this, subject, relation); - if(error != null) { - Logger.defaultLogError(new ValidationException(error)); - //throw new ValidationException(error); - new ValidationException(error).printStackTrace(); + if(Development.getProperty(DevelopmentKeys.L0_VALIDATION, Bindings.BOOLEAN)) { + String error = L0Validations.checkValueType(this, subject, relation); + if(error != null) { + Logger.defaultLogError(new ValidationException(error)); + throw new ValidationException(error); + } } } return getValue2(getSingleObject(subject, relation), context); @@ -6210,12 +6247,13 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public Variant getRelatedVariantValue2(Resource subject, Resource relation, Object context) throws DatabaseException { if(Development.DEVELOPMENT) { - String error = L0Validations.checkValueType(this, subject, relation); - if(error != null) { - Logger.defaultLogError(new ValidationException(error)); - //throw new ValidationException(error); - new ValidationException(error).printStackTrace(); - } + if(Development.getProperty(DevelopmentKeys.L0_VALIDATION, Bindings.BOOLEAN)) { + String error = L0Validations.checkValueType(this, subject, relation); + if(error != null) { + Logger.defaultLogError(new ValidationException(error)); + throw new ValidationException(error); + } + } } return getVariantValue2(getSingleObject(subject, relation), context); } @@ -6324,7 +6362,41 @@ public class ReadGraphImpl implements AsyncReadGraph { @Override public boolean performPending() { - return processor.performPending(processor.thread.get()); + return processor.performPending(this); + } + + public Set ancestorSet() { + HashSet result = new HashSet<>(); + ReadGraphImpl g = this; + while(g != null) { + result.add(g); + g = g.parentGraph; + } + return result; + } + + public int getLevel() { + return getLevelStatic(this); + } + + private static int getLevelStatic(ReadGraphImpl impl) { + if(impl == null) return 0; + else return 1 + getLevelStatic(impl.parentGraph); + } + + public boolean isParent(ReadGraphImpl impl) { + if(impl == null) return false; + if(this == impl) return true; + return isParent(impl.parentGraph); + } + + public ReadGraphImpl getTopLevelGraph() { + return getTopLevelGraphStatic(this); + } + + private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) { + if(impl.parentGraph == null) return impl; + else return getTopLevelGraphStatic(impl.parentGraph); } }