import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
import org.simantics.databoard.Bindings;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.debug.ListenerReport;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.procedure.ListenerBase;
import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
public class QueryListening {
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+ static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
private final QueryProcessor processor;
private final Scheduler scheduler;
- private final Consumer<Runnable> consumer;
private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
-
- private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
-
- private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
-
- @Override
- protected synchronized ArrayList<Runnable> initialValue() {
- ArrayList<Runnable> result = new ArrayList<>();
- allQueues.put(Thread.currentThread(), result);
- return result;
- }
-
- synchronized void sendToExecution() {
- ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
- dispatchedQueues.add(rs);
- notify();
- }
-
- synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
- ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
- dispatchedQueues = new ArrayList<>();
- return result;
- }
-
- }
-
- private static class Scheduler {
-
- private static final int BUFFER_SIZE = 100;
-
- private final QueryProcessor processor;
-
- private final ThreadQueue queues = new ThreadQueue();
-
- /*
- * Access to this field is synchronized using the monitor of 'queues'
- * The method flush() shall be called by the single writing thread and the field has been nulled before the
- * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
- */
- private Semaphore flush = null;
-
- Scheduler(QueryProcessor processor) {
- this.processor = processor;
- }
-
- private boolean isTerminated() {
- return processor.isDisposed();
- }
-
- Consumer<Runnable> newConsumer() {
- return (task) -> {
-
- ArrayList<Runnable> l = queues.get();
- l.add(task);
- if(l.size() == BUFFER_SIZE) {
- queues.remove();
- queues.sendToExecution();
- }
-
- };
- }
-
- void start() {
-
- Thread thread = new Thread() {
-
- public void run() {
-
- synchronized(queues) {
- while(!isTerminated()) {
- try {
- ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
- for(ArrayList<Runnable> queue : qs) {
- for(Runnable r : queue)
- r.run();
- }
- if(flush != null) {
- for(ArrayList<Runnable> queue : queues.allQueues.values()) {
- for(Runnable r : queue) {
- r.run();
- }
- queue.clear();
- }
- Semaphore s = flush;
- flush = null;
- s.release();
- }
- queues.wait(1000);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
- }
-
- };
-
- };
-
- thread.setName("QueryListening");
- thread.start();
-
- }
-
- private Semaphore createFlush() {
- synchronized(queues) {
- flush = new Semaphore(0);
- queues.notify();
- return flush;
- }
- }
-
- void flush() {
- try {
- createFlush().acquire();
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
-
- }
-
- private static class RegisterParentRunnable implements Runnable {
-
- private final CacheEntry parent;
- private final CacheEntry child;
-
- public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
- this.parent = parent;
- this.child = child;
- }
-
- @Override
- public void run() {
- child.addParent(parent);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- }
-
- private static class RegisterListenerRunnable implements Runnable {
-
- private final QueryListening queryListening;
- private final ListenerBase base;
- private final Object procedure;
- private final CacheEntry parent;
- private final CacheEntry entry;
-
- public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
- this.queryListening = queryListening;
- this.base = base;
- this.procedure = procedure;
- this.parent = parent;
- this.entry = entry;
- }
-
- @Override
- public void run() {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
- if (list == null) {
- list = new ArrayList<>(1);
- queryListening.listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- // Equals is here based on base
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed())
- return;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- queryListening.addedEntries.put(base, result);
-
- }
-
-
- }
QueryListening(QueryProcessor processor) {
this.processor = processor;
scheduler = new Scheduler(processor);
- consumer = scheduler.newConsumer();
scheduler.start();
}
void sync() {
try {
- scheduler.flush();
+ scheduler.sync();
} catch (Throwable t) {
LOGGER.error("Error while waiting for query dependency management", t);
}
if(parent != null) {
try {
if(!child.isImmutable(graph))
- consumer.accept(new RegisterParentRunnable(parent, child));
+ scheduler.accept(new RegisterParentRunnable(parent, child));
} catch (DatabaseException e) {
LOGGER.error("Error while registering query dependencies", e);
}
if(listener != null)
if(!listener.isDisposed())
- consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+ scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
}
if(base == null) return;
- consumer.accept(() -> {
- ListenerEntry entry = addedEntries.get(base);
- if(entry != null) entry.setLastKnown(result);
- });
+ scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
}
else
return Collections.emptyList();
}
-
+
+ private static class RegisterParentRunnable implements Runnable {
+
+ private final CacheEntry parent;
+ private final CacheEntry child;
+
+ public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+ this.parent = parent;
+ this.child = child;
+ }
+
+ @Override
+ public void run() {
+ child.addParent(parent);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ }
+
+ private static class RegisterListenerRunnable implements Runnable {
+
+ private final QueryListening queryListening;
+ private final ListenerBase base;
+ private final Object procedure;
+ private final CacheEntry parent;
+ private final CacheEntry entry;
+
+ public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+ this.queryListening = queryListening;
+ this.base = base;
+ this.procedure = procedure;
+ this.parent = parent;
+ this.entry = entry;
+ }
+
+ @Override
+ public void run() {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ queryListening.listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed())
+ return;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ queryListening.addedEntries.put(base, result);
+
+ }
+
+
+ }
+
+ private static class RegisterFirstKnownRunnable implements Runnable {
+
+ private final Map<ListenerBase,ListenerEntry> addedEntries;
+ private final ListenerBase base;
+ private final Object result;
+
+ public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
+ this.addedEntries = addedEntries;
+ this.base = base;
+ this.result = result;
+ }
+
+ @Override
+ public void run() {
+ ListenerEntry entry = addedEntries.get(base);
+ if(entry != null) entry.setLastKnown(result);
+ }
+
+ }
+
}
--- /dev/null
+package org.simantics.db.impl.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+class Scheduler extends ThreadLocal<TaskList> {
+
+ private final QueryProcessor processor;
+
+ private static final int MAX_TASKLIST_SIZE = 1<<8;
+
+ /*
+ * New TaskList objects shall be assigned this size.
+ * This shall grow exponentially upon task list creation to max size and shall reset to 1 when a write synchronizes the tasks.
+ * The intention is to make sure that upon frequent syncs the tasks shall be scheduled
+ */
+ private int currentTaskListSize = 1;
+
+ /*
+ * Anyone wishing to execute any tasks needs to hold this monitor
+ */
+ private Object taskExecutionMonitor = new Object();
+
+ /*
+ * Anyone wishing to manipulate the list of dispatched tasks needs to hold this monitor
+ */
+ private Object taskDispatchMonitor = new Object();
+
+ /*
+ * When a thread-specific TaskList becomes full it is added into this list for execution.
+ * The executor thread takes TaskLists from this list.
+ * The sync operation can also execute remaining tasks.
+ */
+ private ArrayList<TaskList> dispatchedTaskLists;
+
+ /*
+ * This replicates the ThreadLocal map of TaskList objects to enable listing of tasks active queues
+ */
+ private final Map<Thread,TaskList> activeTaskLists = new HashMap<>();
+
+ /*
+ * This semaphore is incremented each time s task list gets dispatched. The execution thread waits for the semaphore.
+ */
+ private final Semaphore taskDispatchNotifications = new Semaphore(0);
+
+ Scheduler(QueryProcessor processor) {
+ this.processor = processor;
+ }
+
+ /*
+ * This gets called by multiple query threads
+ */
+ void accept(Runnable task) {
+ TaskList l = get();
+ l.add(task);
+ if(l.shouldSchedule()) {
+ remove();
+ sendToExecution();
+ }
+ }
+
+ /*
+ * A task list has reached full size and is moved into execution by the execution thread
+ */
+ void sendToExecution() {
+ synchronized(activeTaskLists) {
+ // This is dispatched so it should not anymore be in list of active TaskList objects.
+ TaskList l = activeTaskLists.remove(Thread.currentThread());
+ synchronized(taskDispatchMonitor) {
+ if(dispatchedTaskLists == null)
+ dispatchedTaskLists = new ArrayList<TaskList>();
+ dispatchedTaskLists.add(l);
+ }
+ if(currentTaskListSize < MAX_TASKLIST_SIZE)
+ currentTaskListSize = currentTaskListSize<<1;
+ }
+ // Release the execution thread
+ taskDispatchNotifications.release();
+ }
+
+ /*
+ * Gets dispatched TaskList objects for execution.
+ * Returns null when there's nothing to execute (and thread shall go to sleep).
+ */
+ ArrayList<TaskList> getDispatchedQueues() {
+ synchronized(taskDispatchMonitor) {
+ if(dispatchedTaskLists == null)
+ return null;
+ ArrayList<TaskList> result = dispatchedTaskLists;
+ dispatchedTaskLists = null;
+ return result;
+ }
+ }
+
+ void start() {
+
+ Thread thread = new Thread() {
+
+ public void run() {
+
+ // We execute tasks as long as the query processor is alive
+ while(!processor.isDisposed()) {
+
+ try {
+
+ // This is a mutex between this place and sync()
+ synchronized(taskExecutionMonitor) {
+
+ // Execute busily while there is something to execute
+ while(true) {
+
+ ArrayList<TaskList> qs = getDispatchedQueues();
+ if(qs == null) {
+ // No tasks - go to sleep
+ break;
+ }
+
+ // Execute everything we got
+ for(TaskList queue : qs) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ }
+
+ }
+
+ }
+
+ // sendToExecution shall release this
+ taskDispatchNotifications.acquire();
+
+ } catch (InterruptedException e) {
+ QueryListening.LOGGER.error("Unexpected interrupt", e);
+ }
+
+ }
+
+ };
+
+ };
+
+ thread.setName("QueryListening");
+ thread.start();
+
+ }
+
+ /*
+ * ThreadLocal calls this to create a TaskList.
+ * Called with first list per thread and also when accepting next task after sendToExecution.
+ */
+ @Override
+ protected TaskList initialValue() {
+ TaskList result = new TaskList(currentTaskListSize);
+ synchronized(activeTaskLists) {
+ activeTaskLists.put(Thread.currentThread(), result);
+ }
+ return result;
+ }
+
+ /*
+ * This makes sure that all tasks have been executed upon return.
+ * It takes the taskExecutionMonitor from the thread and executes possible dispatched work and
+ * also executes anything in active task lists. It resets the dispatch sizes of current and
+ * subsequently created task lists to 1.
+ */
+ void sync() {
+
+ // This is a mutex between this place and execution thread
+ synchronized(taskExecutionMonitor) {
+
+ currentTaskListSize = 1;
+
+ // Perform all dispatched work
+ ArrayList<TaskList> qs = getDispatchedQueues();
+ if(qs != null) {
+ for(TaskList queue : qs) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ }
+ }
+
+ // Also perform any tasks in active queues.
+ // This assumes that all query threads that send new tasks are now parked and shall not interfere.
+ synchronized(activeTaskLists) {
+ for(TaskList queue : activeTaskLists.values()) {
+ // Run all tasks
+ for(Runnable r : queue) {
+ if(r != null)
+ r.run();
+ }
+ // Remove the tasks from list and reset the max size to 1.
+ queue.reset();
+ }
+ }
+
+ }
+
+ }
+
+}
\ No newline at end of file