]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
Separate query dependency management from QueryProcessor
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
new file mode 100644 (file)
index 0000000..e524a08
--- /dev/null
@@ -0,0 +1,404 @@
+package org.simantics.db.impl.query;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.debug.ListenerReport;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.utils.Development;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.set.hash.THashSet;
+
+public class QueryListening {
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+
+       final private QueryProcessor                            processor;
+       private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
+       private boolean                                         firingListeners       = false;
+       final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+       private BlockingQueue<Runnable>                         tasks = new LinkedBlockingQueue<Runnable>();
+       private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
+       
+       QueryListening(QueryProcessor processor) {
+               this.processor = processor;
+               new DependencyManagementThread(processor, tasks).start();
+       }
+       
+       public void sync() {
+               Semaphore s = new Semaphore(0);
+               tasks.add(() -> {
+                       s.release();
+               });
+               try {
+                       s.acquire();
+               } catch (Throwable t) {
+                       LOGGER.error("Error while waiting for query dependency management", t);
+               }
+       }
+       
+       static class DependencyManagementThread extends Thread {
+               
+               final private QueryProcessor processor;
+               final BlockingQueue<Runnable> tasks;
+               
+               DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
+                       setName("Query Dependency Manager");
+                       this.processor = processor;
+                       this.tasks = tasks;
+               }
+               
+               @Override
+               public void run() {
+                       while(processor.isAlive()) {
+                               try {
+                                       Runnable r = tasks.take();
+                                       r.run();
+                               } catch (Throwable t) {
+                                       // Spurious problems?
+                                       LOGGER.error("Error while waiting for query dependency management tasks", t);
+                               }
+                       }
+               }
+               
+       }
+       
+       public boolean hasScheduledUpdates() {
+               return !scheduledListeners.isEmpty();
+       }
+
+       void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+               
+               tasks.offer(() -> {
+
+                       if (parent != null && !inferred) {
+                               try {
+                                       if(!child.isImmutable(graph))
+                                               child.addParent(parent);
+                               } catch (DatabaseException e) {
+                                       LOGGER.error("Error while registering query dependencies", e);
+                               }
+                               if (Development.DEVELOPMENT) {
+                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+                                               System.out.println(child + " -> " + parent);
+                                       }
+                               }
+                       }
+
+                       if (listener != null)
+                               registerListener(child, listener, procedure);
+
+               });
+               
+       }
+
+       void registerFirstKnown(ListenerBase base, Object result) {
+               
+               tasks.offer(() -> {
+
+                       ListenerEntry entry = addedEntries.get(base);
+                       if(entry != null) entry.setLastKnown(result);
+                       
+               });
+               
+       }
+
+       public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
+
+               assert (entry != null);
+
+               if (base.isDisposed())
+                       return null;
+
+               return addListener(entry, base, procedure);
+
+       }
+
+       /*
+        * Registers a listener and returns an entry iff the entry was added
+        */
+       private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
+
+               assert (entry != null);
+               assert (procedure != null);
+
+               ArrayList<ListenerEntry> list = listeners.get(entry);
+               if (list == null) {
+                       list = new ArrayList<>(1);
+                       listeners.put(entry, list);
+               }
+
+               ListenerEntry result = new ListenerEntry(entry, base, procedure);
+               // Equals is here based on base
+               int currentIndex = list.indexOf(result);
+               // There was already a listener
+               if(currentIndex > -1) {
+                       ListenerEntry current = list.get(currentIndex);
+                       if(!current.base.isDisposed()) return null;
+                       list.set(currentIndex, result);
+               } else {
+                       list.add(result);
+               }
+
+               if (Development.DEVELOPMENT) {
+                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                               new Exception().printStackTrace();
+                               System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+                       }
+               }
+               
+               addedEntries.put(base, result);
+               
+               return result;
+
+       }
+
+       void scheduleListener(ListenerEntry entry) {
+               assert (entry != null);
+               if (Development.DEVELOPMENT) {
+                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                               System.err.println("Scheduled " + entry.procedure);
+                       }
+               }
+               scheduledListeners.add(entry);
+       }
+
+       private void removeListener(ListenerEntry entry) {
+               assert (entry != null);
+               ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+               if(list == null) return;
+               boolean success = list.remove(entry);
+               assert (success);
+               if (list.isEmpty())
+                       listeners.remove(entry.entry);
+       }
+
+       boolean hasListener(CacheEntry entry) {
+               if(listeners.get(entry) != null) return true;
+               return false;
+       }
+
+       boolean hasListenerAfterDisposing(CacheEntry entry) {
+               if(listeners.get(entry) != null) {
+                       ArrayList<ListenerEntry> entries = listeners.get(entry);
+                       ArrayList<ListenerEntry> list = null;
+                       for (ListenerEntry e : entries) {
+                               if (e.base.isDisposed()) {
+                                       if(list == null) list = new ArrayList<ListenerEntry>();
+                                       list.add(e);
+                               }
+                       }
+                       if(list != null) {
+                               for (ListenerEntry e : list) {
+                                       entries.remove(e);
+                               }
+                       }
+                       if (entries.isEmpty()) {
+                               listeners.remove(entry);
+                               return false;
+                       }
+                       return true;
+               }
+               return false;
+       }
+
+       List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+               hasListenerAfterDisposing(entry);
+               if(listeners.get(entry) != null)
+                       return listeners.get(entry);
+               else 
+                       return Collections.emptyList();
+       }
+
+       void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+               if(!workarea.containsKey(entry)) {
+
+                       HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+                       for(ListenerEntry e : getListenerEntries(entry))
+                               ls.add(e.base);
+
+                       workarea.put(entry, ls);
+
+                       for(CacheEntry parent : entry.getParents(processor)) {
+                               processListenerReport(parent, workarea);
+                               ls.addAll(workarea.get(parent));
+                       }
+
+               }
+
+       }
+
+       public synchronized ListenerReport getListenerReport() throws IOException {
+
+               class ListenerReportImpl implements ListenerReport {
+
+                       Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+                       @Override
+                       public void print(PrintStream b) {
+                               Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+                               for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+                                       for(ListenerBase l : e.getValue()) {
+                                               Integer i = hist.get(l);
+                                               hist.put(l, i != null ? i-1 : -1);
+                                       }
+                               }
+
+                               for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+                                       b.print("" + -p.second + " " + p.first + "\n");
+                               }
+
+                               b.flush();
+                       }
+
+               }
+
+               ListenerReportImpl result = new ListenerReportImpl();
+
+               Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+               for(CacheEntryBase entry : all) {
+                       hasListenerAfterDisposing(entry);
+               }
+               for(CacheEntryBase entry : all) {
+                       processListenerReport(entry, result.workarea);
+               }
+
+               return result;
+
+       }
+
+       public synchronized String reportListeners(File file) throws IOException {
+
+               if (!processor.isAlive())
+                       return "Disposed!";
+
+               PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+               ListenerReport report = getListenerReport();
+               report.print(b);
+
+               return "Done reporting listeners.";
+
+       }
+
+       public void fireListeners(WriteGraphImpl graph) {
+
+               assert (!processor.updating);
+               assert (!processor.cache.collecting);
+               assert (!firingListeners);
+
+               firingListeners = true;
+
+               try {
+
+                       // Performing may cause further events to be scheduled.
+                       while (!scheduledListeners.isEmpty()) {
+
+                               // Clone current events to make new entries possible during
+                               // firing.
+                               THashSet<ListenerEntry> entries = scheduledListeners;
+                               scheduledListeners = new THashSet<ListenerEntry>();
+
+                               ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+                               for (ListenerEntry listenerEntry : entries) {
+
+                                       if (pruneListener(listenerEntry)) {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               new Exception().printStackTrace();
+                                                               System.err.println("Pruned " + listenerEntry.procedure);
+                                                       }
+                                               }
+                                               continue;
+                                       }
+
+                                       final CacheEntry entry = listenerEntry.entry;
+                                       assert (entry != null);
+
+                                       Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+                                       if (newValue != ListenerEntry.NOT_CHANGED) {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               new Exception().printStackTrace();
+                                                               System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+                                                       }
+                                               }
+                                               schedule.add(listenerEntry);
+                                               listenerEntry.setLastKnown(entry.getResult());
+                                       }
+
+                               }
+
+                               for(ListenerEntry listenerEntry : schedule) {
+                                       final CacheEntry entry = listenerEntry.entry;
+                                       if (Development.DEVELOPMENT) {
+                                               if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                       System.err.println("Firing " + listenerEntry.procedure);
+                                               }
+                                       }
+                                       try {
+                                               if (Development.DEVELOPMENT) {
+                                                       if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+                                                               System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+                                                       }
+                                               }
+                                               entry.performFromCache(graph, listenerEntry.procedure);
+                                       } catch (Throwable t) {
+                                               t.printStackTrace();
+                                       }
+                               }
+
+                       }
+
+               } finally {
+                       firingListeners = false;
+               }
+
+       }
+       
+       void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+               Iterable<CacheEntry> oldParents = entry.getParents(processor);
+               for (CacheEntry parent : oldParents) {
+                       if(!parent.isDiscarded())
+                               todo.push(new UpdateEntry(entry, parent, indent + 2));
+               }
+
+       }
+
+       private boolean pruneListener(ListenerEntry entry) {
+               if (entry.base.isDisposed()) {
+                       removeListener(entry);
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+       
+}