]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
Fixes based on feedback
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / ExternalReadEntry.java
index d61049744a3cbdbdb09c85040833ffc0e8329d91..661076e8604bacd47504ed86607df88bca54eee8 100644 (file)
@@ -17,14 +17,17 @@ import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.DebugPolicy;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
 import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.RequestFlags;
 
-final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
 
     final LinkedList<T> items = new LinkedList<T>();
 
     protected ExternalRead<T> request;
+    protected ReadGraphImpl graph;
+    protected boolean registered = false;
 
     @Override
     int makeHash() {
@@ -44,6 +47,7 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
     public void discard() {
         request.unregistered();
         request = null;
+        graph = null;
         super.discard();
     }
 
@@ -56,49 +60,15 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
         result = REQUIRES_COMPUTATION;
     }
     
-    public ExternalReadEntry(ExternalRead<T> request) {
+    public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
         assert request != null;
         this.request = request;
-    }
-    
-    final public void queue(T item) {
-       synchronized(items) {
-               items.addLast(item);
-               // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
-               // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
-       }
-    }
-    
-    final public void addOrSet(QueryProcessor processor, Object item) {
-
-        try {
-        
-            assert(isPending());
-
-            //ArrayList<Procedure<T>> p = null;
-
-            synchronized(this) {
-
-                setResult(item);
-                setReady();
-//                p = procs;
-//                procs = null;
-
-            }
-
-//            if(p != null)
-//                for(Procedure proc : p) {
-//                    proc.execute((T)item);
-//                }
-
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
-        
+        this.graph = graph;
     }
     
     @Override
     public void except(Throwable t) {
+       
         if(DebugPolicy.QUERY_STATE) System.out.println("[QUERY STATE]: excepted " + this);
         if(statusOrException != DISCARDED) {
             statusOrException = EXCEPTED;
@@ -106,14 +76,23 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
         } else {
             result = t;
         }
+        
         assert(isExcepted());
+        
     }
     
     @Override
     public void setResult(Object result) {
+
         super.setResult(result);
         assert(!(result instanceof Throwable));
         assert(!isExcepted());
+        
+    }
+    
+    @Override
+    public void setReady() {
+       super.setReady();
     }
 
     @Override
@@ -126,7 +105,6 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
 
                                synchronized(items) {
 
-
                                        // Update
                                        if(!items.isEmpty()) {
                                            setReady();
@@ -192,7 +170,59 @@ final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>
     }
 
     public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
-        return graph.processor.cache.performQuery(graph, request, this, procedure);
+       
+               try {
+                       
+                       ReadGraphImpl queryGraph = graph.withParent(this);
+
+                       if(!registered) {
+                               request.register(graph, this);
+                               registered = true;
+                       }
+                       
+                       queryGraph.asyncBarrier.waitBarrier(request, graph);
+
+               } catch (Throwable t) {
+
+                       except(t);
+
+               }
+
+               performFromCache(graph, procedure);
+               
+               return getResult();
+
     }
 
+       @Override
+       public void execute(T result) {
+
+       if(this.result == REQUIRES_COMPUTATION) {
+               
+                       setResult(result);
+                       setReady();
+
+       } else {
+
+               synchronized(items) {
+                       items.addLast(result);
+                               graph.processor.updatePrimitive(request);
+                       // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+                       // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+               }
+               
+       }
+               
+       }
+
+       @Override
+       public void exception(Throwable t) {
+               except(t);
+       }
+
+       @Override
+       public boolean isDisposed() {
+               return registered && (isDiscarded() || !graph.processor.isBound(this));
+       }
+    
 }