From: Antti Villberg Date: Sat, 21 Mar 2020 18:56:50 +0000 (+0200) Subject: QueryListening sync is slow X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=c4d9561b1b35a0e8e594158fbb01a9c632997808;p=simantics%2Fplatform.git QueryListening sync is slow gitlab #503 Change-Id: Ied3aaf386094c3d52b3840d000fb9de01c9ebd03 --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index e3bd43741..91200f1fc 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -14,15 +14,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.procedure.ListenerBase; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; @@ -34,11 +31,10 @@ import gnu.trove.set.hash.THashSet; public class QueryListening { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); + static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); private final QueryProcessor processor; private final Scheduler scheduler; - private final Consumer consumer; private final Map addedEntries = new HashMap<>(); private THashSet scheduledListeners = new THashSet(); @@ -46,210 +42,11 @@ public class QueryListening { final THashMap> listeners = new THashMap>(10, 0.75f); - private static class ThreadQueue extends ThreadLocal> { - - private final Map> allQueues = new HashMap<>(); - - private ArrayList> dispatchedQueues = new ArrayList<>(); - - @Override - protected synchronized ArrayList initialValue() { - ArrayList result = new ArrayList<>(); - allQueues.put(Thread.currentThread(), result); - return result; - } - - synchronized void sendToExecution() { - ArrayList rs = allQueues.remove(Thread.currentThread()); - dispatchedQueues.add(rs); - notify(); - } - - synchronized ArrayList> getDispatchedQueues() { - ArrayList> result = dispatchedQueues; - dispatchedQueues = new ArrayList<>(); - return result; - } - - } - - private static class Scheduler { - - private static final int BUFFER_SIZE = 100; - - private final QueryProcessor processor; - - private final ThreadQueue queues = new ThreadQueue(); - - /* - * Access to this field is synchronized using the monitor of 'queues' - * The method flush() shall be called by the single writing thread and the field has been nulled before the - * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time. - */ - private Semaphore flush = null; - - Scheduler(QueryProcessor processor) { - this.processor = processor; - } - - private boolean isTerminated() { - return processor.isDisposed(); - } - - Consumer newConsumer() { - return (task) -> { - - ArrayList l = queues.get(); - l.add(task); - if(l.size() == BUFFER_SIZE) { - queues.remove(); - queues.sendToExecution(); - } - - }; - } - - void start() { - - Thread thread = new Thread() { - - public void run() { - - synchronized(queues) { - while(!isTerminated()) { - try { - ArrayList> qs = queues.getDispatchedQueues(); - for(ArrayList queue : qs) { - for(Runnable r : queue) - r.run(); - } - if(flush != null) { - for(ArrayList queue : queues.allQueues.values()) { - for(Runnable r : queue) { - r.run(); - } - queue.clear(); - } - Semaphore s = flush; - flush = null; - s.release(); - } - queues.wait(1000); - } catch (InterruptedException e) { - LOGGER.error("Unexpected interrupt", e); - } - } - } - - }; - - }; - - thread.setName("QueryListening"); - thread.start(); - - } - - private Semaphore createFlush() { - synchronized(queues) { - flush = new Semaphore(0); - queues.notify(); - return flush; - } - } - - void flush() { - try { - createFlush().acquire(); - } catch (InterruptedException e) { - LOGGER.error("Unexpected interrupt", e); - } - } - - } - - private static class RegisterParentRunnable implements Runnable { - - private final CacheEntry parent; - private final CacheEntry child; - - public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { - this.parent = parent; - this.child = child; - } - - @Override - public void run() { - child.addParent(parent); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - } - - private static class RegisterListenerRunnable implements Runnable { - - private final QueryListening queryListening; - private final ListenerBase base; - private final Object procedure; - private final CacheEntry parent; - private final CacheEntry entry; - - public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { - this.queryListening = queryListening; - this.base = base; - this.procedure = procedure; - this.parent = parent; - this.entry = entry; - } - - @Override - public void run() { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = queryListening.listeners.get(entry); - if (list == null) { - list = new ArrayList<>(1); - queryListening.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - // Equals is here based on base - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) - return; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - queryListening.addedEntries.put(base, result); - - } - - - } QueryListening(QueryProcessor processor) { this.processor = processor; scheduler = new Scheduler(processor); - consumer = scheduler.newConsumer(); scheduler.start(); } @@ -260,7 +57,7 @@ public class QueryListening { void sync() { try { - scheduler.flush(); + scheduler.sync(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); } @@ -276,7 +73,7 @@ public class QueryListening { if(parent != null) { try { if(!child.isImmutable(graph)) - consumer.accept(new RegisterParentRunnable(parent, child)); + scheduler.accept(new RegisterParentRunnable(parent, child)); } catch (DatabaseException e) { LOGGER.error("Error while registering query dependencies", e); } @@ -284,7 +81,7 @@ public class QueryListening { if(listener != null) if(!listener.isDisposed()) - consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); + scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); } @@ -292,10 +89,7 @@ public class QueryListening { if(base == null) return; - consumer.accept(() -> { - ListenerEntry entry = addedEntries.get(base); - if(entry != null) entry.setLastKnown(result); - }); + scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result)); } @@ -529,5 +323,102 @@ public class QueryListening { else return Collections.emptyList(); } - + + private static class RegisterParentRunnable implements Runnable { + + private final CacheEntry parent; + private final CacheEntry child; + + public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { + this.parent = parent; + this.child = child; + } + + @Override + public void run() { + child.addParent(parent); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + } + + private static class RegisterListenerRunnable implements Runnable { + + private final QueryListening queryListening; + private final ListenerBase base; + private final Object procedure; + private final CacheEntry parent; + private final CacheEntry entry; + + public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { + this.queryListening = queryListening; + this.base = base; + this.procedure = procedure; + this.parent = parent; + this.entry = entry; + } + + @Override + public void run() { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = queryListening.listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + queryListening.listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) + return; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + queryListening.addedEntries.put(base, result); + + } + + + } + + private static class RegisterFirstKnownRunnable implements Runnable { + + private final Map addedEntries; + private final ListenerBase base; + private final Object result; + + public RegisterFirstKnownRunnable(Map addedEntries, ListenerBase base, Object result) { + this.addedEntries = addedEntries; + this.base = base; + this.result = result; + } + + @Override + public void run() { + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + } + + } + } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java new file mode 100644 index 000000000..2c9c47574 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/Scheduler.java @@ -0,0 +1,203 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +class Scheduler extends ThreadLocal { + + private final QueryProcessor processor; + + private static final int MAX_TASKLIST_SIZE = 1<<8; + + /* + * New TaskList objects shall be assigned this size. + * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks. + * The intention is to make sure that upon frequent syncs the tasks shall be scheduled + */ + private int currentTaskListSize = 1; + + /* + * Anyone wishing to execute any tasks needs to hold this monitor + */ + private Object taskExecutionMonitor = new Object(); + + /* + * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor + */ + private Object taskDispatchMonitor = new Object(); + + /* + * When a thread-specific TaskList becomes full it is added into this list for execution. + * The executor thread takes TaskLists from this list. + * The sync operation can also execute remaining tasks. + */ + private ArrayList dispatchedTaskLists; + + /* + * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues + */ + private final Map activeTaskLists = new HashMap<>(); + + /* + * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore. + */ + private final Semaphore taskDispatchNotifications = new Semaphore(0); + + Scheduler(QueryProcessor processor) { + this.processor = processor; + } + + /* + * This gets called by multiple query threads + */ + void accept(Runnable task) { + TaskList l = get(); + l.add(task); + if(l.shouldSchedule()) { + remove(); + sendToExecution(); + } + } + + /* + * A task list has reached full size and is moved into execution by the execution thread + */ + void sendToExecution() { + synchronized(activeTaskLists) { + // This is dispatched so it should not anymore be in list of active TaskList objects. + TaskList l = activeTaskLists.remove(Thread.currentThread()); + synchronized(taskDispatchMonitor) { + if(dispatchedTaskLists == null) + dispatchedTaskLists = new ArrayList(); + dispatchedTaskLists.add(l); + } + if(currentTaskListSize < MAX_TASKLIST_SIZE) + currentTaskListSize = currentTaskListSize<<1; + } + // Release the execution thread + taskDispatchNotifications.release(); + } + + /* + * Gets dispatched TaskList objects for execution. + * Returns null when there's nothing to execute (and thread shall go to sleep). + */ + ArrayList getDispatchedQueues() { + synchronized(taskDispatchMonitor) { + if(dispatchedTaskLists == null) + return null; + ArrayList result = dispatchedTaskLists; + dispatchedTaskLists = null; + return result; + } + } + + void start() { + + Thread thread = new Thread() { + + public void run() { + + // We execute tasks as long as the query processor is alive + while(!processor.isDisposed()) { + + try { + + // This is a mutex between this place and sync() + synchronized(taskExecutionMonitor) { + + // Execute busily while there is something to execute + while(true) { + + ArrayList qs = getDispatchedQueues(); + if(qs == null) { + // No tasks - go to sleep + break; + } + + // Execute everything we got + for(TaskList queue : qs) { + for(Runnable r : queue) { + r.run(); + } + } + + } + + } + + // sendToExecution shall release this + taskDispatchNotifications.acquire(); + + } catch (InterruptedException e) { + QueryListening.LOGGER.error("Unexpected interrupt", e); + } + + } + + }; + + }; + + thread.setName("QueryListening"); + thread.start(); + + } + + /* + * ThreadLocal calls this to create a TaskList. + * Called with first list per thread and also when accepting next task after sendToExecution. + */ + @Override + protected TaskList initialValue() { + TaskList result = new TaskList(currentTaskListSize); + synchronized(activeTaskLists) { + activeTaskLists.put(Thread.currentThread(), result); + } + return result; + } + + /* + * This makes sure that all tasks have been executed upon return. + * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and + * also executes anything in active task lists. It resets the dispatch sizes of current and + * subsequently created task lists to 1. + */ + void sync() { + + // This is a mutex between this place and execution thread + synchronized(taskExecutionMonitor) { + + currentTaskListSize = 1; + + // Perform all dispatched work + ArrayList qs = getDispatchedQueues(); + if(qs != null) { + for(TaskList queue : qs) { + for(Runnable r : queue) { + r.run(); + } + } + } + + // Also perform any tasks in active queues. + // This assumes that all query threads that send new tasks are now parked and shall not interfere. + synchronized(activeTaskLists) { + for(TaskList queue : activeTaskLists.values()) { + // Run all tasks + for(Runnable r : queue) { + if(r != null) + r.run(); + } + // Remove the tasks from list and reset the max size to 1. + queue.reset(); + } + } + + } + + } + +} \ No newline at end of file diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java new file mode 100644 index 000000000..7734bae77 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/TaskList.java @@ -0,0 +1,24 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; + +class TaskList extends ArrayList { + + private static final long serialVersionUID = -8047504882681478972L; + + private int maxSize; + + TaskList(int maxSize) { + this.maxSize = maxSize; + } + + boolean shouldSchedule() { + return size() == maxSize; + } + + void reset() { + clear(); + maxSize = 1; + } + +} \ No newline at end of file