]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index 8a634d1cfb05504a7ffa4d86db2ed12c308aa2e1..29c70445bd208dd190c9796b60f2e31f46408dca 100644 (file)
@@ -143,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport;
 import org.simantics.db.service.CollectionSupport;
 import org.simantics.db.service.DebugSupport;
 import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.EventSupport;
 import org.simantics.db.service.GraphChangeListenerSupport;
 import org.simantics.db.service.InitSupport;
 import org.simantics.db.service.LifecycleSupport;
@@ -166,6 +167,7 @@ import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
 import org.simantics.utils.threads.logger.ITask;
 import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.LoggerFactory;
 
 import gnu.trove.procedure.TLongProcedure;
 import gnu.trove.set.hash.TLongHashSet;
@@ -173,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet;
 
 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
 
+    private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
+
     protected static final boolean DEBUG        = false;
 
     private static final boolean DIAGNOSTICS       = false;
@@ -289,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
+        serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
         ServiceActivityUpdaterForWriteTransactions.register(this);
 
         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
@@ -323,7 +328,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         try {
             getClusterTable().refresh(csid, this, clusterUID);
         } catch (Throwable t) {
-            Logger.defaultLogError("Refesh failed.", t);
+            LOGGER.error("refresh({}, {}) failed", thread, csid, t);
         }
     }
 
@@ -417,7 +422,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 if(Development.DEVELOPMENT) {
                     try {
@@ -456,17 +461,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     });
 
                     assert (null != writer);
-//                    writer.state.barrier.inc();
+                    writer.asyncBarrier.inc();
 
                     try {
                         request.perform(writer);
                         assert (null != writer);
                     } catch (Throwable t) {
                         if (!(t instanceof CancelTransactionException))
-                            Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
+                            LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
                         writeState.except(t);
                     } finally {
-//                        writer.state.barrier.dec();
+                        writer.asyncBarrier.dec();
 //                        writer.waitAsync(request);
                     }
 
@@ -477,7 +482,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     // Log it first, just to be safe that the error is always logged.
                     if (!(e instanceof CancelTransactionException))
-                    Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+                        LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
 
 //                    writeState.getGraph().state.barrier.dec();
 //                    writeState.getGraph().waitAsync(request);
@@ -553,7 +558,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
@@ -566,52 +571,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 VirtualGraph vg = getProvider(request.getProvider());
                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
 
-                try {
-                    WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
-                    writeState = writeStateT;
+                WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+                writeState = writeStateT;
+                assert (null != writer);
 
-                    assert (null != writer);
-//                    writer.state.barrier.inc();
+                try {
+                    writer.asyncBarrier.inc();
                     writeStateT.setResult(request.perform(writer));
                     assert (null != writer);
-
-//                    writer.state.barrier.dec();
-//                    writer.waitAsync(null);
-
                 } catch (Throwable e) {
-
-//                    writer.state.barrier.dec();
-//                    writer.waitAsync(null);
-
                     writeState.except(e);
-
-//                  state.stopWriteTransaction(clusterStream);
-//
-//              } catch (Throwable e) {
-//                  // Log it first, just to be safe that the error is always logged.
-//                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
-//
-//                  try {
-//                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
-//                      // All we can do here is to log those, can't really pass them anywhere.
-//                      if (procedure != null) {
-//                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
-//                          else procedure.exception(new DatabaseException(e));
-//                      }
-//                  } catch (Throwable e2) {
-//                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
-//                  }
-//
-//                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
-//
-//                  state.stopWriteTransaction(clusterStream);
-
                 } finally {
+                    writer.asyncBarrier.dec();
                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                 }
 
-//              if(notify != null) notify.release();
-
                 task.finish();
 
             }
@@ -630,14 +604,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     @Override
     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
-        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+        final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
 
         assert (request != null);
 
         requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
                 Procedure<Object> stateProcedure = new Procedure<Object>() {
@@ -652,7 +626,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
                             else callback.accept(new DatabaseException(t));
                         } else
-                            Logger.defaultLogError("Unhandled exception", t);
+                            LOGGER.error("Unhandled exception", t);
                     }
                 };
 
@@ -677,7 +651,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 delayedWriteState = null;
 
-                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+                ITask task2 = ThreadLogger.task("DelayedWriteCommit");
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
@@ -1350,7 +1324,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 callback.exception(new DatabaseException(e));
 
             state.stopWriteTransaction(clusterStream);
-            Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+            LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
 
         } finally  {
 
@@ -1374,7 +1348,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
@@ -1472,7 +1446,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
@@ -1491,19 +1465,22 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
         requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 ListenerBase listener = getListenerBase(procedure);
 
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
+                // This is never synced but increase to prevent it from visiting 0
+                newGraph.asyncBarrier.inc();
+
                 try {
 
                     if (listener != null) {
@@ -1541,8 +1518,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             T t = request.perform(newGraph);
 
                             try {
@@ -1555,7 +1530,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                 if(throwable != null) {
                                     throwable.set(th);
                                 } else {
-                                    Logger.defaultLogError("Unhandled exception", th);
+                                    LOGGER.error("Unhandled exception", th);
                                 }
 
                             }
@@ -1568,7 +1543,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             if(throwable != null) {
                                 throwable.set(t);
                             } else {
-                                Logger.defaultLogError("Unhandled exception", t);
+                                LOGGER.error("Unhandled exception", t);
                             }
 
                             try {
@@ -1580,15 +1555,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                 if(throwable != null) {
                                     throwable.set(t2);
                                 } else {
-                                    Logger.defaultLogError("Unhandled exception", t2);
+                                    LOGGER.error("Unhandled exception", t2);
                                 }
 
                             }
 
                         }
-
-//                        newGraph.state.barrier.dec();
-//                        newGraph.waitAsync(request);
+                        
+                        task.finish();
 
                     }
 
@@ -1614,46 +1588,53 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+                newGraph.asyncBarrier.inc();
 
                 try {
 
                     if (listener != null) {
 
                         try {
-                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
-                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
-                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
-                                               } catch (DatabaseException e) {
-                                                       Logger.defaultLogError(e);
-                                               }
+                            QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            LOGGER.error("Unhandled query exception", e);
+                        }
 
                     } else {
 
-//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-//                                procedure, "request");
+                        BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
 
-                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+                            public void execute(AsyncReadGraph graph_, T result) {
+                                task.finish();
+                                super.execute(graph_, result);
+                            }
 
-                       try {
+                            public void exception(AsyncReadGraph graph_, Throwable t) {
+                                task.finish();
+                                super.exception(graph_, t);
+                            }
 
-                            request.perform(newGraph, wrap);
-                            wrap.get();
+                        };
 
+                        try {
+                            wrap.performSync(request);
                         } catch (DatabaseException e) {
-
-                                                       Logger.defaultLogError(e);
-
+                            LOGGER.error("Unhandled query exception", e);
                         }
 
                     }
 
                 } finally {
 
+                    newGraph.asyncBarrier.dec();
+
                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
                 }
@@ -1676,7 +1657,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
@@ -1732,7 +1713,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
@@ -1786,7 +1767,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
@@ -2470,6 +2451,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 return;
 
             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+            reactionGraph.asyncBarrier.inc();
 
             try {
 
@@ -2478,16 +2460,18 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     try {
                         l.graphChanged(e2);
                     } catch (Throwable ex) {
-                        ex.printStackTrace();
+                        LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
                     }
                 }
 
             } finally {
 
+                reactionGraph.asyncBarrier.dec();
+
             }
 
         } catch (Throwable t) {
-            t.printStackTrace();
+            LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
         }
 
     }
@@ -2527,39 +2511,41 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
      */
     @SuppressWarnings("unchecked")
     @Override
-    public synchronized <T> T peekService(Class<T> api) {
-
-               if(serviceKey1 == api) {
-                       return (T)service1;
-               } else if (serviceKey2 == api) {
-                       // Promote this key
-                       Object result = service2;
-                       service2 = service1;
-                       serviceKey2 = serviceKey1;
-                       service1 = result;
-                       serviceKey1 = api;
-                       return (T)result;
-               }
+    public <T> T peekService(Class<T> api) {
+        if (Layer0.class == api)
+            return (T) L0;
 
-       if (Layer0.class == api)
-               return (T) L0;
-        if (ServerInformation.class == api)
-            return (T) getCachedServerInformation();
-        else if (WriteGraphImpl.class == api)
-            return (T) writeState.getGraph();
-        else if (ClusterBuilder.class == api)
-            return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
-        else if (ClusterBuilderFactory.class == api)
-            return (T)new ClusterBuilderFactoryImpl(this);
+        synchronized (this) {
+            if (serviceKey1 == api) {
+                return (T) service1;
+            }
+            if (serviceKey2 == api) {
+                // Promote this key
+                Object result = service2;
+                service2 = service1;
+                serviceKey2 = serviceKey1;
+                service1 = result;
+                serviceKey1 = api;
+                return (T)result;
+            }
 
-               service2 = service1;
-               serviceKey2 = serviceKey1;
+            if (ServerInformation.class == api)
+                return (T) getCachedServerInformation();
+            else if (WriteGraphImpl.class == api)
+                return (T) writeState.getGraph();
+            else if (ClusterBuilder.class == api)
+                return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
+            else if (ClusterBuilderFactory.class == api)
+                return (T)new ClusterBuilderFactoryImpl(this);
 
-        service1 = serviceLocator.peekService(api);
-        serviceKey1 = api;
+            service2 = service1;
+            serviceKey2 = serviceKey1;
 
-        return (T)service1;
+            service1 = serviceLocator.peekService(api);
+            serviceKey1 = api;
 
+            return (T)service1;
+        }
     }
 
     /*
@@ -2606,9 +2592,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             try {
                 h.valuesChanged(ctx);
             } catch (Exception e) {
-                Logger.defaultLogError("monitor handler notification produced the following exception", e);
+                LOGGER.error("monitor handler notification produced the following exception", e);
             } catch (LinkageError e) {
-                Logger.defaultLogError("monitor handler notification produced a linkage error", e);
+                LOGGER.error("monitor handler notification produced a linkage error", e);
             }
         }
     }
@@ -3179,9 +3165,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
         Throwable t = exception.get();
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+        if (t != null) {
+            if (t instanceof DatabaseException)
+                throw (DatabaseException) t;
+            else
+                throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
         }
 
         return result;
@@ -3387,28 +3375,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, (Procedure<T>)procedure);
     }
 
-
-
-    void check(Throwable t) throws DatabaseException {
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception", t);
-        }
-    }
-
-    void check(DataContainer<Throwable> container) throws DatabaseException {
-        Throwable t = container.get();
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception", t);
-        }
-    }
-
-
-
-
-
-
     boolean sameProvider(Write request) {
         if(writeState.getGraph().provider != null) {
             return writeState.getGraph().provider.equals(request.getProvider());
@@ -3425,9 +3391,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
 
     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
+
     private void assertNotSession() throws DatabaseException {
         Thread current = Thread.currentThread();
-        if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
+        if (sessionThreads.contains(current))
+            throw new ServiceException("Caller is already inside a transaction.");
     }
 
     void assertAlive() {
@@ -3552,8 +3520,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         while(dirtyPrimitives) {
             dirtyPrimitives = false;
-            getQueryProvider2().performDirtyUpdates(writer);
-            getQueryProvider2().performScheduledUpdates(writer);
+            getQueryProvider2().propagateChangesInQueryCache(writer);
+            getQueryProvider2().listening.fireListeners(writer);
         }
 
         fireMetadataListeners(writer, clientChanges);
@@ -3613,4 +3581,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         state.setCombine(false);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T l0() {
+        return (T) L0;
+    }
+
 }