package org.simantics.db.impl.query; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.procedure.ListenerBase; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.slf4j.LoggerFactory; import gnu.trove.map.hash.THashMap; import gnu.trove.set.hash.THashSet; public class QueryListening { static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); private final QueryProcessor processor; private final Scheduler scheduler; private final Map addedEntries = new HashMap<>(); private THashSet scheduledListeners = new THashSet(); private boolean firingListeners = false; final THashMap> listeners = new THashMap>(10, 0.75f); QueryListening(QueryProcessor processor) { this.processor = processor; scheduler = new Scheduler(processor); scheduler.start(); } public boolean hasScheduledUpdates() { return !scheduledListeners.isEmpty(); } void sync() { try { scheduler.sync(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); } } void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { if(inferred) { assert(listener == null); return; } if(parent != null) { try { if(!child.isImmutable(graph)) scheduler.accept(new RegisterParentRunnable(parent, child)); } catch (DatabaseException e) { LOGGER.error("Error while registering query dependencies", e); } } if(listener != null) if(!listener.isDisposed()) scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); } void registerFirstKnown(ListenerBase base, Object result) { if(base == null) return; scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result)); } void scheduleListener(ListenerEntry entry) { assert (entry != null); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Scheduled " + entry.procedure); } } scheduledListeners.add(entry); } boolean hasListener(CacheEntry entry) { if(listeners.get(entry) != null) return true; return false; } boolean hasListenerAfterDisposing(CacheEntry entry) { if(listeners.get(entry) != null) { ArrayList entries = listeners.get(entry); ArrayList list = null; for (ListenerEntry e : entries) { if (e.base.isDisposed()) { if(list == null) list = new ArrayList(); list.add(e); } } if(list != null) { for (ListenerEntry e : list) { entries.remove(e); } } if (entries.isEmpty()) { listeners.remove(entry); return false; } return true; } return false; } void processListenerReport(CacheEntry entry, Map> workarea) { if(!workarea.containsKey(entry)) { HashSet ls = new HashSet(); for(ListenerEntry e : getListenerEntries(entry)) ls.add(e.base); workarea.put(entry, ls); for(CacheEntry parent : entry.getParents(processor)) { processListenerReport(parent, workarea); ls.addAll(workarea.get(parent)); } } } public synchronized ListenerReport getListenerReport() throws IOException { class ListenerReportImpl implements ListenerReport { Map> workarea = new HashMap>(); @Override public void print(PrintStream b) { Map hist = new HashMap(); for(Map.Entry> e : workarea.entrySet()) { for(ListenerBase l : e.getValue()) { Integer i = hist.get(l); hist.put(l, i != null ? i-1 : -1); } } for(Pair p : CollectionUtils.valueSortedEntries(hist)) { b.print("" + -p.second + " " + p.first + "\n"); } b.flush(); } } ListenerReportImpl result = new ListenerReportImpl(); Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); for(CacheEntryBase entry : all) { hasListenerAfterDisposing(entry); } for(CacheEntryBase entry : all) { processListenerReport(entry, result.workarea); } return result; } public synchronized String reportListeners(File file) throws IOException { if (!processor.isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); ListenerReport report = getListenerReport(); report.print(b); return "Done reporting listeners."; } public void fireListeners(ReadGraphImpl graph) { assert (!processor.updating); assert (!processor.cache.collecting); assert (!firingListeners); firingListeners = true; try { // Performing may cause further events to be scheduled. while (!scheduledListeners.isEmpty()) { // Clone current events to make new entries possible during // firing. THashSet entries = scheduledListeners; scheduledListeners = new THashSet(); ArrayList schedule = new ArrayList(); for (ListenerEntry listenerEntry : entries) { if (pruneListener(listenerEntry)) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Pruned " + listenerEntry.procedure); } } continue; } final CacheEntry entry = listenerEntry.entry; assert (entry != null); Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); if (newValue != ListenerEntry.NOT_CHANGED) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); } } schedule.add(listenerEntry); listenerEntry.setLastKnown(entry.getResult()); } } for(ListenerEntry listenerEntry : schedule) { final CacheEntry entry = listenerEntry.entry; if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure); } } try { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); } } entry.performFromCache(graph, listenerEntry.procedure); } catch (Throwable t) { LOGGER.error("Unexpected exception ", t); } } } } finally { firingListeners = false; } } void updateParents(int indent, CacheEntry entry, LinkedList todo) { Iterable oldParents = entry.getParents(processor); for (CacheEntry parent : oldParents) { if(!parent.isDiscarded()) todo.push(new UpdateEntry(entry, parent, indent + 2)); } } private boolean pruneListener(ListenerEntry entry) { if (entry.base.isDisposed()) { assert (entry != null); ArrayList list = listeners.get(entry.entry); if(list != null) { boolean success = list.remove(entry); assert (success); if (list.isEmpty()) listeners.remove(entry.entry); } return true; } else { return false; } } private List getListenerEntries(CacheEntry entry) { hasListenerAfterDisposing(entry); if(listeners.get(entry) != null) return listeners.get(entry); else return Collections.emptyList(); } private static class RegisterParentRunnable implements Runnable { private final CacheEntry parent; private final CacheEntry child; public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { this.parent = parent; this.child = child; } @Override public void run() { child.addParent(parent); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { System.out.println(child + " -> " + parent); } } } } private static class RegisterListenerRunnable implements Runnable { private final QueryListening queryListening; private final ListenerBase base; private final Object procedure; private final CacheEntry parent; private final CacheEntry entry; public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { this.queryListening = queryListening; this.base = base; this.procedure = procedure; this.parent = parent; this.entry = entry; } @Override public void run() { assert (entry != null); assert (procedure != null); ArrayList list = queryListening.listeners.get(entry); if (list == null) { list = new ArrayList<>(1); queryListening.listeners.put(entry, list); } ListenerEntry result = new ListenerEntry(entry, base, procedure); // Equals is here based on base int currentIndex = list.indexOf(result); // There was already a listener if(currentIndex > -1) { ListenerEntry current = list.get(currentIndex); if(!current.base.isDisposed()) return; list.set(currentIndex, result); } else { list.add(result); } if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); } } queryListening.addedEntries.put(base, result); } } private static class RegisterFirstKnownRunnable implements Runnable { private final Map addedEntries; private final ListenerBase base; private final Object result; public RegisterFirstKnownRunnable(Map addedEntries, ListenerBase base, Object result) { this.addedEntries = addedEntries; this.base = base; this.result = result; } @Override public void run() { ListenerEntry entry = addedEntries.get(base); if(entry != null) entry.setLastKnown(result); } } }