]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Multiple readers in db client
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index d32c066bcdfc9bee732cd6ecf3829b5b0eb7a9a1..030d512eb736e2f6303f0d68c1d0b0e3d2e57b97 100644 (file)
@@ -635,7 +635,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     @Override
     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
-        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+        final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
 
         assert (request != null);
 
@@ -682,7 +682,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 delayedWriteState = null;
 
-                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+                ITask task2 = ThreadLogger.task("DelayedWriteCommit");
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
@@ -1496,8 +1496,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
         requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
@@ -1505,6 +1503,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 ListenerBase listener = getListenerBase(procedure);
 
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
@@ -1549,8 +1549,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             T t = request.perform(newGraph);
 
                             try {
@@ -1594,9 +1592,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             }
 
                         }
-
-//                        newGraph.state.barrier.dec();
-//                        newGraph.waitAsync(request);
+                        
+                        task.finish();
 
                     }
 
@@ -1626,6 +1623,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
                 try {
@@ -1634,22 +1633,30 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         try {
                                QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
-                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
-                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
                                                } catch (DatabaseException e) {
                                                        Logger.defaultLogError(e);
                                                }
 
                     } else {
 
-//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-//                                procedure, "request");
-
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
+                               
+                               public void execute(AsyncReadGraph graph_, T result) {
+                                       task.finish();
+                                       super.execute(graph_, result);
+                               }
+                               
+                               public void exception(AsyncReadGraph graph_, Throwable t) {
+                                       task.finish();
+                                       super.exception(graph_, t);
+                               }
+                               
+                       };
 
                        try {
 
                             request.perform(newGraph, wrap);
+                            wrap.dec();
                             wrap.get();
 
                         } catch (DatabaseException e) {
@@ -3542,8 +3549,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         while(dirtyPrimitives) {
             dirtyPrimitives = false;
-            getQueryProvider2().performDirtyUpdates(writer);
-            getQueryProvider2().performScheduledUpdates(writer);
+            getQueryProvider2().propagateChangesInQueryCache(writer);
+            getQueryProvider2().listening.fireListeners(writer);
         }
 
         fireMetadataListeners(writer, clientChanges);