]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncReadEntry.java
Attempt to fix regressions in new code base
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / AsyncReadEntry.java
index 00abbebf65859e76d0e8dad481d01097cab04621..e6d42750d37b9caf62196bd50c043b802093d7e6 100644 (file)
@@ -16,7 +16,9 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.DebugPolicy;
+import org.simantics.db.impl.graph.AsyncBarrierImpl;
 import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.db.request.AsyncRead;
 import org.slf4j.Logger;
@@ -133,7 +135,8 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
         } else {
 
             try {
-                proc.execute(graph, (T) getResult());
+                T result = (T) getResult();
+                proc.execute(graph, result);
             } catch (Throwable t) {
                 LOGGER.error("performFromCache proc.execute failed", t);
             }
@@ -144,46 +147,62 @@ final public class AsyncReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> i
 
     }
 
-    public static <T> void computeForEach(ReadGraphImpl parentGraph, AsyncRead<T> request, AsyncReadEntry<T> entry,
-            AsyncProcedure<T> procedure_) throws DatabaseException {
+    public static <T> T computeForEach(ReadGraphImpl graph, AsyncRead<T> request, AsyncReadEntry<T> entry,
+            AsyncProcedure<T> procedure_, boolean needsToBlock) throws DatabaseException {
 
         AsyncProcedure<T> procedure = entry != null ? entry : procedure_;
 
-        ReadGraphImpl queryGraph = parentGraph.withParent(entry);
+        ReadGraphImpl queryGraph = graph.withParent(entry);
         
-        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, new AsyncProcedure<T>() {
+        BlockingAsyncProcedure<T> proc = new BlockingAsyncProcedure<>(queryGraph, null, request);
+        
+        class AsyncTask extends SessionTask {
 
-            @Override
-            public void execute(AsyncReadGraph returnGraph, T result) {
-                try {
-                    procedure.execute(parentGraph, result);
-                } catch (Throwable t) {
-                    LOGGER.error("computeForEach procedure.execute failed", t);
-                }
+            T result;
+            DatabaseException exception;
+            
+            public AsyncTask(ReadGraphImpl graph) {
+                super(graph);
             }
 
             @Override
-            public void exception(AsyncReadGraph returnGraph, Throwable t) {
-                try {
-                    procedure.exception(parentGraph, t);
-                } catch (Throwable t2) {
-                    LOGGER.error("computeForEach procedure.exception failed", t2);
+            public void run(int thread) {
+                if(needsToBlock) proc.waitBarrier();
+                if(proc.isDone()) {
+                    try {
+                        result = (T)proc.get();
+                        if(procedure != null) procedure.execute(graph, result);
+                    } catch (DatabaseException e) {
+                        if(procedure != null) procedure.exception(graph, e);
+                        exception = e;
+                    } catch (Throwable t) {
+                        DatabaseException dbe = new DatabaseException(t);
+                        if(procedure != null) procedure.exception(graph, dbe);
+                        exception = dbe;
+                    } finally {
+                        if (entry != null)
+                            entry.performFromCache(queryGraph, procedure_);
+                    }
+                } else {
+                    graph.processor.schedule(this);            
                 }
             }
-
-            @Override
-            public String toString() {
-                return procedure.toString();
-            }
-
-        }, request);
-
+            
+        }
+        
         request.perform(queryGraph, proc);
         
-        proc.get();
+        AsyncTask task = new AsyncTask(graph);
 
-        if (entry != null)
-            entry.performFromCache(parentGraph, procedure_);
+        if(needsToBlock) task.run(0);
+        else if (proc.isDone()) task.run(0);
+        else  {
+            graph.processor.schedule(task);
+            return null;
+        }
+        
+        if(task.exception != null) throw task.exception;
+        else return task.result;
 
     }