package org.simantics.db.impl.query; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.procedure.ListenerBase; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.slf4j.LoggerFactory; import gnu.trove.map.hash.THashMap; import gnu.trove.set.hash.THashSet; public class QueryListening { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); final private QueryProcessor processor; private THashSet scheduledListeners = new THashSet(); private boolean firingListeners = false; final THashMap> listeners = new THashMap>(10, 0.75f); private EventBusImpl eventBus; private Consumer consumer; private Map addedEntries = new HashMap<>(); int execCount=0; int queueCount = 0; static class TL extends ThreadLocal> { Map> allQueues = new HashMap<>(); ArrayList> dispatchedQueues = new ArrayList<>(); @Override protected synchronized ArrayList initialValue() { ArrayList result = new ArrayList<>(); allQueues.put(Thread.currentThread(), result); return result; } synchronized void sendToExecution() { ArrayList rs = allQueues.remove(Thread.currentThread()); dispatchedQueues.add(rs); notify(); } synchronized ArrayList> getDispatchedQueues() { ArrayList> result = dispatchedQueues; dispatchedQueues = new ArrayList<>(); return result; } } static class EventBusImpl { private static final int BUFFER_SIZE = 100; TL queues = new TL(); Semaphore flush = null; public boolean isTerminated() { return false; } public Consumer newConsumer() { return new Consumer() { @Override public void accept(Runnable arg0) { ArrayList l = queues.get(); l.add(arg0); if(l.size() == BUFFER_SIZE) { queues.remove(); queues.sendToExecution(); } } }; } public void shutdown() { } public void start(ThreadFactory arg1) { arg1.newThread(new Runnable() { @Override public void run() { synchronized(queues) { while(!isTerminated()) { try { ArrayList> qs = queues.getDispatchedQueues(); for(ArrayList queue : qs) { for(Runnable r : queue) r.run(); } if(flush != null) { for(ArrayList queue : queues.allQueues.values()) { for(Runnable r : queue) { r.run(); } queue.clear(); } Semaphore s = flush; flush = null; s.release(); } queues.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } void flush() { synchronized(queues) { flush = new Semaphore(0); queues.notify(); } try { flush.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } } } QueryListening(QueryProcessor processor) { this.processor = processor; eventBus = new EventBusImpl(); consumer = eventBus.newConsumer(); eventBus.start(new ThreadFactory() { @Override public Thread newThread(Runnable r) { System.err.println("new Thread " + r); return new Thread(r, "QueryDependencyManager"); } }); } public void sync() { try { eventBus.flush(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); } } public boolean hasScheduledUpdates() { return !scheduledListeners.isEmpty(); } public synchronized void dispatch(Runnable r) { queueCount++; consumer.accept(r); } void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { dispatch(() -> { if (parent != null && !inferred) { try { if(!child.isImmutable(graph)) child.addParent(parent); } catch (DatabaseException e) { LOGGER.error("Error while registering query dependencies", e); } if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { System.out.println(child + " -> " + parent); } } } if (listener != null) registerListener(child, listener, procedure); }); } void registerFirstKnown(ListenerBase base, Object result) { if(base == null) return; dispatch(() -> { ListenerEntry entry = addedEntries.get(base); if(entry != null) entry.setLastKnown(result); }); } public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { assert (entry != null); if (base.isDisposed()) return null; return addListener(entry, base, procedure); } /* * Registers a listener and returns an entry iff the entry was added */ private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { assert (entry != null); assert (procedure != null); ArrayList list = listeners.get(entry); if (list == null) { list = new ArrayList<>(1); listeners.put(entry, list); } ListenerEntry result = new ListenerEntry(entry, base, procedure); // Equals is here based on base int currentIndex = list.indexOf(result); // There was already a listener if(currentIndex > -1) { ListenerEntry current = list.get(currentIndex); if(!current.base.isDisposed()) return null; list.set(currentIndex, result); } else { list.add(result); } if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); } } addedEntries.put(base, result); return result; } void scheduleListener(ListenerEntry entry) { assert (entry != null); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Scheduled " + entry.procedure); } } scheduledListeners.add(entry); } private void removeListener(ListenerEntry entry) { assert (entry != null); ArrayList list = listeners.get(entry.entry); if(list == null) return; boolean success = list.remove(entry); assert (success); if (list.isEmpty()) listeners.remove(entry.entry); } boolean hasListener(CacheEntry entry) { if(listeners.get(entry) != null) return true; return false; } boolean hasListenerAfterDisposing(CacheEntry entry) { if(listeners.get(entry) != null) { ArrayList entries = listeners.get(entry); ArrayList list = null; for (ListenerEntry e : entries) { if (e.base.isDisposed()) { if(list == null) list = new ArrayList(); list.add(e); } } if(list != null) { for (ListenerEntry e : list) { entries.remove(e); } } if (entries.isEmpty()) { listeners.remove(entry); return false; } return true; } return false; } List getListenerEntries(CacheEntry entry) { hasListenerAfterDisposing(entry); if(listeners.get(entry) != null) return listeners.get(entry); else return Collections.emptyList(); } void processListenerReport(CacheEntry entry, Map> workarea) { if(!workarea.containsKey(entry)) { HashSet ls = new HashSet(); for(ListenerEntry e : getListenerEntries(entry)) ls.add(e.base); workarea.put(entry, ls); for(CacheEntry parent : entry.getParents(processor)) { processListenerReport(parent, workarea); ls.addAll(workarea.get(parent)); } } } public synchronized ListenerReport getListenerReport() throws IOException { class ListenerReportImpl implements ListenerReport { Map> workarea = new HashMap>(); @Override public void print(PrintStream b) { Map hist = new HashMap(); for(Map.Entry> e : workarea.entrySet()) { for(ListenerBase l : e.getValue()) { Integer i = hist.get(l); hist.put(l, i != null ? i-1 : -1); } } for(Pair p : CollectionUtils.valueSortedEntries(hist)) { b.print("" + -p.second + " " + p.first + "\n"); } b.flush(); } } ListenerReportImpl result = new ListenerReportImpl(); Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); for(CacheEntryBase entry : all) { hasListenerAfterDisposing(entry); } for(CacheEntryBase entry : all) { processListenerReport(entry, result.workarea); } return result; } public synchronized String reportListeners(File file) throws IOException { if (!processor.isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); ListenerReport report = getListenerReport(); report.print(b); return "Done reporting listeners."; } public void fireListeners(WriteGraphImpl graph) { assert (!processor.updating); assert (!processor.cache.collecting); assert (!firingListeners); firingListeners = true; try { // Performing may cause further events to be scheduled. while (!scheduledListeners.isEmpty()) { // Clone current events to make new entries possible during // firing. THashSet entries = scheduledListeners; scheduledListeners = new THashSet(); ArrayList schedule = new ArrayList(); for (ListenerEntry listenerEntry : entries) { if (pruneListener(listenerEntry)) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Pruned " + listenerEntry.procedure); } } continue; } final CacheEntry entry = listenerEntry.entry; assert (entry != null); Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); if (newValue != ListenerEntry.NOT_CHANGED) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { new Exception().printStackTrace(); System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); } } schedule.add(listenerEntry); listenerEntry.setLastKnown(entry.getResult()); } } for(ListenerEntry listenerEntry : schedule) { final CacheEntry entry = listenerEntry.entry; if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure); } } try { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); } } entry.performFromCache(graph, listenerEntry.procedure); } catch (Throwable t) { t.printStackTrace(); } } } } finally { firingListeners = false; } } void updateParents(int indent, CacheEntry entry, LinkedList todo) { Iterable oldParents = entry.getParents(processor); for (CacheEntry parent : oldParents) { if(!parent.isDiscarded()) todo.push(new UpdateEntry(entry, parent, indent + 2)); } } private boolean pruneListener(ListenerEntry entry) { if (entry.base.isDisposed()) { removeListener(entry); return true; } else { return false; } } }