]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
AsyncBarrier.dec runs into refcounting problem
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index b98b4dec643383d163cd05746427db59b6c523ea..f8fd7d17ddab55e7274388ff259155918227486f 100644 (file)
@@ -461,7 +461,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     });
 
                     assert (null != writer);
-//                    writer.state.barrier.inc();
+                    writer.asyncBarrier.inc();
 
                     try {
                         request.perform(writer);
@@ -471,7 +471,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
                         writeState.except(t);
                     } finally {
-//                        writer.state.barrier.dec();
+                        writer.asyncBarrier.dec();
 //                        writer.waitAsync(request);
                     }
 
@@ -571,52 +571,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 VirtualGraph vg = getProvider(request.getProvider());
                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
 
-                try {
-                    WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
-                    writeState = writeStateT;
+                WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+                writeState = writeStateT;
+                assert (null != writer);
 
-                    assert (null != writer);
-//                    writer.state.barrier.inc();
+                try {
+                    writer.asyncBarrier.inc();
                     writeStateT.setResult(request.perform(writer));
                     assert (null != writer);
-
-//                    writer.state.barrier.dec();
-//                    writer.waitAsync(null);
-
                 } catch (Throwable e) {
-
-//                    writer.state.barrier.dec();
-//                    writer.waitAsync(null);
-
                     writeState.except(e);
-
-//                  state.stopWriteTransaction(clusterStream);
-//
-//              } catch (Throwable e) {
-//                  // Log it first, just to be safe that the error is always logged.
-//                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
-//
-//                  try {
-//                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
-//                      // All we can do here is to log those, can't really pass them anywhere.
-//                      if (procedure != null) {
-//                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
-//                          else procedure.exception(new DatabaseException(e));
-//                      }
-//                  } catch (Throwable e2) {
-//                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
-//                  }
-//
-//                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
-//
-//                  state.stopWriteTransaction(clusterStream);
-
                 } finally {
+                    writer.asyncBarrier.dec();
                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                 }
 
-//              if(notify != null) notify.release();
-
                 task.finish();
 
             }
@@ -635,7 +604,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     @Override
     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
-        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+        final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
 
         assert (request != null);
 
@@ -682,7 +651,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 delayedWriteState = null;
 
-                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+                ITask task2 = ThreadLogger.task("DelayedWriteCommit");
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
@@ -1496,8 +1465,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
         requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
@@ -1505,6 +1472,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 ListenerBase listener = getListenerBase(procedure);
 
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
@@ -1549,8 +1518,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             T t = request.perform(newGraph);
 
                             try {
@@ -1594,9 +1561,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             }
 
                         }
-
-//                        newGraph.state.barrier.dec();
-//                        newGraph.waitAsync(request);
+                        
+                        task.finish();
 
                     }
 
@@ -1626,42 +1592,49 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+                newGraph.asyncBarrier.inc();
 
                 try {
 
                     if (listener != null) {
 
                         try {
-                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
-                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
-                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
-                                               } catch (DatabaseException e) {
-                                                       Logger.defaultLogError(e);
-                                               }
+                            QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            LOGGER.error("Unhandled query exception", e);
+                        }
 
                     } else {
 
-//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-//                                procedure, "request");
+                        BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
 
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
+                            public void execute(AsyncReadGraph graph_, T result) {
+                                task.finish();
+                                super.execute(graph_, result);
+                            }
 
-                       try {
+                            public void exception(AsyncReadGraph graph_, Throwable t) {
+                                task.finish();
+                                super.exception(graph_, t);
+                            }
 
-                            request.perform(newGraph, wrap);
-                            wrap.get();
+                        };
 
+                        try {
+                            wrap.performSync(request);
                         } catch (DatabaseException e) {
-
-                                                       Logger.defaultLogError(e);
-
+                            LOGGER.error("Unhandled query exception", e);
                         }
 
                     }
 
                 } finally {
 
+                    newGraph.asyncBarrier.dec();
+
                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
                 }
@@ -2478,6 +2451,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 return;
 
             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+            reactionGraph.asyncBarrier.inc();
 
             try {
 
@@ -2492,6 +2466,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
             } finally {
 
+                reactionGraph.asyncBarrier.dec();
+
             }
 
         } catch (Throwable t) {