package org.simantics.db.impl.query;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

import org.simantics.databoard.Bindings;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.debug.ListenerReport;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.procedure.ListenerBase;
import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
import org.slf4j.LoggerFactory;

import gnu.trove.map.hash.THashMap;
import gnu.trove.set.hash.THashSet;

/**
 * Keeps track of the listeners attached to query cache entries.
 * <p>
 * Dependency and listener registrations are not applied by the calling thread:
 * they are wrapped into {@link Runnable} tasks, put into a bounded queue and
 * executed one at a time by a dedicated {@link DependencyManagementThread}.
 * Listeners whose cache entry may have changed are collected with
 * {@link #scheduleListener(ListenerEntry)} and notified by
 * {@link #fireListeners(WriteGraphImpl)}.
 */
public class QueryListening {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);

    final private QueryProcessor processor;

    /** Listeners waiting to be evaluated by the next {@link #fireListeners(WriteGraphImpl)} round. */
    private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();

    /** Guards (through an assertion) against re-entrant calls to {@link #fireListeners(WriteGraphImpl)}. */
    private boolean firingListeners = false;

    /** Registered listeners, grouped by the cache entry they listen to. */
    final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);

    /** Pending registration tasks consumed by the dependency management thread. */
    private BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(2048);

    /*
     * Most recently added listener entry for each listener base.
     * NOTE(review): nothing in this class removes mappings from this map, so it
     * grows with every distinct listener base - confirm whether that is intended.
     */
    private Map<ListenerBase, ListenerEntry> addedEntries = new HashMap<>();

    /**
     * Creates the listening support for the given processor and starts the
     * thread that executes queued registration tasks.
     *
     * @param processor the query processor this instance belongs to
     */
    QueryListening(QueryProcessor processor) {
        this.processor = processor;
        new DependencyManagementThread(processor, tasks).start();
    }

    /**
     * Blocks until the dependency management thread has executed a marker task
     * queued by this call, i.e. until the tasks queued before it have been run.
     * Failures are logged and not propagated; if the calling thread is
     * interrupted while waiting, its interrupt status is restored.
     */
    public void sync() {
        Semaphore marker = new Semaphore(0);
        try {
            tasks.put(() -> {
                marker.release();
            });
            marker.acquire();
        } catch (InterruptedException e) {
            // Do not hide the interruption from the caller.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while waiting for query dependency management", e);
        } catch (Throwable t) {
            LOGGER.error("Error while waiting for query dependency management", t);
        }
    }

    /**
     * Worker thread that takes tasks from the shared queue and runs them one by
     * one for as long as the processor reports being alive.
     */
    static class DependencyManagementThread extends Thread {

        final private QueryProcessor processor;
        final BlockingQueue<Runnable> tasks;

        DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
            setName("Query Dependency Manager");
            this.processor = processor;
            this.tasks = tasks;
        }

        @Override
        public void run() {
            while (processor.isAlive()) {
                try {
                    Runnable r = tasks.take();
                    r.run();
                } catch (Throwable t) {
                    // Spurious problems?
                    LOGGER.error("Error while waiting for query dependency management tasks", t);
                }
            }
        }

    }

    /**
     * @return {@code true} if at least one listener has been scheduled and not
     *         yet processed by {@link #fireListeners(WriteGraphImpl)}
     */
    public boolean hasScheduledUpdates() {
        return !scheduledListeners.isEmpty();
    }

    /**
     * Queues a task that records {@code parent} as a parent of {@code child}
     * (unless the dependency is inferred, there is no parent, or the child is
     * immutable) and registers {@code listener} for {@code child} when a
     * listener is given.
     * <p>
     * The call blocks while the task queue is full. If the calling thread is
     * interrupted while waiting, the registration is dropped, the problem is
     * logged and the interrupt status is restored.
     *
     * @param graph     graph used for the immutability check of {@code child}
     * @param child     the cache entry that was requested
     * @param parent    the cache entry that requested {@code child}, may be {@code null}
     * @param listener  listener to attach to {@code child}, may be {@code null}
     * @param procedure procedure associated with the listener
     * @param inferred  {@code true} to skip the parent registration
     */
    void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
        try {
            tasks.put(() -> {
                if (parent != null && !inferred) {
                    try {
                        if (!child.isImmutable(graph))
                            child.addParent(parent);
                    } catch (DatabaseException e) {
                        LOGGER.error("Error while registering query dependencies", e);
                    }
                    if (Development.DEVELOPMENT) {
                        if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
                            System.out.println(child + " -> " + parent);
                        }
                    }
                }
                if (listener != null)
                    registerListener(child, listener, procedure);
            });
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while registering dependencies", e);
        }
    }

    /**
     * Queues a task that stores {@code result} as the last known value of the
     * entry most recently added for {@code base}, if there is one.
     * <p>
     * The task is offered without blocking; when the queue is full it cannot be
     * queued, which is reported as a warning.
     *
     * @param base   the listener whose entry is updated
     * @param result the first result delivered to the listener
     */
    void registerFirstKnown(ListenerBase base, Object result) {
        boolean queued = tasks.offer(() -> {
            ListenerEntry entry = addedEntries.get(base);
            if (entry != null)
                entry.setLastKnown(result);
        });
        if (!queued) {
            LOGGER.warn("Query dependency management queue is full, first known result was not recorded for listener {}", base);
        }
    }

    /**
     * Registers {@code base} as a listener of {@code entry} unless the listener
     * has already been disposed.
     *
     * @return the created listener entry, or {@code null} if the listener was
     *         disposed or nothing was added
     */
    public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
        assert (entry != null);
        if (base.isDisposed())
            return null;
        return addListener(entry, base, procedure);
    }

    /*
     * Registers a listener and returns an entry iff the entry was added
     */
    private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
        assert (entry != null);
        assert (procedure != null);

        ArrayList<ListenerEntry> list = listeners.get(entry);
        if (list == null) {
            list = new ArrayList<>(1);
            listeners.put(entry, list);
        }

        ListenerEntry result = new ListenerEntry(entry, base, procedure);
        // Equals is here based on base
        int currentIndex = list.indexOf(result);
        // There was already a listener
        if (currentIndex > -1) {
            ListenerEntry current = list.get(currentIndex);
            // A live listener is kept as is; only a disposed one is replaced.
            if (!current.base.isDisposed())
                return null;
            list.set(currentIndex, result);
        } else {
            list.add(result);
        }

        if (Development.DEVELOPMENT) {
            if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                new Exception().printStackTrace();
                System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
            }
        }

        addedEntries.put(base, result);
        return result;
    }

    /**
     * Marks the given listener entry to be evaluated by the next
     * {@link #fireListeners(WriteGraphImpl)} round.
     */
    void scheduleListener(ListenerEntry entry) {
        assert (entry != null);
        if (Development.DEVELOPMENT) {
            if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                System.err.println("Scheduled " + entry.procedure);
            }
        }
        scheduledListeners.add(entry);
    }

    /**
     * Removes the listener entry from the list of its cache entry and drops the
     * list once it becomes empty.
     */
    private void removeListener(ListenerEntry entry) {
        assert (entry != null);
        ArrayList<ListenerEntry> list = listeners.get(entry.entry);
        if (list == null)
            return;
        boolean success = list.remove(entry);
        assert (success);
        if (list.isEmpty())
            listeners.remove(entry.entry);
    }

    /**
     * @return {@code true} if a listener list is registered for the entry
     */
    boolean hasListener(CacheEntry entry) {
        return listeners.get(entry) != null;
    }

    /**
     * Removes the disposed listeners of the given entry and tells whether any
     * listener is left afterwards.
     *
     * @return {@code true} if at least one non-disposed listener remains
     */
    boolean hasListenerAfterDisposing(CacheEntry entry) {
        ArrayList<ListenerEntry> entries = listeners.get(entry);
        if (entries == null)
            return false;
        entries.removeIf(e -> e.base.isDisposed());
        if (entries.isEmpty()) {
            listeners.remove(entry);
            return false;
        }
        return true;
    }

    /**
     * Returns the listeners of the given entry after removing disposed ones.
     *
     * @return the registered list itself (not a copy), or an empty list if the
     *         entry has no listeners
     */
    List<ListenerEntry> getListenerEntries(CacheEntry entry) {
        hasListenerAfterDisposing(entry);
        List<ListenerEntry> registered = listeners.get(entry);
        if (registered != null)
            return registered;
        return Collections.emptyList();
    }

    /**
     * Collects into {@code workarea} the listener bases of {@code entry}
     * together with those of its parents, recursively. Entries already present
     * in {@code workarea} are not processed again.
     */
    void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
        if (workarea.containsKey(entry))
            return;

        HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
        for (ListenerEntry e : getListenerEntries(entry))
            ls.add(e.base);

        // Publish the set before descending so a revisit of this entry stops at the guard above.
        workarea.put(entry, ls);

        for (Object p : entry.getParents(processor)) {
            CacheEntry parent = (CacheEntry) p;
            processListenerReport(parent, workarea);
            ls.addAll(workarea.get(parent));
        }
    }

    /**
     * Builds a report over all cache entries of the processor. Printing the
     * report writes one line per listener: the number of collected cache
     * entries associated with it, followed by the listener itself.
     */
    public synchronized ListenerReport getListenerReport() throws IOException {

        class ListenerReportImpl implements ListenerReport {

            Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();

            @Override
            public void print(PrintStream b) {
                // Counts are kept negated so that the value-sorted entries can be printed by negating them back.
                Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
                for (Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
                    for (ListenerBase l : e.getValue()) {
                        Integer i = hist.get(l);
                        hist.put(l, i != null ? i - 1 : -1);
                    }
                }

                for (Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
                    b.print("" + -p.second + " " + p.first + "\n");
                }

                b.flush();
            }

        }

        ListenerReportImpl result = new ListenerReportImpl();

        Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
        // Drop disposed listeners everywhere first, then collect.
        for (CacheEntryBase entry : all) {
            hasListenerAfterDisposing(entry);
        }
        for (CacheEntryBase entry : all) {
            processListenerReport(entry, result.workarea);
        }

        return result;
    }

    /**
     * Writes the listener report into the given file.
     *
     * @param file the file to write the report to
     * @return {@code "Disposed!"} if the processor is no longer alive,
     *         otherwise {@code "Done reporting listeners."}
     * @throws IOException if the file cannot be opened for writing
     */
    public synchronized String reportListeners(File file) throws IOException {
        if (!processor.isAlive())
            return "Disposed!";

        // try-with-resources closes the stream (and the file handle) even if reporting fails.
        try (PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)))) {
            ListenerReport report = getListenerReport();
            report.print(b);
        }
        return "Done reporting listeners.";
    }

    /**
     * Processes the scheduled listeners until none are left. Disposed listeners
     * are pruned; for the others the cache entry is compared against the
     * listener's last known value and, if it changed, the listener procedure is
     * performed from the cache. An exception thrown by one listener is logged
     * and does not prevent the remaining listeners from being fired.
     *
     * @param graph the graph passed to the comparison and to the listeners
     */
    public void fireListeners(WriteGraphImpl graph) {
        assert (!processor.updating);
        assert (!processor.cache.collecting);
        assert (!firingListeners);

        firingListeners = true;

        try {
            // Performing may cause further events to be scheduled.
            while (!scheduledListeners.isEmpty()) {

                // Swap in a fresh set to make new entries possible during firing.
                THashSet<ListenerEntry> entries = scheduledListeners;
                scheduledListeners = new THashSet<ListenerEntry>();

                ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();

                for (ListenerEntry listenerEntry : entries) {

                    if (pruneListener(listenerEntry)) {
                        if (Development.DEVELOPMENT) {
                            if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                                new Exception().printStackTrace();
                                System.err.println("Pruned " + listenerEntry.procedure);
                            }
                        }
                        continue;
                    }

                    final CacheEntry entry = listenerEntry.entry;
                    assert (entry != null);

                    Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());

                    if (newValue != ListenerEntry.NOT_CHANGED) {
                        if (Development.DEVELOPMENT) {
                            if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                                new Exception().printStackTrace();
                                System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
                            }
                        }
                        schedule.add(listenerEntry);
                        listenerEntry.setLastKnown(entry.getResult());
                    }

                }

                for (ListenerEntry listenerEntry : schedule) {
                    final CacheEntry entry = listenerEntry.entry;
                    if (Development.DEVELOPMENT) {
                        if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                            System.err.println("Firing " + listenerEntry.procedure);
                        }
                    }
                    try {
                        if (Development.DEVELOPMENT) {
                            if (Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
                                System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
                            }
                        }
                        entry.performFromCache(graph, listenerEntry.procedure);
                    } catch (Throwable t) {
                        // One failing listener must not stop the others from being notified.
                        LOGGER.error("Error while firing listener " + listenerEntry.procedure, t);
                    }
                }

            }
        } finally {
            firingListeners = false;
        }
    }

    /**
     * Pushes an {@link UpdateEntry} onto {@code todo} for every parent of
     * {@code entry} that has not been discarded.
     *
     * @param indent current indentation level; pushed entries get {@code indent + 2}
     */
    void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
        for (Object p : entry.getParents(processor)) {
            CacheEntry parent = (CacheEntry) p;
            if (!parent.isDiscarded())
                todo.push(new UpdateEntry(entry, parent, indent + 2));
        }
    }

    /**
     * Removes the listener entry if its listener has been disposed.
     *
     * @return {@code true} if the entry was disposed and therefore removed
     */
    private boolean pruneListener(ListenerEntry entry) {
        if (entry.base.isDisposed()) {
            removeListener(entry);
            return true;
        }
        return false;
    }

}