--- /dev/null
+package org.simantics.db.impl.query;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.debug.ListenerReport;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.utils.Development;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.set.hash.THashSet;
+
+public class QueryListening {
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+
+ final private QueryProcessor processor;
+ private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+ private boolean firingListeners = false;
+ final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+ private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
+
+ QueryListening(QueryProcessor processor) {
+ this.processor = processor;
+ new DependencyManagementThread(processor, tasks).start();
+ }
+
+ public void sync() {
+ Semaphore s = new Semaphore(0);
+ tasks.add(() -> {
+ s.release();
+ });
+ try {
+ s.acquire();
+ } catch (Throwable t) {
+ LOGGER.error("Error while waiting for query dependency management", t);
+ }
+ }
+
+ static class DependencyManagementThread extends Thread {
+
+ final private QueryProcessor processor;
+ final BlockingQueue<Runnable> tasks;
+
+ DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
+ setName("Query Dependency Manager");
+ this.processor = processor;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public void run() {
+ while(processor.isAlive()) {
+ try {
+ Runnable r = tasks.take();
+ r.run();
+ } catch (Throwable t) {
+ // Spurious problems?
+ LOGGER.error("Error while waiting for query dependency management tasks", t);
+ }
+ }
+ }
+
+ }
+
+ public boolean hasScheduledUpdates() {
+ return !scheduledListeners.isEmpty();
+ }
+
+ void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+
+ tasks.offer(() -> {
+
+ if (parent != null && !inferred) {
+ try {
+ if(!child.isImmutable(graph))
+ child.addParent(parent);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
+ }
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ if (listener != null)
+ registerListener(child, listener, procedure);
+
+ });
+
+ }
+
+ void registerFirstKnown(ListenerBase base, Object result) {
+
+ tasks.offer(() -> {
+
+ ListenerEntry entry = addedEntries.get(base);
+ if(entry != null) entry.setLastKnown(result);
+
+ });
+
+ }
+
+ public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
+
+ assert (entry != null);
+
+ if (base.isDisposed())
+ return null;
+
+ return addListener(entry, base, procedure);
+
+ }
+
+ /*
+ * Registers a listener and returns an entry iff the entry was added
+ */
+ private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed()) return null;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ addedEntries.put(base, result);
+
+ return result;
+
+ }
+
+ void scheduleListener(ListenerEntry entry) {
+ assert (entry != null);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Scheduled " + entry.procedure);
+ }
+ }
+ scheduledListeners.add(entry);
+ }
+
+ private void removeListener(ListenerEntry entry) {
+ assert (entry != null);
+ ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+ if(list == null) return;
+ boolean success = list.remove(entry);
+ assert (success);
+ if (list.isEmpty())
+ listeners.remove(entry.entry);
+ }
+
+ boolean hasListener(CacheEntry entry) {
+ if(listeners.get(entry) != null) return true;
+ return false;
+ }
+
+ boolean hasListenerAfterDisposing(CacheEntry entry) {
+ if(listeners.get(entry) != null) {
+ ArrayList<ListenerEntry> entries = listeners.get(entry);
+ ArrayList<ListenerEntry> list = null;
+ for (ListenerEntry e : entries) {
+ if (e.base.isDisposed()) {
+ if(list == null) list = new ArrayList<ListenerEntry>();
+ list.add(e);
+ }
+ }
+ if(list != null) {
+ for (ListenerEntry e : list) {
+ entries.remove(e);
+ }
+ }
+ if (entries.isEmpty()) {
+ listeners.remove(entry);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+ hasListenerAfterDisposing(entry);
+ if(listeners.get(entry) != null)
+ return listeners.get(entry);
+ else
+ return Collections.emptyList();
+ }
+
+ void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+ if(!workarea.containsKey(entry)) {
+
+ HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+ for(ListenerEntry e : getListenerEntries(entry))
+ ls.add(e.base);
+
+ workarea.put(entry, ls);
+
+ for(CacheEntry parent : entry.getParents(processor)) {
+ processListenerReport(parent, workarea);
+ ls.addAll(workarea.get(parent));
+ }
+
+ }
+
+ }
+
+ public synchronized ListenerReport getListenerReport() throws IOException {
+
+ class ListenerReportImpl implements ListenerReport {
+
+ Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+ @Override
+ public void print(PrintStream b) {
+ Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+ for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+ for(ListenerBase l : e.getValue()) {
+ Integer i = hist.get(l);
+ hist.put(l, i != null ? i-1 : -1);
+ }
+ }
+
+ for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+ b.print("" + -p.second + " " + p.first + "\n");
+ }
+
+ b.flush();
+ }
+
+ }
+
+ ListenerReportImpl result = new ListenerReportImpl();
+
+ Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+ for(CacheEntryBase entry : all) {
+ hasListenerAfterDisposing(entry);
+ }
+ for(CacheEntryBase entry : all) {
+ processListenerReport(entry, result.workarea);
+ }
+
+ return result;
+
+ }
+
+ public synchronized String reportListeners(File file) throws IOException {
+
+ if (!processor.isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+ ListenerReport report = getListenerReport();
+ report.print(b);
+
+ return "Done reporting listeners.";
+
+ }
+
+ public void fireListeners(WriteGraphImpl graph) {
+
+ assert (!processor.updating);
+ assert (!processor.cache.collecting);
+ assert (!firingListeners);
+
+ firingListeners = true;
+
+ try {
+
+ // Performing may cause further events to be scheduled.
+ while (!scheduledListeners.isEmpty()) {
+
+ // Clone current events to make new entries possible during
+ // firing.
+ THashSet<ListenerEntry> entries = scheduledListeners;
+ scheduledListeners = new THashSet<ListenerEntry>();
+
+ ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+ for (ListenerEntry listenerEntry : entries) {
+
+ if (pruneListener(listenerEntry)) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Pruned " + listenerEntry.procedure);
+ }
+ }
+ continue;
+ }
+
+ final CacheEntry entry = listenerEntry.entry;
+ assert (entry != null);
+
+ Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+ if (newValue != ListenerEntry.NOT_CHANGED) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+ }
+ }
+ schedule.add(listenerEntry);
+ listenerEntry.setLastKnown(entry.getResult());
+ }
+
+ }
+
+ for(ListenerEntry listenerEntry : schedule) {
+ final CacheEntry entry = listenerEntry.entry;
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure);
+ }
+ }
+ try {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+ }
+ }
+ entry.performFromCache(graph, listenerEntry.procedure);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ }
+
+ } finally {
+ firingListeners = false;
+ }
+
+ }
+
+ void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+ Iterable<CacheEntry> oldParents = entry.getParents(processor);
+ for (CacheEntry parent : oldParents) {
+ if(!parent.isDiscarded())
+ todo.push(new UpdateEntry(entry, parent, indent + 2));
+ }
+
+ }
+
+ private boolean pruneListener(ListenerEntry entry) {
+ if (entry.base.isDisposed()) {
+ removeListener(entry);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}