]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
index e3bd437416af310936e66533746b270d8228aba8..e0b39d6516055656995689f90c1506c19bb26265 100644 (file)
@@ -14,15 +14,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
 
 import org.simantics.databoard.Bindings;
 import org.simantics.db.DevelopmentKeys;
 import org.simantics.db.debug.ListenerReport;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.procedure.ListenerBase;
 import org.simantics.utils.Development;
 import org.simantics.utils.datastructures.Pair;
@@ -34,11 +31,10 @@ import gnu.trove.set.hash.THashSet;
 
 public class QueryListening {
 
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+    static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
 
     private final QueryProcessor                                    processor;
     private final Scheduler                                         scheduler;
-    private final Consumer<Runnable>                                consumer;
     private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
 
     private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
@@ -46,210 +42,11 @@ public class QueryListening {
 
     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
 
-    private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
-
-        private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
-        
-        private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
-
-        @Override
-        protected synchronized ArrayList<Runnable> initialValue() {
-            ArrayList<Runnable> result = new ArrayList<>();
-            allQueues.put(Thread.currentThread(), result);
-            return result;
-        }
-
-        synchronized void sendToExecution() {
-            ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
-            dispatchedQueues.add(rs);
-            notify();
-        }
-
-        synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
-            ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
-            dispatchedQueues = new ArrayList<>();
-            return result;
-        }
-
-    }
-
-    private static class Scheduler {
-
-        private static final int BUFFER_SIZE = 100;
-
-        private final QueryProcessor processor;
-        
-        private final ThreadQueue queues = new ThreadQueue();
-
-        /*
-         * Access to this field is synchronized using the monitor of 'queues'
-         * The method flush() shall be called by the single writing thread and the field has been nulled before the
-         * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
-         */
-        private Semaphore flush = null;
-
-        Scheduler(QueryProcessor processor) {
-            this.processor = processor;
-        }
-        
-        private boolean isTerminated() {
-            return processor.isDisposed();
-        }
-
-        Consumer<Runnable> newConsumer() {
-            return (task) -> {
-
-                ArrayList<Runnable> l = queues.get();
-                l.add(task);
-                if(l.size() == BUFFER_SIZE) {
-                    queues.remove();
-                    queues.sendToExecution();
-                }
-                
-            };
-        }
-
-        void start() {
-            
-            Thread thread = new Thread() {
-                
-                public void run() {
-
-                    synchronized(queues) {
-                        while(!isTerminated()) {
-                            try {
-                                ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
-                                for(ArrayList<Runnable> queue : qs) {
-                                    for(Runnable r : queue)
-                                        r.run();
-                                }
-                                if(flush != null) {
-                                    for(ArrayList<Runnable> queue : queues.allQueues.values()) {
-                                        for(Runnable r : queue) {
-                                            r.run();   
-                                        }
-                                        queue.clear();
-                                    }
-                                    Semaphore s = flush;
-                                    flush = null;
-                                    s.release();
-                                }
-                                queues.wait(1000);
-                            } catch (InterruptedException e) {
-                                LOGGER.error("Unexpected interrupt", e);
-                            }
-                        }
-                    }
-
-                };
-                
-            };
-            
-            thread.setName("QueryListening");
-            thread.start();
-            
-        }
-
-        private Semaphore createFlush() {
-            synchronized(queues) {
-                flush = new Semaphore(0);
-                queues.notify();
-                return flush;
-            }
-        }
-        
-        void flush() {
-            try {
-                createFlush().acquire();
-            } catch (InterruptedException e) {
-                LOGGER.error("Unexpected interrupt", e);
-            }
-        }
-
-    }
-
-    private static class RegisterParentRunnable implements Runnable {
-        
-        private final CacheEntry parent;
-        private final CacheEntry child;
-        
-        public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
-            this.parent = parent;
-            this.child = child;
-        }
-
-        @Override
-        public void run() {
-            child.addParent(parent);
-            if (Development.DEVELOPMENT) {
-                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                    System.out.println(child + " -> " + parent);
-                }
-            }
-        }
-        
-    }
-    
-    private static class RegisterListenerRunnable implements Runnable {
-
-        private final QueryListening queryListening;
-        private final ListenerBase base;
-        private final Object procedure;
-        private final CacheEntry parent;
-        private final CacheEntry entry;
-
-        public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
-            this.queryListening = queryListening;
-            this.base = base;
-            this.procedure = procedure;
-            this.parent = parent;
-            this.entry = entry;
-        }
-
-        @Override
-        public void run() {
-
-            assert (entry != null);
-            assert (procedure != null);
-
-            ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
-            if (list == null) {
-                list = new ArrayList<>(1);
-                queryListening.listeners.put(entry, list);
-            }
-
-            ListenerEntry result = new ListenerEntry(entry, base, procedure);
-            // Equals is here based on base
-            int currentIndex = list.indexOf(result);
-            // There was already a listener
-            if(currentIndex > -1) {
-                ListenerEntry current = list.get(currentIndex);
-                if(!current.base.isDisposed())
-                    return;
-                list.set(currentIndex, result);
-            } else {
-                list.add(result);
-            }
-
-            if (Development.DEVELOPMENT) {
-                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                    new Exception().printStackTrace();
-                    System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
-                }
-            }
-
-            queryListening.addedEntries.put(base, result);
-
-        }
-
-        
-    }
     
     QueryListening(QueryProcessor processor) {
         
         this.processor = processor;
         scheduler = new Scheduler(processor);
-        consumer = scheduler.newConsumer();
         scheduler.start();
         
     }
@@ -260,7 +57,7 @@ public class QueryListening {
     
     void sync() {
         try {
-            scheduler.flush();
+            scheduler.sync();
         } catch (Throwable t) {
             LOGGER.error("Error while waiting for query dependency management", t);
         }
@@ -276,7 +73,7 @@ public class QueryListening {
         if(parent != null) {
             try {
                 if(!child.isImmutable(graph))
-                    consumer.accept(new RegisterParentRunnable(parent, child));
+                    scheduler.accept(new RegisterParentRunnable(parent, child));
             } catch (DatabaseException e) {
                 LOGGER.error("Error while registering query dependencies", e);
             }
@@ -284,7 +81,7 @@ public class QueryListening {
 
         if(listener != null)
             if(!listener.isDisposed())
-                consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+               scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
 
     }
 
@@ -292,10 +89,7 @@ public class QueryListening {
         
         if(base == null) return;
 
-        consumer.accept(() -> {
-            ListenerEntry entry = addedEntries.get(base);
-            if(entry != null) entry.setLastKnown(result);
-        });
+        scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
 
     }
 
@@ -529,5 +323,102 @@ public class QueryListening {
         else 
             return Collections.emptyList();
     }
-    
+
+    private static class RegisterParentRunnable implements Runnable {
+
+        private final CacheEntry parent;
+        private final CacheEntry child;
+
+        public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+            this.parent = parent;
+            this.child = child;
+        }
+
+        @Override
+        public void run() {
+            child.addParent(parent);
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                    System.out.println(child + " -> " + parent);
+                }
+            }
+        }
+
+    }
+
+    private static class RegisterListenerRunnable implements Runnable {
+
+        private final QueryListening queryListening;
+        private final ListenerBase base;
+        private final Object procedure;
+        private final CacheEntry parent;
+        private final CacheEntry entry;
+
+        public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+            this.queryListening = queryListening;
+            this.base = base;
+            this.procedure = procedure;
+            this.parent = parent;
+            this.entry = entry;
+        }
+
+        @Override
+        public void run() {
+
+            assert (entry != null);
+            assert (procedure != null);
+
+            ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+            if (list == null) {
+                list = new ArrayList<>(1);
+                queryListening.listeners.put(entry, list);
+            }
+
+            ListenerEntry result = new ListenerEntry(entry, base, procedure);
+            // Equals is here based on base
+            int currentIndex = list.indexOf(result);
+            // There was already a listener
+            if(currentIndex > -1) {
+                ListenerEntry current = list.get(currentIndex);
+                if(!current.base.isDisposed())
+                    return;
+                list.set(currentIndex, result);
+            } else {
+                list.add(result);
+            }
+
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                    new Exception().printStackTrace();
+                    System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+                }
+            }
+
+            queryListening.addedEntries.put(base, result);
+
+        }
+
+
+    }
+
+    private static class RegisterFirstKnownRunnable implements Runnable {
+
+        private final Map<ListenerBase,ListenerEntry> addedEntries;
+        private final ListenerBase base;
+        private final Object result;
+
+        public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
+            this.addedEntries = addedEntries;
+            this.base = base;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            ListenerEntry entry = addedEntries.remove(base);
+            if(entry != null) entry.setLastKnown(result);
+        }
+
+    }
+
 }