package org.simantics.db.impl.query; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.procedure.ListenerBase; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.slf4j.LoggerFactory; import gnu.trove.map.hash.THashMap; import gnu.trove.set.hash.THashSet; public class QueryListening { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); private final QueryProcessor processor; private final Scheduler scheduler; private final Consumer consumer; private final Map addedEntries = new HashMap<>(); private THashSet scheduledListeners = new THashSet(); private boolean firingListeners = false; final THashMap> listeners = new THashMap>(10, 0.75f); private static class ThreadQueue extends ThreadLocal> { private final Map> allQueues = new HashMap<>(); private ArrayList> dispatchedQueues = new ArrayList<>(); @Override protected synchronized ArrayList initialValue() { ArrayList result = new ArrayList<>(); allQueues.put(Thread.currentThread(), result); return result; } synchronized void sendToExecution() { ArrayList rs = allQueues.remove(Thread.currentThread()); dispatchedQueues.add(rs); notify(); } synchronized ArrayList> getDispatchedQueues() { ArrayList> result = dispatchedQueues; dispatchedQueues = new ArrayList<>(); return result; } } private static class Scheduler { private static final int BUFFER_SIZE = 100; private final QueryProcessor processor; private final ThreadQueue queues = new ThreadQueue(); /* * Access to this field is synchronized using the monitor of 'queues' * The method flush() shall be called by the single writing thread and the field has been nulled before the * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time. */ private Semaphore flush = null; Scheduler(QueryProcessor processor) { this.processor = processor; } private boolean isTerminated() { return processor.isDisposed(); } Consumer newConsumer() { return (task) -> { ArrayList l = queues.get(); l.add(task); if(l.size() == BUFFER_SIZE) { queues.remove(); queues.sendToExecution(); } }; } void start() { Thread thread = new Thread() { public void run() { synchronized(queues) { while(!isTerminated()) { try { ArrayList> qs = queues.getDispatchedQueues(); for(ArrayList queue : qs) { for(Runnable r : queue) r.run(); } if(flush != null) { for(ArrayList queue : queues.allQueues.values()) { for(Runnable r : queue) { r.run(); } queue.clear(); } Semaphore s = flush; flush = null; s.release(); } queues.wait(1000); } catch (InterruptedException e) { LOGGER.error("Unexpected interrupt", e); } } } }; }; thread.setName("QueryListening"); thread.start(); } private Semaphore createFlush() { synchronized(queues) { flush = new Semaphore(0); queues.notify(); return flush; } } void flush() { try { createFlush().acquire(); } catch (InterruptedException e) { LOGGER.error("Unexpected interrupt", e); } } } private static class RegisterParentRunnable implements Runnable { private final CacheEntry parent; private final CacheEntry child; public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { this.parent = parent; this.child = child; } @Override public void run() { child.addParent(parent); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { System.out.println(child + " -> " + parent); } } } } private static class RegisterListenerRunnable implements Runnable { private final QueryListening queryListening; private final ListenerBase base; private final Object procedure; private final CacheEntry parent; private final CacheEntry entry; public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { this.queryListening = queryListening; this.base = base; this.procedure = procedure; this.parent = parent; this.entry = entry; } @Override public void run() { assert (entry != null); assert (procedure != null); ArrayList list = queryListening.listeners.get(entry); if (list == null) { list = new ArrayList<>(1); queryListening.listeners.put(entry, list); } ListenerEntry result = new ListenerEntry(entry, base, procedure); // Equals is here based on base int currentIndex = list.indexOf(result); // There was already a listener if(currentIndex > -1) { ListenerEntry current = list.get(currentIndex); if(!current.base.isDisposed()) return; list.set(currentIndex, result); } else { list.add(result); } if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); } } queryListening.addedEntries.put(base, result); } } QueryListening(QueryProcessor processor) { this.processor = processor; scheduler = new Scheduler(processor); consumer = scheduler.newConsumer(); scheduler.start(); } public boolean hasScheduledUpdates() { return !scheduledListeners.isEmpty(); } void sync() { try { scheduler.flush(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); } } void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { if(inferred) { assert(listener == null); return; } if(parent != null) { try { if(!child.isImmutable(graph)) consumer.accept(new RegisterParentRunnable(parent, child)); } catch (DatabaseException e) { LOGGER.error("Error while registering query dependencies", e); } } if(listener != null) if(!listener.isDisposed()) consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); } void registerFirstKnown(ListenerBase base, Object result) { if(base == null) return; consumer.accept(() -> { ListenerEntry entry = addedEntries.get(base); if(entry != null) entry.setLastKnown(result); }); } void scheduleListener(ListenerEntry entry) { assert (entry != null); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Scheduled " + entry.procedure); } } scheduledListeners.add(entry); } boolean hasListener(CacheEntry entry) { if(listeners.get(entry) != null) return true; return false; } boolean hasListenerAfterDisposing(CacheEntry entry) { if(listeners.get(entry) != null) { ArrayList entries = listeners.get(entry); ArrayList list = null; for (ListenerEntry e : entries) { if (e.base.isDisposed()) { if(list == null) list = new ArrayList(); list.add(e); } } if(list != null) { for (ListenerEntry e : list) { entries.remove(e); } } if (entries.isEmpty()) { listeners.remove(entry); return false; } return true; } return false; } void processListenerReport(CacheEntry entry, Map> workarea) { if(!workarea.containsKey(entry)) { HashSet ls = new HashSet(); for(ListenerEntry e : getListenerEntries(entry)) ls.add(e.base); workarea.put(entry, ls); for(CacheEntry parent : entry.getParents(processor)) { processListenerReport(parent, workarea); ls.addAll(workarea.get(parent)); } } } public synchronized ListenerReport getListenerReport() throws IOException { class ListenerReportImpl implements ListenerReport { Map> workarea = new HashMap>(); @Override public void print(PrintStream b) { Map hist = new HashMap(); for(Map.Entry> e : workarea.entrySet()) { for(ListenerBase l : e.getValue()) { Integer i = hist.get(l); hist.put(l, i != null ? i-1 : -1); } } for(Pair p : CollectionUtils.valueSortedEntries(hist)) { b.print("" + -p.second + " " + p.first + "\n"); } b.flush(); } } ListenerReportImpl result = new ListenerReportImpl(); Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); for(CacheEntryBase entry : all) { hasListenerAfterDisposing(entry); } for(CacheEntryBase entry : all) { processListenerReport(entry, result.workarea); } return result; } public synchronized String reportListeners(File file) throws IOException { if (!processor.isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); ListenerReport report = getListenerReport(); report.print(b); return "Done reporting listeners."; } public void fireListeners(WriteGraphImpl graph) { assert (!processor.updating); assert (!processor.cache.collecting); assert (!firingListeners); firingListeners = true; try { // Performing may cause further events to be scheduled. while (!scheduledListeners.isEmpty()) { // Clone current events to make new entries possible during // firing. THashSet entries = scheduledListeners; scheduledListeners = new THashSet(); ArrayList schedule = new ArrayList(); for (ListenerEntry listenerEntry : entries) { if (pruneListener(listenerEntry)) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Pruned " + listenerEntry.procedure); } } continue; } final CacheEntry entry = listenerEntry.entry; assert (entry != null); Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); if (newValue != ListenerEntry.NOT_CHANGED) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); } } schedule.add(listenerEntry); listenerEntry.setLastKnown(entry.getResult()); } } for(ListenerEntry listenerEntry : schedule) { final CacheEntry entry = listenerEntry.entry; if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure); } } try { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); } } entry.performFromCache(graph, listenerEntry.procedure); } catch (Throwable t) { LOGGER.error("Unexpected exception ", t); } } } } finally { firingListeners = false; } } void updateParents(int indent, CacheEntry entry, LinkedList todo) { Iterable oldParents = entry.getParents(processor); for (CacheEntry parent : oldParents) { if(!parent.isDiscarded()) todo.push(new UpdateEntry(entry, parent, indent + 2)); } } private boolean pruneListener(ListenerEntry entry) { if (entry.base.isDisposed()) { assert (entry != null); ArrayList list = listeners.get(entry.entry); if(list != null) { boolean success = list.remove(entry); assert (success); if (list.isEmpty()) listeners.remove(entry.entry); } return true; } else { return false; } } private List getListenerEntries(CacheEntry entry) { hasListenerAfterDisposing(entry); if(listeners.get(entry) != null) return listeners.get(entry); else return Collections.emptyList(); } }