]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / AsyncReadEntry.java
index 72582ee605802bfe32fac38d10119f6200602586..2fdb43d113332e4f33b6b131685fd8c80c112b22 100644 (file)
  *******************************************************************************/
 package org.simantics.db.impl.query;
 
+import java.util.Collection;
+
 import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.DevelopmentKeys;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.BlockingAsyncProcedure;
-import org.simantics.db.impl.graph.AsyncBarrierImpl;
-import org.simantics.db.impl.graph.BarrierTracing;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncProcedure;
@@ -26,11 +26,12 @@ import org.simantics.utils.Development;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T> {
+final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements AsyncProcedure<T>, IPending {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncReadEntry.class);
 
     protected AsyncRead<T> id;
+    protected PendingTaskSupport pendingTaskSupport;
 
     AsyncReadEntry(AsyncRead<T> request) {
         this.id = request;
@@ -77,7 +78,7 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
                 try {
 
-                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(graph.asyncBarrier, graph, new AsyncProcedure<T>() {
+                    BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(graph, AsyncReadEntry.this, new AsyncProcedure<T>() {
 
                         @Override
                         public void execute(AsyncReadGraph graph, T result) {
@@ -90,11 +91,9 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
                             except(t);
                         }
 
-                    }, id);
-
-                    id.perform(graph, proc);
-
-                    proc.get();
+                    }, id, true);
+                    
+                    proc.performSync(id);
 
                 } catch (Throwable t) {
                     except(t);
@@ -152,91 +151,17 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     }
 
-    public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+    public static <T> T computeForEach(ReadGraphImpl callerGraph, AsyncRead<T> request, AsyncReadEntry<T> entry,
             AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
 
-        AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
-
-        ReadGraphImpl queryGraph = graph.withParent(entry);
-        queryGraph.asyncBarrier.inc();
-        
-        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph.asyncBarrier, graph, null, request);
-        
-        class AsyncTask extends SessionTask {
-
-            int counter = 0;
-            T result;
-            DatabaseException exception;
-            
-            public AsyncTask(ReadGraphImpl graph) {
-                super(graph);
-            }
-
-            @Override
-            public void run0(int thread) {
-                if(needsToBlock) proc.waitBarrier();
-                if(proc.isDone()) {
-                    ReadGraphImpl executeGraph = graph.withParent(graph.parent);
-                    executeGraph.asyncBarrier.inc();
-                    try {
-                        result = (T)proc.get();
-                        if(procedure != null) {
-                            procedure.execute(executeGraph, result);
-                        }
-                    } catch (DatabaseException e) {
-                        if(procedure != null) procedure.exception(executeGraph, e);
-                        exception = e;
-                    } catch (Throwable t) {
-                        DatabaseException dbe = new DatabaseException(t);
-                        if(procedure != null) procedure.exception(executeGraph, dbe);
-                        exception = dbe;
-                    } finally {
-                        if (entry != null) {
-                            // This does not throw
-                            entry.performFromCache(executeGraph, procedure_);
-                        }
-                        executeGraph.asyncBarrier.dec();
-                        executeGraph.asyncBarrier.waitBarrier(procedure, executeGraph);
-                    }
-                } else {
-                    if(counter++ > 10000) {
-                        if(BarrierTracing.BOOKKEEPING) {
-                            AsyncBarrierImpl.printReverse(queryGraph.asyncBarrier, 2);
-                            AsyncBarrierImpl caller = queryGraph.asyncBarrier.caller;
-                            while(caller != null) {
-                                System.err.println("called by " + AsyncBarrierImpl.report(caller));
-                                caller = caller.caller;
-                            }
-                            for(AsyncBarrierImpl ab : BarrierTracing.debuggerMap.keySet()) {
-                                AsyncBarrierImpl.printReverse(ab, 2);
-                            }
-                        }
-                        throw new IllegalStateException("Eternal loop in queries.");
-                    }
-                    graph.processor.schedule(new AsyncTask(graph));
-                }
-            }
-            
-        }
-
-        try {
-            request.perform(queryGraph, proc);
-        } finally {
-            queryGraph.asyncBarrier.dec();
-        }
-
-        AsyncTask task = new AsyncTask(graph);
-
-        if(needsToBlock) task.run(0);
-        else if (proc.isDone()) task.run(0);
-        else  {
-            graph.processor.schedule(task);
+        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure(callerGraph, entry, procedure_, request, needsToBlock);
+        if(needsToBlock) {
+            return proc.performSync(request);
+        } else {
+            proc.performAsync(request);
             return null;
         }
 
-        if(task.exception != null) throw task.exception;
-        else return task.result;
-
     }
 
     @Override
@@ -251,13 +176,46 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     @Override
     public void execute(AsyncReadGraph graph, T result) {
-        setResult(result);
-        setReady();
+        Collection<SessionTask> tasks = null;
+        synchronized(this) {
+            setResult(result);
+            setReady();
+            if(pendingTaskSupport != null)
+                tasks = pendingTaskSupport.executePending();
+        }
+        if(tasks != null)
+            for(SessionTask task : tasks)
+                ((ReadGraphImpl)graph).processor.scheduleNow(task);
+    }
+
+    @Override
+    public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
+        Collection<SessionTask> tasks = null;
+        synchronized(this) {
+            except(throwable);
+            if(pendingTaskSupport != null)
+                tasks = pendingTaskSupport.executePending();
+        }
+        if(tasks != null)
+            for(SessionTask task : tasks)
+                ((ReadGraphImpl)graph).processor.scheduleNow(task);
+    }
+
+    public void executeWhenResultIsAvailable(QueryProcessor processor, SessionTask task) {
+        boolean ready = false;
+        synchronized(this) {
+            if(pendingTaskSupport == null)
+                pendingTaskSupport = new PendingTaskSupport(this);
+            ready = pendingTaskSupport.executeWhenResultIsAvailable(task);
+        }
+        if(ready) {
+            processor.scheduleNow(task);
+        }
     }
 
     @Override
-    public void exception(AsyncReadGraph graph, Throwable throwable) {
-        except(throwable);
+    public String classId() {
+        return null;
     }
 
 }