]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/ExternalReadEntry.java
QueryListening sync is slow
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / ExternalReadEntry.java
index 18d91dc0618643e092aa4d242f6a50545cbf6c6e..d044043394ef771f7e7e0cfab3e5257f77883db6 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
  *******************************************************************************/
 package org.simantics.db.impl.query;
 
-import java.util.ArrayList;
 import java.util.LinkedList;
 
+import org.simantics.databoard.Bindings;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.procedure.Procedure;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
 import org.simantics.db.request.ExternalRead;
 import org.simantics.db.request.RequestFlags;
+import org.simantics.utils.Development;
 
-final public class ExternalReadEntry<T> extends CacheEntryBase {
+final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> implements Listener<T> {
 
     final LinkedList<T> items = new LinkedList<T>();
-    
-//    public ArrayList<Procedure<T>> procs;
 
-    protected ExternalRead<T> request;
+    protected ExternalRead<T> id;
+    protected ReadGraphImpl graph;
+    protected boolean registered = false;
 
     @Override
     int makeHash() {
-       return request.hashCode();
+       return id.hashCode();
     }
     
     @Override
     public Object getOriginalRequest() {
-        return request;
+        return id;
     }
     
     @Override
@@ -43,89 +47,88 @@ final public class ExternalReadEntry<T> extends CacheEntryBase {
     
     @Override
     public void discard() {
-        request.unregistered();
-        request = null;
+        id.unregistered();
+        id = null;
+        graph = null;
         super.discard();
     }
-    
-    public ExternalReadEntry(ExternalRead<T> request) {
-        assert request != null;
-        this.request = request;
+
+    @Override
+    public void setPending(QuerySupport querySupport) {
+        //if(result != NO_RESULT) {
+            //new Exception("result = " + result).printStackTrace();
+        //}
+        statusOrException = PENDING;
+        result = REQUIRES_COMPUTATION;
     }
     
-    final public void queue(T item) {
-       synchronized(items) {
-               items.addLast(item);
-               // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
-               // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
-       }
+    public ExternalReadEntry(ExternalRead<T> request, ReadGraphImpl graph) {
+        assert request != null;
+        this.id = request;
+        this.graph = graph;
     }
     
-    final public void addOrSet(QueryProcessor processor, Object item) {
+    @Override
+    public void except(Throwable t) {
+       
+               if (Development.DEVELOPMENT) {
+                       if(Development.<Boolean>getProperty(DevelopmentKeys.CACHE_ENTRY_STATE, Bindings.BOOLEAN)) {
+                               System.err.println("[QUERY STATE]: excepted " + this);
+                       }
+               }
 
-        try {
+        if(statusOrException != DISCARDED) {
+            statusOrException = EXCEPTED;
+            result = t;
+        } else {
+            result = t;
+        }
         
-            assert(isPending());
-
-            ArrayList<Procedure<T>> p = null;
-
-            synchronized(this) {
-
-                setResult(item);
-                setReady();
-//                p = procs;
-//                procs = null;
-
-            }
-
-//            if(p != null)
-//                for(Procedure proc : p) {
-//                    proc.execute((T)item);
-//                }
+        assert(isExcepted());
+        
+    }
+    
+    @Override
+    public void setResult(Object result) {
 
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
+        super.setResult(result);
+        assert(!(result instanceof Throwable));
+        assert(!isExcepted());
         
     }
     
+    @Override
+    public void setReady() {
+       super.setReady();
+    }
+
     @Override
     final public Query getQuery() {
        
         return new Query() {
 
                        @Override
-                       public void recompute(ReadGraphImpl graph, Object provider, CacheEntry entry) {
-
-                           final QueryProcessor qp = (QueryProcessor)provider;
-                           synchronized(items) {
-
-                               if(entry.isExcepted()) {
-                                       
-                                       // Exception persists
-                                       
-                               } else {
-                               
-                                       // Update
-                                       if(!items.isEmpty()) {
-                                               setResult(items.removeFirst());
-                                       }
-                                       // Reschedule
-                                           if(!items.isEmpty()) {
-                                               qp.updatePrimitive(request);
-                                           }
-                                           
+                       public void recompute(ReadGraphImpl graph) {
+
+                               synchronized(items) {
+
+                                       // Update
+                                       if(!items.isEmpty()) {
                                            setReady();
-                                           
-                               }
-                               
-                           }
+                                               setResult(items.removeFirst());
+                                       }
+                                       // Reschedule
+                                       if(!items.isEmpty()) {
+                                               graph.processor.updatePrimitive(id);
+                                       }
+
+                               }
                                
                        }
 
                        @Override
                        public void removeEntry(QueryProcessor processor) {
-                               processor.externalReadMap.remove(request);
+                               processor.cache.remove(ExternalReadEntry.this);
                        }
 
                        @Override
@@ -135,44 +138,37 @@ final public class ExternalReadEntry<T> extends CacheEntryBase {
                        
                        @Override
                        public String toString() {
-                               if(request == null) return "DISCARDED ExternalRead";
-                               else return request.toString();
+                               if(id == null) return "DISCARDED ExternalRead";
+                               else return id.toString();
                        }
                
         };
         
     }
 
-       public void performFromCache(Object procedure) {
-               
-        Procedure<T> proc = (Procedure<T>)procedure;
+       @Override
+       public String toString() {
+               if(id == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
+               else return id.toString() + " " + + System.identityHashCode(this);
+       }
+
+    @Override
+    public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
+       
+        AsyncProcedure<T> proc = (AsyncProcedure<T>)procedure;
 
            if(isExcepted()) {
             
-            proc.exception((Throwable)getResult());
+            proc.exception(graph, (Throwable)getResult());
             
         } else {
             
-            proc.execute((T)getResult());
+            proc.execute(graph, (T)getResult());
 
         }
-               
-       }
-
-       @Override
-       public String toString() {
-               if(request == null) return "DISCARDED ExternalRead " + System.identityHashCode(this);
-               else return request.toString() + " " + + System.identityHashCode(this);
-       }
-
-    @Override
-    public void performFromCache(ReadGraphImpl graph, Object provider, Object procedure) {
-        performFromCache(procedure);
-    }
-    
-    @Override
-    public void setReady() {
-       super.setReady();
+           
+           return getResult();
+       
     }
     
     @Override
@@ -180,4 +176,60 @@ final public class ExternalReadEntry<T> extends CacheEntryBase {
        // Do nothing - the state is already set and cannot be recomputed on demand
     }
 
+    public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
+       
+               try {
+                       
+                       ReadGraphImpl queryGraph = graph.withParent(this, null, true);
+
+                       if(!registered) {
+                               id.register(graph, this);
+                               registered = true;
+                       }
+                       
+                       queryGraph.asyncBarrier.waitBarrier(id, graph);
+
+               } catch (Throwable t) {
+
+                       except(t);
+
+               }
+
+               performFromCache(graph, procedure);
+               
+               return getResult();
+
+    }
+
+       @Override
+       public void execute(T result) {
+
+       if(this.result == REQUIRES_COMPUTATION) {
+               
+                       setResult(result);
+                       setReady();
+
+       } else {
+
+               synchronized(items) {
+                       items.addLast(result);
+                               graph.processor.updatePrimitive(id);
+                       // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
+                       // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
+               }
+               
+       }
+               
+       }
+
+       @Override
+       public void exception(Throwable t) {
+               except(t);
+       }
+
+       @Override
+       public boolean isDisposed() {
+               return registered && (isDiscarded() || !graph.processor.isBound(this));
+       }
+    
 }