]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Optimise performance of job scheduling in QueryListening 06/3806/5
authorAntti Villberg <antti.villberg@semantum.fi>
Tue, 21 Jan 2020 23:01:04 +0000 (01:01 +0200)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Fri, 24 Jan 2020 21:02:48 +0000 (21:02 +0000)
Reino was correct - this synchronization was indeed broken.

gitlab #450

Change-Id: I65280947385963d52073df78f604d98741beb12f

bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java

index 9daab950e3964a8681a12fabc5ef0b2cfaf74ceb..dd410b4ce7e8c57fba7e86d94a30dbb7b9ec1e9f 100644 (file)
@@ -14,9 +14,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.function.Consumer;
 
 import org.simantics.databoard.Bindings;
 import org.simantics.db.DevelopmentKeys;
@@ -37,372 +36,498 @@ public class QueryListening {
 
     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
 
-       final private QueryProcessor                            processor;
-       private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
-       private boolean                                         firingListeners       = false;
-       final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
-       private BlockingQueue<Runnable>                         tasks = new ArrayBlockingQueue<Runnable>(2048);
-       private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
-       
-       QueryListening(QueryProcessor processor) {
-               this.processor = processor;
-               new DependencyManagementThread(processor, tasks).start();
-       }
-       
-       public void sync() {
-               Semaphore s = new Semaphore(0);
-               try {
-                       tasks.put(() -> {
-                               s.release();
-                       });
-                       s.acquire();
-               } catch (Throwable t) {
-                       LOGGER.error("Error while waiting for query dependency management", t);
-               }
-       }
-       
-       static class DependencyManagementThread extends Thread {
-               
-               final private QueryProcessor processor;
-               final BlockingQueue<Runnable> tasks;
-               
-               DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
-                       setName("Query Dependency Manager");
-                       this.processor = processor;
-                       this.tasks = tasks;
-               }
-               
-               @Override
-               public void run() {
-                       while(processor.isAlive()) {
-                               try {
-                                       Runnable r = tasks.take();
-                                       r.run();
-                               } catch (Throwable t) {
-                                       // Spurious problems?
-                                       LOGGER.error("Error while waiting for query dependency management tasks", t);
-                               }
-                       }
-               }
-               
-       }
-       
-       public boolean hasScheduledUpdates() {
-               return !scheduledListeners.isEmpty();
-       }
-
-       void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-               
-               try {
-                       tasks.put(() -> {
-
-                               if (parent != null && !inferred) {
-                                       try {
-                                               if(!child.isImmutable(graph))
-                                                       child.addParent(parent);
-                                       } catch (DatabaseException e) {
-                                               LOGGER.error("Error while registering query dependencies", e);
-                                       }
-                                       if (Development.DEVELOPMENT) {
-                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
-                                                       System.out.println(child + " -> " + parent);
-                                               }
-                                       }
-                               }
-
-                               if (listener != null)
-                                       registerListener(child, listener, procedure);
-
-                       });
-               } catch (InterruptedException e) {
-                       LOGGER.error("Error while registering dependencies", e);
-               }
-               
-       }
-
-       void registerFirstKnown(ListenerBase base, Object result) {
-               
-               tasks.offer(() -> {
-
-                       ListenerEntry entry = addedEntries.get(base);
-                       if(entry != null) entry.setLastKnown(result);
-                       
-               });
-               
-       }
-
-       public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
-               assert (entry != null);
-
-               if (base.isDisposed())
-                       return null;
-
-               return addListener(entry, base, procedure);
-
-       }
-
-       /*
-        * Registers a listener and returns an entry iff the entry was added
-        */
-       private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
-               assert (entry != null);
-               assert (procedure != null);
-
-               ArrayList<ListenerEntry> list = listeners.get(entry);
-               if (list == null) {
-                       list = new ArrayList<>(1);
-                       listeners.put(entry, list);
-               }
-
-               ListenerEntry result = new ListenerEntry(entry, base, procedure);
-               // Equals is here based on base
-               int currentIndex = list.indexOf(result);
-               // There was already a listener
-               if(currentIndex > -1) {
-                       ListenerEntry current = list.get(currentIndex);
-                       if(!current.base.isDisposed()) return null;
-                       list.set(currentIndex, result);
-               } else {
-                       list.add(result);
-               }
-
-               if (Development.DEVELOPMENT) {
-                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                               new Exception().printStackTrace();
-                               System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
-                       }
-               }
-               
-               addedEntries.put(base, result);
-               
-               return result;
-
-       }
-
-       void scheduleListener(ListenerEntry entry) {
-               assert (entry != null);
-               if (Development.DEVELOPMENT) {
-                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                               System.err.println("Scheduled " + entry.procedure);
-                       }
-               }
-               scheduledListeners.add(entry);
-       }
-
-       private void removeListener(ListenerEntry entry) {
-               assert (entry != null);
-               ArrayList<ListenerEntry> list = listeners.get(entry.entry);
-               if(list == null) return;
-               boolean success = list.remove(entry);
-               assert (success);
-               if (list.isEmpty())
-                       listeners.remove(entry.entry);
-       }
-
-       boolean hasListener(CacheEntry entry) {
-               if(listeners.get(entry) != null) return true;
-               return false;
-       }
-
-       boolean hasListenerAfterDisposing(CacheEntry entry) {
-               if(listeners.get(entry) != null) {
-                       ArrayList<ListenerEntry> entries = listeners.get(entry);
-                       ArrayList<ListenerEntry> list = null;
-                       for (ListenerEntry e : entries) {
-                               if (e.base.isDisposed()) {
-                                       if(list == null) list = new ArrayList<ListenerEntry>();
-                                       list.add(e);
-                               }
-                       }
-                       if(list != null) {
-                               for (ListenerEntry e : list) {
-                                       entries.remove(e);
-                               }
-                       }
-                       if (entries.isEmpty()) {
-                               listeners.remove(entry);
-                               return false;
-                       }
-                       return true;
-               }
-               return false;
-       }
-
-       List<ListenerEntry> getListenerEntries(CacheEntry entry) {
-               hasListenerAfterDisposing(entry);
-               if(listeners.get(entry) != null)
-                       return listeners.get(entry);
-               else 
-                       return Collections.emptyList();
-       }
-
-       void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
-               if(!workarea.containsKey(entry)) {
-
-                       HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
-                       for(ListenerEntry e : getListenerEntries(entry))
-                               ls.add(e.base);
-
-                       workarea.put(entry, ls);
-
-                       for(CacheEntry parent : entry.getParents(processor)) {
-                               processListenerReport(parent, workarea);
-                               ls.addAll(workarea.get(parent));
-                       }
-
-               }
-
-       }
-
-       public synchronized ListenerReport getListenerReport() throws IOException {
-
-               class ListenerReportImpl implements ListenerReport {
-
-                       Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
-                       @Override
-                       public void print(PrintStream b) {
-                               Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
-                               for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
-                                       for(ListenerBase l : e.getValue()) {
-                                               Integer i = hist.get(l);
-                                               hist.put(l, i != null ? i-1 : -1);
-                                       }
-                               }
-
-                               for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
-                                       b.print("" + -p.second + " " + p.first + "\n");
-                               }
-
-                               b.flush();
-                       }
-
-               }
-
-               ListenerReportImpl result = new ListenerReportImpl();
-
-               Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
-               for(CacheEntryBase entry : all) {
-                       hasListenerAfterDisposing(entry);
-               }
-               for(CacheEntryBase entry : all) {
-                       processListenerReport(entry, result.workarea);
-               }
-
-               return result;
-
-       }
-
-       public synchronized String reportListeners(File file) throws IOException {
-
-               if (!processor.isAlive())
-                       return "Disposed!";
-
-               PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
-               ListenerReport report = getListenerReport();
-               report.print(b);
-
-               return "Done reporting listeners.";
-
-       }
-
-       public void fireListeners(WriteGraphImpl graph) {
-
-               assert (!processor.updating);
-               assert (!processor.cache.collecting);
-               assert (!firingListeners);
-
-               firingListeners = true;
-
-               try {
-
-                       // Performing may cause further events to be scheduled.
-                       while (!scheduledListeners.isEmpty()) {
-
-                               // Clone current events to make new entries possible during
-                               // firing.
-                               THashSet<ListenerEntry> entries = scheduledListeners;
-                               scheduledListeners = new THashSet<ListenerEntry>();
-
-                               ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
-
-                               for (ListenerEntry listenerEntry : entries) {
-
-                                       if (pruneListener(listenerEntry)) {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               new Exception().printStackTrace();
-                                                               System.err.println("Pruned " + listenerEntry.procedure);
-                                                       }
-                                               }
-                                               continue;
-                                       }
-
-                                       final CacheEntry entry = listenerEntry.entry;
-                                       assert (entry != null);
-
-                                       Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
-
-                                       if (newValue != ListenerEntry.NOT_CHANGED) {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               new Exception().printStackTrace();
-                                                               System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
-                                                       }
-                                               }
-                                               schedule.add(listenerEntry);
-                                               listenerEntry.setLastKnown(entry.getResult());
-                                       }
-
-                               }
-
-                               for(ListenerEntry listenerEntry : schedule) {
-                                       final CacheEntry entry = listenerEntry.entry;
-                                       if (Development.DEVELOPMENT) {
-                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                       System.err.println("Firing " + listenerEntry.procedure);
-                                               }
-                                       }
-                                       try {
-                                               if (Development.DEVELOPMENT) {
-                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
-                                                               System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
-                                                       }
-                                               }
-                                               entry.performFromCache(graph, listenerEntry.procedure);
-                                       } catch (Throwable t) {
-                                               t.printStackTrace();
-                                       }
-                               }
-
-                       }
-
-               } finally {
-                       firingListeners = false;
-               }
-
-       }
-       
-       void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
-
-               Iterable<CacheEntry> oldParents = entry.getParents(processor);
-               for (CacheEntry parent : oldParents) {
-                       if(!parent.isDiscarded())
-                               todo.push(new UpdateEntry(entry, parent, indent + 2));
-               }
-
-       }
-
-       private boolean pruneListener(ListenerEntry entry) {
-               if (entry.base.isDisposed()) {
-                       removeListener(entry);
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-       
+    private final QueryProcessor                                    processor;
+    private final Scheduler                                         scheduler;
+    private final Consumer<Runnable>                                consumer;
+    private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
+
+    private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
+    private boolean                                                 firingListeners       = false;
+
+    final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+
+    private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
+
+        private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
+        
+        private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
+
+        @Override
+        protected synchronized ArrayList<Runnable> initialValue() {
+            ArrayList<Runnable> result = new ArrayList<>();
+            allQueues.put(Thread.currentThread(), result);
+            return result;
+        }
+
+        synchronized void sendToExecution() {
+            ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
+            dispatchedQueues.add(rs);
+            notify();
+        }
+
+        synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
+            ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
+            dispatchedQueues = new ArrayList<>();
+            return result;
+        }
+
+    }
+
+    private static class Scheduler {
+
+        private static final int BUFFER_SIZE = 100;
+
+        private final QueryProcessor processor;
+        
+        private final ThreadQueue queues = new ThreadQueue();
+
+        /*
+         * Access to this field is synchronized using the monitor of 'queues'
+         * The method flush() shall be called by the single writing thread and the field has been nulled before the
+         * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
+         */
+        private Semaphore flush = null;
+
+        Scheduler(QueryProcessor processor) {
+            this.processor = processor;
+        }
+        
+        private boolean isTerminated() {
+            return processor.isDisposed();
+        }
+
+        Consumer<Runnable> newConsumer() {
+            return (task) -> {
+
+                ArrayList<Runnable> l = queues.get();
+                l.add(task);
+                if(l.size() == BUFFER_SIZE) {
+                    queues.remove();
+                    queues.sendToExecution();
+                }
+                
+            };
+        }
+
+        void start() {
+            
+            Thread thread = new Thread() {
+                
+                public void run() {
+
+                    synchronized(queues) {
+                        while(!isTerminated()) {
+                            try {
+                                ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
+                                for(ArrayList<Runnable> queue : qs) {
+                                    for(Runnable r : queue)
+                                        r.run();
+                                }
+                                if(flush != null) {
+                                    for(ArrayList<Runnable> queue : queues.allQueues.values()) {
+                                        for(Runnable r : queue) {
+                                            r.run();   
+                                        }
+                                        queue.clear();
+                                    }
+                                    Semaphore s = flush;
+                                    flush = null;
+                                    s.release();
+                                }
+                                queues.wait(1000);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Unexpected interrupt", e);
+                            }
+                        }
+                    }
+
+                };
+                
+            };
+            
+            thread.setName("QueryListening");
+            thread.start();
+            
+        }
+
+        private Semaphore createFlush() {
+            synchronized(queues) {
+                flush = new Semaphore(0);
+                queues.notify();
+                return flush;
+            }
+        }
+        
+        void flush() {
+            try {
+                createFlush().acquire();
+            } catch (InterruptedException e) {
+                LOGGER.error("Unexpected interrupt", e);
+            }
+        }
+
+    }
+
+    private static class RegisterParentRunnable implements Runnable {
+        
+        private final CacheEntry parent;
+        private final CacheEntry child;
+        
+        public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+            this.parent = parent;
+            this.child = child;
+        }
+
+        @Override
+        public void run() {
+            child.addParent(parent);
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                    System.out.println(child + " -> " + parent);
+                }
+            }
+        }
+        
+    }
+    
+    private static class RegisterListenerRunnable implements Runnable {
+
+        private final QueryListening queryListening;
+        private final ListenerBase base;
+        private final Object procedure;
+        private final CacheEntry parent;
+        private final CacheEntry entry;
+
+        public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+            this.queryListening = queryListening;
+            this.base = base;
+            this.procedure = procedure;
+            this.parent = parent;
+            this.entry = entry;
+        }
+
+        @Override
+        public void run() {
+
+            assert (entry != null);
+            assert (procedure != null);
+
+            ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+            if (list == null) {
+                list = new ArrayList<>(1);
+                queryListening.listeners.put(entry, list);
+            }
+
+            ListenerEntry result = new ListenerEntry(entry, base, procedure);
+            // Equals is here based on base
+            int currentIndex = list.indexOf(result);
+            // There was already a listener
+            if(currentIndex > -1) {
+                ListenerEntry current = list.get(currentIndex);
+                if(!current.base.isDisposed())
+                    return;
+                list.set(currentIndex, result);
+            } else {
+                list.add(result);
+            }
+
+            if (Development.DEVELOPMENT) {
+                if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                    new Exception().printStackTrace();
+                    System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+                }
+            }
+
+            queryListening.addedEntries.put(base, result);
+
+        }
+
+        
+    }
+    
+    QueryListening(QueryProcessor processor) {
+        
+        this.processor = processor;
+        scheduler = new Scheduler(processor);
+        consumer = scheduler.newConsumer();
+        scheduler.start();
+        
+    }
+
+    public boolean hasScheduledUpdates() {
+        return !scheduledListeners.isEmpty();
+    }
+    
+    void sync() {
+        try {
+            scheduler.flush();
+        } catch (Throwable t) {
+            LOGGER.error("Error while waiting for query dependency management", t);
+        }
+    }
+
+    void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+
+        if(inferred) {
+            assert(listener == null);
+            return;
+        }
+
+        if(parent != null) {
+            try {
+                if(!child.isImmutable(graph))
+                    consumer.accept(new RegisterParentRunnable(parent, child));
+            } catch (DatabaseException e) {
+                LOGGER.error("Error while registering query dependencies", e);
+            }
+        }
+
+        if(listener != null)
+            if(!listener.isDisposed())
+                consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+
+    }
+
+    void registerFirstKnown(ListenerBase base, Object result) {
+        
+        if(base == null) return;
+
+        consumer.accept(() -> {
+            ListenerEntry entry = addedEntries.get(base);
+            if(entry != null) entry.setLastKnown(result);
+        });
+
+    }
+
+    void scheduleListener(ListenerEntry entry) {
+        
+        assert (entry != null);
+        
+        if (Development.DEVELOPMENT) {
+            if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                System.err.println("Scheduled " + entry.procedure);
+            }
+        }
+        
+        scheduledListeners.add(entry);
+        
+    }
+
+    boolean hasListener(CacheEntry entry) {
+        if(listeners.get(entry) != null) return true;
+        return false;
+    }
+
+    boolean hasListenerAfterDisposing(CacheEntry entry) {
+        if(listeners.get(entry) != null) {
+            ArrayList<ListenerEntry> entries = listeners.get(entry);
+            ArrayList<ListenerEntry> list = null;
+            for (ListenerEntry e : entries) {
+                if (e.base.isDisposed()) {
+                    if(list == null) list = new ArrayList<ListenerEntry>();
+                    list.add(e);
+                }
+            }
+            if(list != null) {
+                for (ListenerEntry e : list) {
+                    entries.remove(e);
+                }
+            }
+            if (entries.isEmpty()) {
+                listeners.remove(entry);
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+        if(!workarea.containsKey(entry)) {
+
+            HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+            for(ListenerEntry e : getListenerEntries(entry))
+                ls.add(e.base);
+
+            workarea.put(entry, ls);
+
+            for(CacheEntry parent : entry.getParents(processor)) {
+                processListenerReport(parent, workarea);
+                ls.addAll(workarea.get(parent));
+            }
+
+        }
+
+    }
+
+    public synchronized ListenerReport getListenerReport() throws IOException {
+
+        class ListenerReportImpl implements ListenerReport {
+
+            Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+            @Override
+            public void print(PrintStream b) {
+                Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+                for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+                    for(ListenerBase l : e.getValue()) {
+                        Integer i = hist.get(l);
+                        hist.put(l, i != null ? i-1 : -1);
+                    }
+                }
+
+                for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+                    b.print("" + -p.second + " " + p.first + "\n");
+                }
+
+                b.flush();
+            }
+
+        }
+
+        ListenerReportImpl result = new ListenerReportImpl();
+
+        Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+        for(CacheEntryBase entry : all) {
+            hasListenerAfterDisposing(entry);
+        }
+        for(CacheEntryBase entry : all) {
+            processListenerReport(entry, result.workarea);
+        }
+
+        return result;
+
+    }
+
+    public synchronized String reportListeners(File file) throws IOException {
+
+        if (!processor.isAlive())
+            return "Disposed!";
+
+        PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+        ListenerReport report = getListenerReport();
+        report.print(b);
+
+        return "Done reporting listeners.";
+
+    }
+
+    public void fireListeners(WriteGraphImpl graph) {
+
+        assert (!processor.updating);
+        assert (!processor.cache.collecting);
+        assert (!firingListeners);
+
+        firingListeners = true;
+
+        try {
+
+            // Performing may cause further events to be scheduled.
+            while (!scheduledListeners.isEmpty()) {
+
+                // Clone current events to make new entries possible during
+                // firing.
+                THashSet<ListenerEntry> entries = scheduledListeners;
+                scheduledListeners = new THashSet<ListenerEntry>();
+
+                ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+                for (ListenerEntry listenerEntry : entries) {
+
+                    if (pruneListener(listenerEntry)) {
+                        if (Development.DEVELOPMENT) {
+                            if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                new Exception().printStackTrace();
+                                System.err.println("Pruned " + listenerEntry.procedure);
+                            }
+                        }
+                        continue;
+                    }
+
+                    final CacheEntry entry = listenerEntry.entry;
+                    assert (entry != null);
+
+                    Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+                    if (newValue != ListenerEntry.NOT_CHANGED) {
+                        if (Development.DEVELOPMENT) {
+                            if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                new Exception().printStackTrace();
+                                System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+                            }
+                        }
+                        schedule.add(listenerEntry);
+                        listenerEntry.setLastKnown(entry.getResult());
+                    }
+
+                }
+
+                for(ListenerEntry listenerEntry : schedule) {
+                    final CacheEntry entry = listenerEntry.entry;
+                    if (Development.DEVELOPMENT) {
+                        if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                            System.err.println("Firing " + listenerEntry.procedure);
+                        }
+                    }
+                    try {
+                        if (Development.DEVELOPMENT) {
+                            if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+                            }
+                        }
+                        entry.performFromCache(graph, listenerEntry.procedure);
+                    } catch (Throwable t) {
+                        LOGGER.error("Unexpected exception ", t);
+                    }
+                }
+
+            }
+
+        } finally {
+            firingListeners = false;
+        }
+
+    }
+
+    void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+        Iterable<CacheEntry> oldParents = entry.getParents(processor);
+        for (CacheEntry parent : oldParents) {
+            if(!parent.isDiscarded())
+                todo.push(new UpdateEntry(entry, parent, indent + 2));
+        }
+
+    }
+
+    private boolean pruneListener(ListenerEntry entry) {
+        
+        if (entry.base.isDisposed()) {
+            
+            assert (entry != null);
+            ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+            if(list != null) {
+                boolean success = list.remove(entry);
+                assert (success);
+                if (list.isEmpty())
+                    listeners.remove(entry.entry);
+            }
+
+            return true;
+            
+        } else {
+            
+            return false;
+            
+        }
+    }
+
+    private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+        hasListenerAfterDisposing(entry);
+        if(listeners.get(entry) != null)
+            return listeners.get(entry);
+        else 
+            return Collections.emptyList();
+    }
+    
 }