]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Fixes based on feedback
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index 9290ac99fdd98a5792a542a0b51b7309941f1704..a08011fed1172615a2f937cb9de08bc099f3f1de 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -39,6 +39,7 @@ import org.simantics.db.ExternalValueSupport;
 import org.simantics.db.Metadata;
 import org.simantics.db.MonitorContext;
 import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.ResourceSerializer;
 import org.simantics.db.Session;
@@ -52,10 +53,13 @@ import org.simantics.db.common.Indexing;
 import org.simantics.db.common.TransactionPolicyRelease;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
@@ -75,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.exception.ServiceException;
 import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.ClusterBase;
 import org.simantics.db.impl.ClusterI;
 import org.simantics.db.impl.ClusterTraitsBase;
@@ -88,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.impl.graph.WriteSupport;
 import org.simantics.db.impl.internal.RandomAccessValueSupport;
 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
 import org.simantics.db.impl.query.QueryProcessor;
 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
@@ -407,9 +414,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             }
         }
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -466,7 +471,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     }
 
 
-                    assert(!queryProvider2.dirty);
+                    assert(!queryProvider2.cache.dirty);
 
                 } catch (Throwable e) {
 
@@ -499,7 +504,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 //                        state.cancelCommit(context, clusterStream);
 //                        if (!empty) {
 //                            if (!context.isOk()) // this is a blocking operation
-//                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+//                                throw new InternalException("Cancel failed. This should never happen.");
 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
 //                        }
 //                        state.cancelCommit2(context, clusterStream);
@@ -545,9 +550,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -631,9 +634,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -1022,15 +1023,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                        try {
 
                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
-                               int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
-                               int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
 
                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
 
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);
 
                                queryProvider2.invalidateResource(s);
@@ -1372,9 +1371,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -1472,9 +1469,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
             public void run(int thread) {
@@ -1496,9 +1491,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1514,7 +1509,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     if (listener != null) {
 
                        try {
-                               newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+                               
+                               AsyncProcedure ap = new AsyncProcedure<T>() {
 
                                        @Override
                                        public void exception(AsyncReadGraph graph, Throwable t) {
@@ -1532,7 +1528,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                                procedure.execute(graph, t);
                                        }
 
-                               }, listener);
+                               };
+                               
+                               QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+                               
                        } catch (Throwable t) {
                            // This is handled by the AsyncProcedure
                                //Logger.defaultLogError("Internal error", t);
@@ -1612,7 +1611,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
@@ -1621,6 +1620,70 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
+                try {
+
+                    if (listener != null) {
+
+                        try {
+                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
+                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
+                                               } catch (DatabaseException e) {
+                                                       Logger.defaultLogError(e);
+                                               }
+
+                    } else {
+
+//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
+//                                procedure, "request");
+
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
+
+                       try {
+
+                            request.perform(newGraph, wrap);
+                            wrap.get();
+
+                        } catch (DatabaseException e) {
+
+                                                       Logger.defaultLogError(e);
+
+                        }
+
+                    }
+
+                } finally {
+
+                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                }
+
+            }
+
+        });
+
+    }
+
+    public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+        assert (request != null);
+        assert (procedure != null);
+
+        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+        int sync = notify != null ? thread : -1;
+
+        requestManager.scheduleRead(new SessionRead(null, notify) {
+
+            @Override
+            public void run(int thread) {
+
+                fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                ListenerBase listener = getListenerBase(procedure);
+
+                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
                 try {
 
                     if (listener != null) {
@@ -1631,22 +1694,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     } else {
 
-                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-                                procedure, "request");
+                        final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             request.perform(newGraph, wrapper);
 
-//                            newGraph.waitAsync(request);
-
                         } catch (Throwable t) {
 
-                            wrapper.exception(newGraph, t);
-//                            newGraph.waitAsync(request);
-
+                            t.printStackTrace();
 
                         }
 
@@ -1663,7 +1719,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
     }
-
+    
     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
 
         assert (request != null);
@@ -1673,7 +1729,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int sync = notify != null ? thread : -1;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
@@ -1727,7 +1783,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1742,25 +1798,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     if (listener != null) {
 
-                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
-                            @Override
-                            public void exception(Throwable t) {
-                                procedure.exception(t);
-                                if(throwable != null) {
-                                    throwable.set(t);
-                                }
-                            }
-
-                            @Override
-                            public void execute(T t) {
-                                if(result != null) result.set(t);
-                                procedure.execute(t);
-                            }
-
-                        }, listener);
-
-//                        newGraph.waitAsync(request);
+                        try {
+                            QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
 
                     } else {
 
@@ -3050,21 +3092,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         final ArrayList<T> result = new ArrayList<T>();
         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-        syncRequest(request, new AsyncMultiProcedure<T>() {
+        syncRequest(request, new SyncMultiProcedure<T>() {
 
             @Override
-            public void execute(AsyncReadGraph graph, T t) {
+            public void execute(ReadGraph graph, T t) {
                 synchronized(result) {
                     result.add(t);
                 }
             }
 
             @Override
-            public void finished(AsyncReadGraph graph) {
+            public void finished(ReadGraph graph) {
             }
 
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 exception.set(t);
             }
 
@@ -3082,33 +3124,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+    public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+        throw new Error("Not implemented!");
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        return syncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
-    }
-
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
-        assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3264,38 +3300,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert(request != null);
 
-        asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+        asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 t.printStackTrace();
             }
         });
 
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
-        asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+       scheduleRequest(request, procedure, null);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3337,17 +3368,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
-        assertNotSession();
-        throw new Error("Not implemented!");
-    }
-
-    @Override
-    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
-        throw new Error("Not implemented!");
-    }
-
     @Override
     final public <T> void asyncRequest(final ExternalRead<T> request) {