X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryListening.java;h=91200f1fc3ad037393bc5e3794d6acc4c29a4e7b;hp=e3bd437416af310936e66533746b270d8228aba8;hb=c4d9561b1b35a0e8e594158fbb01a9c632997808;hpb=066c758d34f60a9da4e7b2387689475de1a9cf84 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index e3bd43741..91200f1fc 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -14,15 +14,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.procedure.ListenerBase; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; @@ -34,11 +31,10 @@ import gnu.trove.set.hash.THashSet; public class QueryListening { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); + static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); private final QueryProcessor processor; private final Scheduler scheduler; - private final Consumer consumer; private final Map addedEntries = new HashMap<>(); private THashSet scheduledListeners = new THashSet(); @@ -46,210 +42,11 @@ public class QueryListening { final THashMap> listeners = new THashMap>(10, 0.75f); - private static class ThreadQueue extends ThreadLocal> { - - private final Map> allQueues = new HashMap<>(); - - private ArrayList> dispatchedQueues = new ArrayList<>(); - - @Override - protected synchronized ArrayList initialValue() { - ArrayList result = new ArrayList<>(); - allQueues.put(Thread.currentThread(), result); - return result; - } - - synchronized void sendToExecution() { - ArrayList rs = allQueues.remove(Thread.currentThread()); - dispatchedQueues.add(rs); - notify(); - } - - synchronized ArrayList> getDispatchedQueues() { - ArrayList> result = dispatchedQueues; - dispatchedQueues = new ArrayList<>(); - return result; - } - - } - - private static class Scheduler { - - private static final int BUFFER_SIZE = 100; - - private final QueryProcessor processor; - - private final ThreadQueue queues = new ThreadQueue(); - - /* - * Access to this field is synchronized using the monitor of 'queues' - * The method flush() shall be called by the single writing thread and the field has been nulled before the - * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time. - */ - private Semaphore flush = null; - - Scheduler(QueryProcessor processor) { - this.processor = processor; - } - - private boolean isTerminated() { - return processor.isDisposed(); - } - - Consumer newConsumer() { - return (task) -> { - - ArrayList l = queues.get(); - l.add(task); - if(l.size() == BUFFER_SIZE) { - queues.remove(); - queues.sendToExecution(); - } - - }; - } - - void start() { - - Thread thread = new Thread() { - - public void run() { - - synchronized(queues) { - while(!isTerminated()) { - try { - ArrayList> qs = queues.getDispatchedQueues(); - for(ArrayList queue : qs) { - for(Runnable r : queue) - r.run(); - } - if(flush != null) { - for(ArrayList queue : queues.allQueues.values()) { - for(Runnable r : queue) { - r.run(); - } - queue.clear(); - } - Semaphore s = flush; - flush = null; - s.release(); - } - queues.wait(1000); - } catch (InterruptedException e) { - LOGGER.error("Unexpected interrupt", e); - } - } - } - - }; - - }; - - thread.setName("QueryListening"); - thread.start(); - - } - - private Semaphore createFlush() { - synchronized(queues) { - flush = new Semaphore(0); - queues.notify(); - return flush; - } - } - - void flush() { - try { - createFlush().acquire(); - } catch (InterruptedException e) { - LOGGER.error("Unexpected interrupt", e); - } - } - - } - - private static class RegisterParentRunnable implements Runnable { - - private final CacheEntry parent; - private final CacheEntry child; - - public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { - this.parent = parent; - this.child = child; - } - - @Override - public void run() { - child.addParent(parent); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - } - - private static class RegisterListenerRunnable implements Runnable { - - private final QueryListening queryListening; - private final ListenerBase base; - private final Object procedure; - private final CacheEntry parent; - private final CacheEntry entry; - - public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { - this.queryListening = queryListening; - this.base = base; - this.procedure = procedure; - this.parent = parent; - this.entry = entry; - } - - @Override - public void run() { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = queryListening.listeners.get(entry); - if (list == null) { - list = new ArrayList<>(1); - queryListening.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - // Equals is here based on base - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) - return; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - queryListening.addedEntries.put(base, result); - - } - - - } QueryListening(QueryProcessor processor) { this.processor = processor; scheduler = new Scheduler(processor); - consumer = scheduler.newConsumer(); scheduler.start(); } @@ -260,7 +57,7 @@ public class QueryListening { void sync() { try { - scheduler.flush(); + scheduler.sync(); } catch (Throwable t) { LOGGER.error("Error while waiting for query dependency management", t); } @@ -276,7 +73,7 @@ public class QueryListening { if(parent != null) { try { if(!child.isImmutable(graph)) - consumer.accept(new RegisterParentRunnable(parent, child)); + scheduler.accept(new RegisterParentRunnable(parent, child)); } catch (DatabaseException e) { LOGGER.error("Error while registering query dependencies", e); } @@ -284,7 +81,7 @@ public class QueryListening { if(listener != null) if(!listener.isDisposed()) - consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); + scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child)); } @@ -292,10 +89,7 @@ public class QueryListening { if(base == null) return; - consumer.accept(() -> { - ListenerEntry entry = addedEntries.get(base); - if(entry != null) entry.setLastKnown(result); - }); + scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result)); } @@ -529,5 +323,102 @@ public class QueryListening { else return Collections.emptyList(); } - + + private static class RegisterParentRunnable implements Runnable { + + private final CacheEntry parent; + private final CacheEntry child; + + public RegisterParentRunnable(CacheEntry parent, CacheEntry child) { + this.parent = parent; + this.child = child; + } + + @Override + public void run() { + child.addParent(parent); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + } + + private static class RegisterListenerRunnable implements Runnable { + + private final QueryListening queryListening; + private final ListenerBase base; + private final Object procedure; + private final CacheEntry parent; + private final CacheEntry entry; + + public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) { + this.queryListening = queryListening; + this.base = base; + this.procedure = procedure; + this.parent = parent; + this.entry = entry; + } + + @Override + public void run() { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = queryListening.listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + queryListening.listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) + return; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + queryListening.addedEntries.put(base, result); + + } + + + } + + private static class RegisterFirstKnownRunnable implements Runnable { + + private final Map addedEntries; + private final ListenerBase base; + private final Object result; + + public RegisterFirstKnownRunnable(Map addedEntries, ListenerBase base, Object result) { + this.addedEntries = addedEntries; + this.base = base; + this.result = result; + } + + @Override + public void run() { + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + } + + } + }