]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index d71a8bfc81c491d52dc7afe30cae65a5e8e95d60..f5b6611b7e72b53a1ae9fbe2c40095d5dbac1e5e 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -39,6 +39,7 @@ import org.simantics.db.ExternalValueSupport;
 import org.simantics.db.Metadata;
 import org.simantics.db.MonitorContext;
 import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.ResourceSerializer;
 import org.simantics.db.Session;
@@ -52,10 +53,13 @@ import org.simantics.db.common.Indexing;
 import org.simantics.db.common.TransactionPolicyRelease;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
@@ -75,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.exception.ServiceException;
 import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.ClusterBase;
 import org.simantics.db.impl.ClusterI;
 import org.simantics.db.impl.ClusterTraitsBase;
@@ -88,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.impl.graph.WriteSupport;
 import org.simantics.db.impl.internal.RandomAccessValueSupport;
 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
 import org.simantics.db.impl.query.QueryProcessor;
 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
@@ -409,7 +416,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -466,7 +473,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     }
 
 
-                    assert(!queryProvider2.dirty);
+                    assert(!queryProvider2.cache.dirty);
 
                 } catch (Throwable e) {
 
@@ -547,7 +554,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -633,7 +640,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -1372,7 +1379,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -1472,7 +1479,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -1494,9 +1501,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1512,7 +1519,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     if (listener != null) {
 
                        try {
-                               newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+                               
+                               AsyncProcedure ap = new AsyncProcedure<T>() {
 
                                        @Override
                                        public void exception(AsyncReadGraph graph, Throwable t) {
@@ -1530,7 +1538,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                                procedure.execute(graph, t);
                                        }
 
-                               }, listener);
+                               };
+                               
+                               QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+                               
                        } catch (Throwable t) {
                            // This is handled by the AsyncProcedure
                                //Logger.defaultLogError("Internal error", t);
@@ -1610,13 +1621,77 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
+
+            @Override
+            public void run(int thread) {
+
+                fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+                try {
+
+                    if (listener != null) {
+
+                        try {
+                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
+                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
+                                               } catch (DatabaseException e) {
+                                                       Logger.defaultLogError(e);
+                                               }
+
+                    } else {
+
+//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
+//                                procedure, "request");
+
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+
+                       try {
+
+                            request.perform(newGraph, wrap);
+                            wrap.get();
+
+                        } catch (DatabaseException e) {
+
+                                                       Logger.defaultLogError(e);
+
+                        }
+
+                    }
+
+                } finally {
+
+                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                }
+
+            }
+
+        });
+
+    }
+
+    public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+        assert (request != null);
+        assert (procedure != null);
+
+        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+        int sync = notify != null ? thread : -1;
+
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ListenerBase listener = getListenerBase(procedure);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
                 try {
@@ -1629,22 +1704,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     } else {
 
-                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-                                procedure, "request");
+                        final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             request.perform(newGraph, wrapper);
 
-//                            newGraph.waitAsync(request);
-
                         } catch (Throwable t) {
 
-                            wrapper.exception(newGraph, t);
-//                            newGraph.waitAsync(request);
-
+                            t.printStackTrace();
 
                         }
 
@@ -1661,7 +1729,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
     }
-
+    
     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
 
         assert (request != null);
@@ -1671,7 +1739,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int sync = notify != null ? thread : -1;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
@@ -1725,7 +1793,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1740,25 +1808,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     if (listener != null) {
 
-                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
-                            @Override
-                            public void exception(Throwable t) {
-                                procedure.exception(t);
-                                if(throwable != null) {
-                                    throwable.set(t);
-                                }
-                            }
-
-                            @Override
-                            public void execute(T t) {
-                                if(result != null) result.set(t);
-                                procedure.execute(t);
-                            }
-
-                        }, listener);
-
-//                        newGraph.waitAsync(request);
+                        try {
+                            QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
 
                     } else {
 
@@ -3048,21 +3102,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         final ArrayList<T> result = new ArrayList<T>();
         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-        syncRequest(request, new AsyncMultiProcedure<T>() {
+        syncRequest(request, new SyncMultiProcedure<T>() {
 
             @Override
-            public void execute(AsyncReadGraph graph, T t) {
+            public void execute(ReadGraph graph, T t) {
                 synchronized(result) {
                     result.add(t);
                 }
             }
 
             @Override
-            public void finished(AsyncReadGraph graph) {
+            public void finished(ReadGraph graph) {
             }
 
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 exception.set(t);
             }
 
@@ -3080,33 +3134,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+    public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+        throw new Error("Not implemented!");
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        return syncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
-    }
-
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
-        assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3262,38 +3310,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert(request != null);
 
-        asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+        asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 t.printStackTrace();
             }
         });
 
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
-        asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+       scheduleRequest(request, procedure, null);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3335,17 +3378,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
-        assertNotSession();
-        throw new Error("Not implemented!");
-    }
-
-    @Override
-    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
-        throw new Error("Not implemented!");
-    }
-
     @Override
     final public <T> void asyncRequest(final ExternalRead<T> request) {