import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
import org.simantics.databoard.Bindings;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.debug.ListenerReport;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.graph.ReadGraphImpl;
-import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.procedure.ListenerBase;
import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
public class QueryListening {
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
+ static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
private final QueryProcessor processor;
private final Scheduler scheduler;
- private final Consumer<Runnable> consumer;
private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
-
- private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
-
- private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
-
- @Override
- protected synchronized ArrayList<Runnable> initialValue() {
- ArrayList<Runnable> result = new ArrayList<>();
- allQueues.put(Thread.currentThread(), result);
- return result;
- }
-
- synchronized void sendToExecution() {
- ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
- dispatchedQueues.add(rs);
- notify();
- }
-
- synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
- ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
- dispatchedQueues = new ArrayList<>();
- return result;
- }
-
- }
-
- private static class Scheduler {
-
- private static final int BUFFER_SIZE = 100;
-
- private final QueryProcessor processor;
-
- private final ThreadQueue queues = new ThreadQueue();
-
- /*
- * Access to this field is synchronized using the monitor of 'queues'
- * The method flush() shall be called by the single writing thread and the field has been nulled before the
- * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
- */
- private Semaphore flush = null;
-
- Scheduler(QueryProcessor processor) {
- this.processor = processor;
- }
-
- private boolean isTerminated() {
- return processor.isDisposed();
- }
-
- Consumer<Runnable> newConsumer() {
- return (task) -> {
-
- ArrayList<Runnable> l = queues.get();
- l.add(task);
- if(l.size() == BUFFER_SIZE) {
- queues.remove();
- queues.sendToExecution();
- }
-
- };
- }
-
- void start() {
-
- Thread thread = new Thread() {
-
- public void run() {
-
- synchronized(queues) {
- while(!isTerminated()) {
- try {
- ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
- for(ArrayList<Runnable> queue : qs) {
- for(Runnable r : queue)
- r.run();
- }
- if(flush != null) {
- for(ArrayList<Runnable> queue : queues.allQueues.values()) {
- for(Runnable r : queue) {
- r.run();
- }
- queue.clear();
- }
- Semaphore s = flush;
- flush = null;
- s.release();
- }
- queues.wait(1000);
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
- }
-
- };
-
- };
-
- thread.setName("QueryListening");
- thread.start();
-
- }
-
- private Semaphore createFlush() {
- synchronized(queues) {
- flush = new Semaphore(0);
- queues.notify();
- return flush;
- }
- }
-
- void flush() {
- try {
- createFlush().acquire();
- } catch (InterruptedException e) {
- LOGGER.error("Unexpected interrupt", e);
- }
- }
-
- }
-
- private static class RegisterParentRunnable implements Runnable {
-
- private final CacheEntry parent;
- private final CacheEntry child;
-
- public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
- this.parent = parent;
- this.child = child;
- }
-
- @Override
- public void run() {
- child.addParent(parent);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- }
-
- private static class RegisterListenerRunnable implements Runnable {
-
- private final QueryListening queryListening;
- private final ListenerBase base;
- private final Object procedure;
- private final CacheEntry parent;
- private final CacheEntry entry;
-
- public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
- this.queryListening = queryListening;
- this.base = base;
- this.procedure = procedure;
- this.parent = parent;
- this.entry = entry;
- }
-
- @Override
- public void run() {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
- if (list == null) {
- list = new ArrayList<>(1);
- queryListening.listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- // Equals is here based on base
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed())
- return;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- queryListening.addedEntries.put(base, result);
-
- }
-
-
- }
QueryListening(QueryProcessor processor) {
this.processor = processor;
scheduler = new Scheduler(processor);
- consumer = scheduler.newConsumer();
scheduler.start();
}
void sync() {
try {
- scheduler.flush();
+ scheduler.sync();
} catch (Throwable t) {
LOGGER.error("Error while waiting for query dependency management", t);
}
if(parent != null) {
try {
if(!child.isImmutable(graph))
- consumer.accept(new RegisterParentRunnable(parent, child));
+ scheduler.accept(new RegisterParentRunnable(parent, child));
} catch (DatabaseException e) {
LOGGER.error("Error while registering query dependencies", e);
}
if(listener != null)
if(!listener.isDisposed())
- consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+ scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
}
if(base == null) return;
- consumer.accept(() -> {
- ListenerEntry entry = addedEntries.get(base);
- if(entry != null) entry.setLastKnown(result);
- });
+ scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
}
else
return Collections.emptyList();
}
-
+
+ private static class RegisterParentRunnable implements Runnable {
+
+ private final CacheEntry parent;
+ private final CacheEntry child;
+
+ public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+ this.parent = parent;
+ this.child = child;
+ }
+
+ @Override
+ public void run() {
+ child.addParent(parent);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ }
+
+ private static class RegisterListenerRunnable implements Runnable {
+
+ private final QueryListening queryListening;
+ private final ListenerBase base;
+ private final Object procedure;
+ private final CacheEntry parent;
+ private final CacheEntry entry;
+
+ public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+ this.queryListening = queryListening;
+ this.base = base;
+ this.procedure = procedure;
+ this.parent = parent;
+ this.entry = entry;
+ }
+
+ @Override
+ public void run() {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ queryListening.listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed())
+ return;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ queryListening.addedEntries.put(base, result);
+
+ }
+
+
+ }
+
+ private static class RegisterFirstKnownRunnable implements Runnable {
+
+ private final Map<ListenerBase,ListenerEntry> addedEntries;
+ private final ListenerBase base;
+ private final Object result;
+
+ public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
+ this.addedEntries = addedEntries;
+ this.base = base;
+ this.result = result;
+ }
+
+ @Override
+ public void run() {
+ ListenerEntry entry = addedEntries.remove(base);
+ if(entry != null) entry.setLastKnown(result);
+ }
+
+ }
+
}