From: Antti Villberg Date: Tue, 21 Jan 2020 23:01:04 +0000 (+0200) Subject: Optimise performance of job scheduling in QueryListening X-Git-Tag: v1.43.0~116^2 X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=commitdiff_plain;h=7d77f64b4f0fba1fd72a89eeb0918db92f4e51b2 Optimise performance of job scheduling in QueryListening Reino was correct - this synchronization was indeed broken. gitlab #450 Change-Id: I65280947385963d52073df78f604d98741beb12f --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index 9daab950e..dd410b4ce 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -14,9 +14,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Semaphore; +import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; @@ -37,372 +36,498 @@ public class QueryListening { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); - final private QueryProcessor processor; - private THashSet scheduledListeners = new THashSet(); - private boolean firingListeners = false; - final THashMap> listeners = new THashMap>(10, 0.75f); - private BlockingQueue tasks = new ArrayBlockingQueue(2048); - private Map addedEntries = new HashMap<>(); - - QueryListening(QueryProcessor processor) { - this.processor = processor; - new DependencyManagementThread(processor, tasks).start(); - } - - public void sync() { - Semaphore s = new Semaphore(0); - try { - tasks.put(() -> { - s.release(); - }); - s.acquire(); - } catch (Throwable t) { - LOGGER.error("Error while waiting for query dependency management", t); - } - } - - static class DependencyManagementThread extends Thread { - - final private QueryProcessor processor; - final BlockingQueue tasks; - - DependencyManagementThread(QueryProcessor processor, BlockingQueue tasks) { - setName("Query Dependency Manager"); - this.processor = processor; - this.tasks = tasks; - } - - @Override - public void run() { - while(processor.isAlive()) { - try { - Runnable r = tasks.take(); - r.run(); - } catch (Throwable t) { - // Spurious problems? - LOGGER.error("Error while waiting for query dependency management tasks", t); - } - } - } - - } - - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - try { - tasks.put(() -> { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - LOGGER.error("Error while registering query dependencies", e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) - registerListener(child, listener, procedure); - - }); - } catch (InterruptedException e) { - LOGGER.error("Error while registering dependencies", e); - } - - } - - void registerFirstKnown(ListenerBase base, Object result) { - - tasks.offer(() -> { - - ListenerEntry entry = addedEntries.get(base); - if(entry != null) entry.setLastKnown(result); - - }); - - } - - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - /* - * Registers a listener and returns an entry iff the entry was added - */ - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = listeners.get(entry); - if (list == null) { - list = new ArrayList<>(1); - listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - // Equals is here based on base - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - addedEntries.put(base, result); - - return result; - - } - - void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - listeners.remove(entry.entry); - } - - boolean hasListener(CacheEntry entry) { - if(listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(listeners.get(entry) != null) { - ArrayList entries = listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(listeners.get(entry) != null) - return listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(processor)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!processor.isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } - - public void fireListeners(WriteGraphImpl graph) { - - assert (!processor.updating); - assert (!processor.cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - } - - } finally { - firingListeners = false; - } - - } - - void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(processor); - for (CacheEntry parent : oldParents) { - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - + private final QueryProcessor processor; + private final Scheduler scheduler; + private final Consumer consumer; + private final Map addedEntries = new HashMap<>(); + + private THashSet scheduledListeners = new THashSet(); + private boolean firingListeners = false; + + final THashMap> listeners = new THashMap>(10, 0.75f); + + private static class ThreadQueue extends ThreadLocal> { + + private final Map> allQueues = new HashMap<>(); + + private ArrayList> dispatchedQueues = new ArrayList<>(); + + @Override + protected synchronized ArrayList initialValue() { + ArrayList result = new ArrayList<>(); + allQueues.put(Thread.currentThread(), result); + return result; + } + + synchronized void sendToExecution() { + ArrayList rs = allQueues.remove(Thread.currentThread()); + dispatchedQueues.add(rs); + notify(); + } + + synchronized ArrayList> getDispatchedQueues() { + ArrayList> result = dispatchedQueues; + dispatchedQueues = new ArrayList<>(); + return result; + } + + } + + private static class Scheduler { + + private static final int BUFFER_SIZE = 100; + + private final QueryProcessor processor; + + private final ThreadQueue queues = new ThreadQueue(); + + /* + * Access to this field is synchronized using the monitor of 'queues' + * The method flush() shall be called by the single writing thread and the field has been nulled before the + * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time. + */ + private Semaphore flush = null; + + Scheduler(QueryProcessor processor) { + this.processor = processor; + } + + private boolean isTerminated() { + return processor.isDisposed(); + } + + Consumer newConsumer() { + return (task) -> { + + ArrayList l = queues.get(); + l.add(task); + if(l.size() == BUFFER_SIZE) { + queues.remove(); + queues.sendToExecution(); + } + + }; + } + + void start() { + + Thread thread = new Thread() { + + public void run() { + + synchronized(queues) { + while(!isTerminated()) { + try { + ArrayList> qs = queues.getDispatchedQueues(); + for(ArrayList queue : qs) { + for(Runnable r : queue) + r.run(); + } + if(flush != null) { + for(ArrayList queue : queues.allQueues.values()) { + for(Runnable r : queue) { + r.run(); + } + queue.clear(); + } + Semaphore s = flush; + flush = null; + s.release(); + } + queues.wait(1000); + } catch (InterruptedException e) { + LOGGER.error("Unexpected interrupt", e); + } + } + } + + }; + + }; + + thread.setName("QueryListening"); + thread.start(); + + } + + private Semaphore createFlush() { + synchronized(queues) { + flush = new Semaphore(0); + queues.notify(); + return flush; + } + } + + void flush() { + try { + createFlush().acquire(); + } catch (InterruptedException e) { + LOGGER.error("Unexpected interrupt", e); + } + } + + } + + private static class RegisterParentRunnable implements Runnable { + + private final CacheEntry parent; + private final CacheEntry child; + + public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { + this.parent = parent; + this.child = child; + } + + @Override + public void run() { + child.addParent(parent); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + } + + private static class RegisterListenerRunnable implements Runnable { + + private final QueryListening queryListening; + private final ListenerBase base; + private final Object procedure; + private final CacheEntry parent; + private final CacheEntry entry; + + public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { + this.queryListening = queryListening; + this.base = base; + this.procedure = procedure; + this.parent = parent; + this.entry = entry; + } + + @Override + public void run() { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = queryListening.listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + queryListening.listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) + return; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + queryListening.addedEntries.put(base, result); + + } + + + } + + QueryListening(QueryProcessor processor) { + + this.processor = processor; + scheduler = new Scheduler(processor); + consumer = scheduler.newConsumer(); + scheduler.start(); + + } + + public boolean hasScheduledUpdates() { + return !scheduledListeners.isEmpty(); + } + + void sync() { + try { + scheduler.flush(); + } catch (Throwable t) { + LOGGER.error("Error while waiting for query dependency management", t); + } + } + + void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + + if(inferred) { + assert(listener == null); + return; + } + + if(parent != null) { + try { + if(!child.isImmutable(graph)) + consumer.accept(new RegisterParentRunnable(parent, child)); + } catch (DatabaseException e) { + LOGGER.error("Error while registering query dependencies", e); + } + } + + if(listener != null) + if(!listener.isDisposed()) + consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); + + } + + void registerFirstKnown(ListenerBase base, Object result) { + + if(base == null) return; + + consumer.accept(() -> { + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + }); + + } + + void scheduleListener(ListenerEntry entry) { + + assert (entry != null); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Scheduled " + entry.procedure); + } + } + + scheduledListeners.add(entry); + + } + + boolean hasListener(CacheEntry entry) { + if(listeners.get(entry) != null) return true; + return false; + } + + boolean hasListenerAfterDisposing(CacheEntry entry) { + if(listeners.get(entry) != null) { + ArrayList entries = listeners.get(entry); + ArrayList list = null; + for (ListenerEntry e : entries) { + if (e.base.isDisposed()) { + if(list == null) list = new ArrayList(); + list.add(e); + } + } + if(list != null) { + for (ListenerEntry e : list) { + entries.remove(e); + } + } + if (entries.isEmpty()) { + listeners.remove(entry); + return false; + } + return true; + } + return false; + } + + void processListenerReport(CacheEntry entry, Map> workarea) { + + if(!workarea.containsKey(entry)) { + + HashSet ls = new HashSet(); + for(ListenerEntry e : getListenerEntries(entry)) + ls.add(e.base); + + workarea.put(entry, ls); + + for(CacheEntry parent : entry.getParents(processor)) { + processListenerReport(parent, workarea); + ls.addAll(workarea.get(parent)); + } + + } + + } + + public synchronized ListenerReport getListenerReport() throws IOException { + + class ListenerReportImpl implements ListenerReport { + + Map> workarea = new HashMap>(); + + @Override + public void print(PrintStream b) { + Map hist = new HashMap(); + for(Map.Entry> e : workarea.entrySet()) { + for(ListenerBase l : e.getValue()) { + Integer i = hist.get(l); + hist.put(l, i != null ? i-1 : -1); + } + } + + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + b.flush(); + } + + } + + ListenerReportImpl result = new ListenerReportImpl(); + + Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : all) { + hasListenerAfterDisposing(entry); + } + for(CacheEntryBase entry : all) { + processListenerReport(entry, result.workarea); + } + + return result; + + } + + public synchronized String reportListeners(File file) throws IOException { + + if (!processor.isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + ListenerReport report = getListenerReport(); + report.print(b); + + return "Done reporting listeners."; + + } + + public void fireListeners(WriteGraphImpl graph) { + + assert (!processor.updating); + assert (!processor.cache.collecting); + assert (!firingListeners); + + firingListeners = true; + + try { + + // Performing may cause further events to be scheduled. + while (!scheduledListeners.isEmpty()) { + + // Clone current events to make new entries possible during + // firing. + THashSet entries = scheduledListeners; + scheduledListeners = new THashSet(); + + ArrayList schedule = new ArrayList(); + + for (ListenerEntry listenerEntry : entries) { + + if (pruneListener(listenerEntry)) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Pruned " + listenerEntry.procedure); + } + } + continue; + } + + final CacheEntry entry = listenerEntry.entry; + assert (entry != null); + + Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); + + if (newValue != ListenerEntry.NOT_CHANGED) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); + } + } + schedule.add(listenerEntry); + listenerEntry.setLastKnown(entry.getResult()); + } + + } + + for(ListenerEntry listenerEntry : schedule) { + final CacheEntry entry = listenerEntry.entry; + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure); + } + } + try { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); + } + } + entry.performFromCache(graph, listenerEntry.procedure); + } catch (Throwable t) { + LOGGER.error("Unexpected exception ", t); + } + } + + } + + } finally { + firingListeners = false; + } + + } + + void updateParents(int indent, CacheEntry entry, LinkedList todo) { + + Iterable oldParents = entry.getParents(processor); + for (CacheEntry parent : oldParents) { + if(!parent.isDiscarded()) + todo.push(new UpdateEntry(entry, parent, indent + 2)); + } + + } + + private boolean pruneListener(ListenerEntry entry) { + + if (entry.base.isDisposed()) { + + assert (entry != null); + ArrayList list = listeners.get(entry.entry); + if(list != null) { + boolean success = list.remove(entry); + assert (success); + if (list.isEmpty()) + listeners.remove(entry.entry); + } + + return true; + + } else { + + return false; + + } + } + + private List getListenerEntries(CacheEntry entry) { + hasListenerAfterDisposing(entry); + if(listeners.get(entry) != null) + return listeners.get(entry); + else + return Collections.emptyList(); + } + }