]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/ReadGraphImpl.java
Working towards multiple readers.
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / ReadGraphImpl.java
index 299396ed715a6fb514b2474fd4b29933ab982b12..7070ee5b8b9552ed0ab9317124b2861b23833bfc 100644 (file)
@@ -89,9 +89,9 @@ import org.simantics.db.common.primitiverequest.UniqueAdapter;
 import org.simantics.db.common.primitiverequest.Value;
 import org.simantics.db.common.primitiverequest.ValueImplied;
 import org.simantics.db.common.primitiverequest.VariantValueImplied;
-import org.simantics.db.common.procedure.BlockingAsyncProcedure;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.TransientCacheAsyncListener;
 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
 import org.simantics.db.common.procedure.single.SyncReadProcedure;
@@ -107,6 +107,8 @@ import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncSetProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
@@ -114,7 +116,6 @@ import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncSetProcedure;
 import org.simantics.db.common.request.AdaptValue;
 import org.simantics.db.common.request.ResourceRead;
-import org.simantics.db.common.utils.Functions;
 import org.simantics.db.common.utils.Logger;
 import org.simantics.db.common.utils.NameUtils;
 import org.simantics.db.common.validation.L0Validations;
@@ -133,21 +134,19 @@ import org.simantics.db.exception.NoSingleResultException;
 import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.exception.ServiceException;
 import org.simantics.db.exception.ValidationException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.RelationContextImpl;
 import org.simantics.db.impl.ResourceImpl;
 import org.simantics.db.impl.internal.RandomAccessValueSupport;
 import org.simantics.db.impl.internal.ResourceData;
-import org.simantics.db.impl.procedure.CallWrappedSingleQueryProcedure4;
-import org.simantics.db.impl.procedure.InternalProcedure;
-import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
 import org.simantics.db.impl.query.CacheEntry;
 import org.simantics.db.impl.query.QueryCache;
 import org.simantics.db.impl.query.QueryCacheBase;
 import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.impl.query.QuerySupport;
 import org.simantics.db.impl.query.TripleIntProcedure;
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.impl.support.ResourceSupport;
 import org.simantics.db.procedure.AsyncListener;
 import org.simantics.db.procedure.AsyncMultiListener;
@@ -174,7 +173,6 @@ import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.MultiRead;
 import org.simantics.db.request.Read;
 import org.simantics.db.request.ReadInterface;
-import org.simantics.db.request.RequestFlags;
 import org.simantics.db.request.Write;
 import org.simantics.db.request.WriteInterface;
 import org.simantics.db.request.WriteOnly;
@@ -185,22 +183,26 @@ import org.simantics.scl.compiler.types.Type;
 import org.simantics.scl.compiler.types.exceptions.SCLTypeParseException;
 import org.simantics.scl.reflection.ReflectionUtils;
 import org.simantics.scl.reflection.ValueNotFoundException;
-import org.simantics.scl.runtime.function.Function1;
 import org.simantics.scl.runtime.function.Function3;
 import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
 import org.simantics.utils.datastructures.Pair;
 import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.slf4j.LoggerFactory;
 
 import gnu.trove.map.hash.TObjectIntHashMap;
 
-public class ReadGraphImpl implements ReadGraph {
+public class ReadGraphImpl implements AsyncReadGraph {
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ReadGraphImpl.class);
 
     final static boolean EMPTY_RESOURCE_CHECK = false;
     
        final public CacheEntry parent;
        final public QueryProcessor processor;
        
+       public AsyncBarrierImpl asyncBarrier = null;
+       
        final static Binding DATA_TYPE_BINDING_INTERNAL = Bindings.getBindingUnchecked(Datatype.class);
        final static Serializer DATA_TYPE_SERIALIZER = Bindings.getSerializerUnchecked(DATA_TYPE_BINDING_INTERNAL);
 
@@ -2091,14 +2093,10 @@ public class ReadGraphImpl implements ReadGraph {
                        throws DatabaseException {
 
                assert (request != null);
-//             AsyncReadProcedure<T> procedure = new AsyncReadProcedure<T>();
+               asyncBarrier = new AsyncBarrierImpl(null);
                BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, null, request);
                syncRequest(request, ap);
                return ap.get();
-//             procedure.checkAndThrow();
-//             return procedure.result;
-               
-//             return syncRequest(request, new AsyncProcedureAdapter<T>());
 
        }
 
@@ -2132,6 +2130,8 @@ public class ReadGraphImpl implements ReadGraph {
                
 //             final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
 //                             procedure, request);
+
+               ReadGraphImpl async = ReadGraphImpl.newAsync(this);
                
                QueryCache.runnerAsyncReadEntry(this, request, parent, listener, ap, true);
                
@@ -2239,21 +2239,21 @@ public class ReadGraphImpl implements ReadGraph {
                final ArrayList<T> result = new ArrayList<T>();
                final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-               syncRequest(request, new AsyncMultiProcedure<T>() {
+               syncRequest(request, new SyncMultiProcedure<T>() {
 
                        @Override
-                       public void execute(AsyncReadGraph graph, T t) {
+                       public void execute(ReadGraph graph, T t) {
                                synchronized (result) {
                                        result.add(t);
                                }
                        }
 
                        @Override
-                       public void finished(AsyncReadGraph graph) {
+                       public void finished(ReadGraph graph) {
                        }
 
                        @Override
-                       public void exception(AsyncReadGraph graph, Throwable t) {
+                       public void exception(ReadGraph graph, Throwable t) {
                                exception.set(t);
                        }
 
@@ -2278,24 +2278,25 @@ public class ReadGraphImpl implements ReadGraph {
 
        }
 
-       @Override
+/*     @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        AsyncMultiListener<T> procedure) {
                return syncRequest(request, (AsyncMultiProcedure<T>) procedure);
-       }
+       }*/
 
        @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        SyncMultiListener<T> procedure) {
-               return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+               return syncRequest(request, (SyncMultiProcedure<T>)procedure);
        }
 
        @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        MultiListener<T> procedure) {
-               return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+               return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
        }
 
+       /*
        @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        AsyncMultiProcedure<T> procedure) {
@@ -2339,17 +2340,51 @@ public class ReadGraphImpl implements ReadGraph {
                return null;
 
        }
+       */
 
        @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        SyncMultiProcedure<T> procedure) {
-               return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+
+               assert (request != null);
+
+               ListenerBase listener = getListenerBase(procedure);
+
+               final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
+
+               if (parent != null || listener != null) {
+
+//                     Object syncParent = request;
+
+//                     final ReadGraphImpl newGraph = newSync();
+
+                       processor.query(this, request, parent, wrapper, listener);
+
+//                     newGraph.waitAsync(syncParent);
+
+               } else {
+
+//                     Object syncParent = request;
+
+//                     final ReadGraphImpl newGraph = newSync();
+
+
+                       try {
+                               request.perform(this, wrapper);
+                       } catch (Throwable t) {
+                               wrapper.exception(this, t);
+                       }
+
+               }
+
+               return wrapper.get();
+
        }
 
        @Override
        public <T> Collection<T> syncRequest(MultiRead<T> request,
                        MultiProcedure<T> procedure) {
-               return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+               return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
        }
 
        static class AsyncMultiReadProcedure<T> extends ArrayList<T> implements AsyncMultiProcedure<T> {
@@ -3327,18 +3362,18 @@ public class ReadGraphImpl implements ReadGraph {
 //     }
        
        @Override
-       final public void forEachDirectPredicate(final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
+       final public void forEachDirectPredicate(final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
                processor.forEachDirectPredicate(this, subject, procedure);
        }
 
        @Override
-       public void forEachDirectPredicate(Resource subject, SyncMultiProcedure<Resource> procedure) {
-               forEachDirectPredicate(subject, new SyncToAsyncMultiProcedure<Resource>(procedure));
+       final public void forEachDirectPredicate(final Resource subject, final SyncProcedure<Set<Resource>> procedure) {
+               forEachDirectPredicate(subject, new SyncToAsyncProcedure<Set<Resource>>(procedure));
        }
 
        @Override
-       public void forEachDirectPredicate(Resource subject, MultiProcedure<Resource> procedure) {
-               forEachDirectPredicate(subject, new NoneToAsyncMultiProcedure<Resource>(procedure));
+       public void forEachDirectPredicate(Resource subject, Procedure<Set<Resource>> procedure) {
+               forEachDirectPredicate(subject, new NoneToAsyncProcedure<Set<Resource>>(procedure));
        }
 
        @Override
@@ -5560,9 +5595,9 @@ public class ReadGraphImpl implements ReadGraph {
 
                assert (request != null);
 
-               asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+               asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
                        @Override
-                       public void exception(AsyncReadGraph graph, Throwable t) {
+                       public void exception(ReadGraph graph, Throwable t) {
                        Logger.defaultLogError(t);
                        }
 
@@ -5574,27 +5609,22 @@ public class ReadGraphImpl implements ReadGraph {
 
        }
 
-       @Override
-       public <T> void asyncRequest(MultiRead<T> request,
-                       AsyncMultiListener<T> procedure) {
-               asyncRequest(request, (AsyncMultiProcedure<T>) procedure);
-       }
-
        @Override
        public <T> void asyncRequest(MultiRead<T> request,
                        SyncMultiListener<T> procedure) {
-               asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+               asyncRequest(request, (SyncMultiProcedure<T>)procedure);
        }
 
        @Override
        public <T> void asyncRequest(MultiRead<T> request,
                        MultiListener<T> procedure) {
-               asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+               asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
        }
 
+       
        @Override
        public <T> void asyncRequest(final MultiRead<T> request,
-                       final AsyncMultiProcedure<T> procedure) {
+                       final SyncMultiProcedure<T> procedure) {
 
                assert (request != null);
                assert (procedure != null);
@@ -5616,7 +5646,11 @@ public class ReadGraphImpl implements ReadGraph {
 
                    } catch (Throwable t) {
 
-                       procedure.exception(this, t);
+                       try {
+                                       procedure.exception(this, t);
+                               } catch (DatabaseException e) {
+                                       LOGGER.error("Unexpected exception while handling exception", e);
+                               }
 
                    }                           
 
@@ -5624,16 +5658,10 @@ public class ReadGraphImpl implements ReadGraph {
 
        }
 
-       @Override
-       public <T> void asyncRequest(MultiRead<T> request,
-                       SyncMultiProcedure<T> procedure) {
-               asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
-       }
-
        @Override
        public <T> void asyncRequest(MultiRead<T> request,
                        MultiProcedure<T> procedure) {
-               asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+               asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
        }
 
        @Override
@@ -6019,6 +6047,12 @@ public class ReadGraphImpl implements ReadGraph {
        public static ReadGraphImpl create(QueryProcessor support) {
                return new ReadGraphImpl(null, support);
        }
+       
+       public static ReadGraphImpl newAsync(ReadGraphImpl parent) {
+               ReadGraphImpl result = new ReadGraphImpl(parent);
+               result.asyncBarrier = new AsyncBarrierImpl(parent.asyncBarrier);
+               return result;
+       }
 
        public ReadGraphImpl newRestart(ReadGraphImpl impl) {