X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryListening.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryListening.java;h=e524a080a99a2940bbc5da9a5564202d115bf931;hp=0000000000000000000000000000000000000000;hb=0db1af75cd14f99f16c42c920562708abf37be93;hpb=b000e272429e157638c0384878b07b8dcd758472 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java new file mode 100644 index 000000000..e524a080a --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -0,0 +1,404 @@ +package org.simantics.db.impl.query; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +import org.simantics.databoard.Bindings; +import org.simantics.db.DevelopmentKeys; +import org.simantics.db.debug.ListenerReport; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.procedure.ListenerBase; +import org.simantics.utils.Development; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.datastructures.collections.CollectionUtils; +import org.slf4j.LoggerFactory; + +import gnu.trove.map.hash.THashMap; +import gnu.trove.set.hash.THashSet; + +public class QueryListening { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); + + final private QueryProcessor processor; + private THashSet scheduledListeners = new THashSet(); + private boolean firingListeners = false; + final THashMap> listeners = new THashMap>(10, 0.75f); + private BlockingQueue tasks = new LinkedBlockingQueue(); + private Map addedEntries = new HashMap<>(); + + QueryListening(QueryProcessor processor) { + this.processor = processor; + new DependencyManagementThread(processor, tasks).start(); + } + + public void sync() { + Semaphore s = new Semaphore(0); + tasks.add(() -> { + s.release(); + }); + try { + s.acquire(); + } catch (Throwable t) { + LOGGER.error("Error while waiting for query dependency management", t); + } + } + + static class DependencyManagementThread extends Thread { + + final private QueryProcessor processor; + final BlockingQueue tasks; + + DependencyManagementThread(QueryProcessor processor, BlockingQueue tasks) { + setName("Query Dependency Manager"); + this.processor = processor; + this.tasks = tasks; + } + + @Override + public void run() { + while(processor.isAlive()) { + try { + Runnable r = tasks.take(); + r.run(); + } catch (Throwable t) { + // Spurious problems? + LOGGER.error("Error while waiting for query dependency management tasks", t); + } + } + } + + } + + public boolean hasScheduledUpdates() { + return !scheduledListeners.isEmpty(); + } + + void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + + tasks.offer(() -> { + + if (parent != null && !inferred) { + try { + if(!child.isImmutable(graph)) + child.addParent(parent); + } catch (DatabaseException e) { + LOGGER.error("Error while registering query dependencies", e); + } + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + if (listener != null) + registerListener(child, listener, procedure); + + }); + + } + + void registerFirstKnown(ListenerBase base, Object result) { + + tasks.offer(() -> { + + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + + }); + + } + + public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { + + assert (entry != null); + + if (base.isDisposed()) + return null; + + return addListener(entry, base, procedure); + + } + + /* + * Registers a listener and returns an entry iff the entry was added + */ + private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) return null; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + addedEntries.put(base, result); + + return result; + + } + + void scheduleListener(ListenerEntry entry) { + assert (entry != null); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Scheduled " + entry.procedure); + } + } + scheduledListeners.add(entry); + } + + private void removeListener(ListenerEntry entry) { + assert (entry != null); + ArrayList list = listeners.get(entry.entry); + if(list == null) return; + boolean success = list.remove(entry); + assert (success); + if (list.isEmpty()) + listeners.remove(entry.entry); + } + + boolean hasListener(CacheEntry entry) { + if(listeners.get(entry) != null) return true; + return false; + } + + boolean hasListenerAfterDisposing(CacheEntry entry) { + if(listeners.get(entry) != null) { + ArrayList entries = listeners.get(entry); + ArrayList list = null; + for (ListenerEntry e : entries) { + if (e.base.isDisposed()) { + if(list == null) list = new ArrayList(); + list.add(e); + } + } + if(list != null) { + for (ListenerEntry e : list) { + entries.remove(e); + } + } + if (entries.isEmpty()) { + listeners.remove(entry); + return false; + } + return true; + } + return false; + } + + List getListenerEntries(CacheEntry entry) { + hasListenerAfterDisposing(entry); + if(listeners.get(entry) != null) + return listeners.get(entry); + else + return Collections.emptyList(); + } + + void processListenerReport(CacheEntry entry, Map> workarea) { + + if(!workarea.containsKey(entry)) { + + HashSet ls = new HashSet(); + for(ListenerEntry e : getListenerEntries(entry)) + ls.add(e.base); + + workarea.put(entry, ls); + + for(CacheEntry parent : entry.getParents(processor)) { + processListenerReport(parent, workarea); + ls.addAll(workarea.get(parent)); + } + + } + + } + + public synchronized ListenerReport getListenerReport() throws IOException { + + class ListenerReportImpl implements ListenerReport { + + Map> workarea = new HashMap>(); + + @Override + public void print(PrintStream b) { + Map hist = new HashMap(); + for(Map.Entry> e : workarea.entrySet()) { + for(ListenerBase l : e.getValue()) { + Integer i = hist.get(l); + hist.put(l, i != null ? i-1 : -1); + } + } + + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + b.flush(); + } + + } + + ListenerReportImpl result = new ListenerReportImpl(); + + Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : all) { + hasListenerAfterDisposing(entry); + } + for(CacheEntryBase entry : all) { + processListenerReport(entry, result.workarea); + } + + return result; + + } + + public synchronized String reportListeners(File file) throws IOException { + + if (!processor.isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + ListenerReport report = getListenerReport(); + report.print(b); + + return "Done reporting listeners."; + + } + + public void fireListeners(WriteGraphImpl graph) { + + assert (!processor.updating); + assert (!processor.cache.collecting); + assert (!firingListeners); + + firingListeners = true; + + try { + + // Performing may cause further events to be scheduled. + while (!scheduledListeners.isEmpty()) { + + // Clone current events to make new entries possible during + // firing. + THashSet entries = scheduledListeners; + scheduledListeners = new THashSet(); + + ArrayList schedule = new ArrayList(); + + for (ListenerEntry listenerEntry : entries) { + + if (pruneListener(listenerEntry)) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Pruned " + listenerEntry.procedure); + } + } + continue; + } + + final CacheEntry entry = listenerEntry.entry; + assert (entry != null); + + Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); + + if (newValue != ListenerEntry.NOT_CHANGED) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); + } + } + schedule.add(listenerEntry); + listenerEntry.setLastKnown(entry.getResult()); + } + + } + + for(ListenerEntry listenerEntry : schedule) { + final CacheEntry entry = listenerEntry.entry; + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure); + } + } + try { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); + } + } + entry.performFromCache(graph, listenerEntry.procedure); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + } finally { + firingListeners = false; + } + + } + + void updateParents(int indent, CacheEntry entry, LinkedList todo) { + + Iterable oldParents = entry.getParents(processor); + for (CacheEntry parent : oldParents) { + if(!parent.isDiscarded()) + todo.push(new UpdateEntry(entry, parent, indent + 2)); + } + + } + + private boolean pruneListener(ListenerEntry entry) { + if (entry.base.isDisposed()) { + removeListener(entry); + return true; + } else { + return false; + } + } + +}