]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
Merge branch 'private/db-threads' into private/threads-testing
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryCacheBase.java
index d6924c708a9738fa1cd517fc686f1cb461369cc7..a75d6901079fa6d4bab47e41fe24c8524825c5fe 100644 (file)
@@ -2,6 +2,7 @@ package org.simantics.db.impl.query;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.simantics.db.AsyncReadGraph;
@@ -12,6 +13,7 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.procedure.InternalProcedure;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncMultiProcedure;
 import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.db.procedure.Listener;
@@ -633,31 +635,38 @@ public class QueryCacheBase {
                }
        }
        
-       public static void waitPending(CacheEntry entry) throws DatabaseException {
+       public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException {
+               
+               QueryProcessor processor = graph.processor;
                
                int counter = 0;
                while(entry.isPending()) {
                        try {
-                               Thread.sleep(1);
-                               counter++;
-                               if(counter > 1000) {
-                                       CacheEntryBase base = ((CacheEntryBase)entry);
-//                                     if(base.created != null) {
-//                                             System.err.println("created:");
-//                                             base.created.printStackTrace();
-//                                     }
-//                                     if(base.performed != null) {
-//                                             System.err.println("performed:");
-//                                             base.performed.printStackTrace();
-//                                     }
-//                                     if(base.ready != null) {
-//                                             System.err.println("ready:");
-//                                             base.ready.printStackTrace();
-//                                     }
-                                       new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
-                                       throw new DatabaseException("Timeout waiting for request to complete.");
-                                       //System.err.println("asd");
-                                       //base.getQuery().recompute(null, null, entry);
+                               SessionTask task = processor.getOwnTask(processor.thread.get());
+                               if(task != null) {
+                                       task.run(processor.thread.get());
+                               } else {
+                                       Thread.sleep(1);
+                                       counter++;
+                                       if(counter > 5000) {
+                                               CacheEntryBase base = ((CacheEntryBase)entry);
+//                                             if(base.created != null) {
+//                                                     System.err.println("created:");
+//                                                     base.created.printStackTrace();
+//                                             }
+//                                             if(base.performed != null) {
+//                                                     System.err.println("performed:");
+//                                                     base.performed.printStackTrace();
+//                                             }
+//                                             if(base.ready != null) {
+//                                                     System.err.println("ready:");
+//                                                     base.ready.printStackTrace();
+//                                             }
+                                               new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
+                                               throw new DatabaseException("Timeout waiting for request to complete.");
+                                               //System.err.println("asd");
+                                               //base.getQuery().recompute(null, null, entry);
+                                       }
                                }
                        } catch (InterruptedException e) {
                        }
@@ -918,6 +927,7 @@ public class QueryCacheBase {
        private AsyncProcedure<T> procedure;
        private T result = null;
        private Throwable throwable = null;
+       private Semaphore s = new Semaphore(0);
        
        AsyncProcedureWrapper(AsyncProcedure<T> procedure) {
                this.procedure = procedure;
@@ -927,15 +937,22 @@ public class QueryCacheBase {
                public void execute(AsyncReadGraph graph, T result) {
                        if(procedure != null) procedure.execute(graph, result);
                        this.result = result;
+                       s.release();
                }
 
                @Override
                public void exception(AsyncReadGraph graph, Throwable throwable) {
                        if(procedure != null) procedure.exception(graph, throwable);
                        this.throwable = throwable;
+                       s.release();
                }
                
                public T get() throws DatabaseException {
+                       try {
+                               s.acquire();
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
                        if(throwable != null) {
                                if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
                                else throw new DatabaseException(throwable);
@@ -1100,7 +1117,13 @@ public class QueryCacheBase {
 
     public static <T> T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
        AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
-        QueryCache.runnerReadEntry(graph, r, parent, listener, wrap);
+        QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true);
+        return wrap.get();
+    }
+
+    public static <T> T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
+       AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
+        QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true);
         return wrap.get();
     }