]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java
Work in progress
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryCacheBase.java
index f75cd9dcd80b790727383a5d078f84f9c374c994..a75d6901079fa6d4bab47e41fe24c8524825c5fe 100644 (file)
@@ -2,6 +2,7 @@ package org.simantics.db.impl.query;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.simantics.db.AsyncReadGraph;
@@ -12,10 +13,12 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.procedure.InternalProcedure;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
 import org.simantics.db.procedure.AsyncMultiProcedure;
 import org.simantics.db.procedure.AsyncProcedure;
 import org.simantics.db.procedure.Listener;
 import org.simantics.db.procedure.ListenerBase;
+import org.simantics.db.procedure.Procedure;
 import org.simantics.db.request.AsyncMultiRead;
 import org.simantics.db.request.AsyncRead;
 import org.simantics.db.request.ExternalRead;
@@ -200,9 +203,9 @@ public class QueryCacheBase {
                                        
                                        // Just for safety
                                        if(entry.isDiscarded()) return;
-                                       if(entry.isExcepted()) entry.setPending();
                                        
                                        if(used.compareAndSet(false, true)) {
+                           //entry.setPending();
                                                entry.addOrSet(parentGraph.processor, result);
                                                procedure.execute(parentGraph, result);
                                        } else {
@@ -632,31 +635,38 @@ public class QueryCacheBase {
                }
        }
        
-       public static void waitPending(CacheEntry entry) throws DatabaseException {
+       public static void waitPending(ReadGraphImpl graph, CacheEntry entry) throws DatabaseException {
+               
+               QueryProcessor processor = graph.processor;
                
                int counter = 0;
                while(entry.isPending()) {
                        try {
-                               Thread.sleep(1);
-                               counter++;
-                               if(counter > 1000) {
-                                       CacheEntryBase base = ((CacheEntryBase)entry);
-//                                     if(base.created != null) {
-//                                             System.err.println("created:");
-//                                             base.created.printStackTrace();
-//                                     }
-//                                     if(base.performed != null) {
-//                                             System.err.println("performed:");
-//                                             base.performed.printStackTrace();
-//                                     }
-//                                     if(base.ready != null) {
-//                                             System.err.println("ready:");
-//                                             base.ready.printStackTrace();
-//                                     }
-                                       new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
-                                       throw new DatabaseException("Timeout waiting for request to complete.");
-                                       //System.err.println("asd");
-                                       //base.getQuery().recompute(null, null, entry);
+                               SessionTask task = processor.getOwnTask(processor.thread.get());
+                               if(task != null) {
+                                       task.run(processor.thread.get());
+                               } else {
+                                       Thread.sleep(1);
+                                       counter++;
+                                       if(counter > 5000) {
+                                               CacheEntryBase base = ((CacheEntryBase)entry);
+//                                             if(base.created != null) {
+//                                                     System.err.println("created:");
+//                                                     base.created.printStackTrace();
+//                                             }
+//                                             if(base.performed != null) {
+//                                                     System.err.println("performed:");
+//                                                     base.performed.printStackTrace();
+//                                             }
+//                                             if(base.ready != null) {
+//                                                     System.err.println("ready:");
+//                                                     base.ready.printStackTrace();
+//                                             }
+                                               new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace();
+                                               throw new DatabaseException("Timeout waiting for request to complete.");
+                                               //System.err.println("asd");
+                                               //base.getQuery().recompute(null, null, entry);
+                                       }
                                }
                        } catch (InterruptedException e) {
                        }
@@ -917,6 +927,7 @@ public class QueryCacheBase {
        private AsyncProcedure<T> procedure;
        private T result = null;
        private Throwable throwable = null;
+       private Semaphore s = new Semaphore(0);
        
        AsyncProcedureWrapper(AsyncProcedure<T> procedure) {
                this.procedure = procedure;
@@ -926,15 +937,22 @@ public class QueryCacheBase {
                public void execute(AsyncReadGraph graph, T result) {
                        if(procedure != null) procedure.execute(graph, result);
                        this.result = result;
+                       s.release();
                }
 
                @Override
                public void exception(AsyncReadGraph graph, Throwable throwable) {
                        if(procedure != null) procedure.exception(graph, throwable);
                        this.throwable = throwable;
+                       s.release();
                }
                
                public T get() throws DatabaseException {
+                       try {
+                               s.acquire();
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
                        if(throwable != null) {
                                if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
                                else throw new DatabaseException(throwable);
@@ -945,6 +963,39 @@ public class QueryCacheBase {
                
     }
 
+    static class ExternalProcedureWrapper<T> implements AsyncProcedure<T> {
+        
+        private Procedure<T> procedure;
+        private T result = null;
+        private Throwable throwable = null;
+        
+        ExternalProcedureWrapper(Procedure<T> procedure) {
+            this.procedure = procedure;
+        }
+
+        @Override
+        public void execute(AsyncReadGraph graph, T result) {
+            if(procedure != null) procedure.execute(result);
+            this.result = result;
+        }
+
+        @Override
+        public void exception(AsyncReadGraph graph, Throwable throwable) {
+            if(procedure != null) procedure.exception(throwable);
+            this.throwable = throwable;
+        }
+        
+        public T get() throws DatabaseException {
+            if(throwable != null) {
+                if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+                else throw new DatabaseException(throwable);
+            } else {
+                return result;
+            }
+        }
+        
+    }
+
     
     static class InternalProcedureWrapper<T> implements InternalProcedure<T> {
        
@@ -1058,9 +1109,21 @@ public class QueryCacheBase {
 
     }
 
+    public static <T> T resultExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, Procedure<T> procedure) throws DatabaseException {
+        ExternalProcedureWrapper<T> wrap = new ExternalProcedureWrapper<>(procedure);
+        QueryCache.runnerExternalReadEntry(graph, r, parent, listener, wrap);
+        return wrap.get();
+    }
+
     public static <T> T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
        AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
-        QueryCache.runnerReadEntry(graph, r, parent, listener, wrap);
+        QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true);
+        return wrap.get();
+    }
+
+    public static <T> T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure<T> procedure) throws DatabaseException {
+       AsyncProcedureWrapper<T> wrap = new AsyncProcedureWrapper<>(procedure);
+        QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true);
         return wrap.get();
     }