X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryListening.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryListening.java;h=b873bfd73ef8f9b97773977b2459d7cdb502e9c6;hb=63bb6d595c37b3a2fb55e07fb810779cae3b4d03;hp=9daab950e3964a8681a12fabc5ef0b2cfaf74ceb;hpb=26b755c7e98b7bb3d9038abba139bef0e71f6607;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java index 9daab950e..b873bfd73 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java @@ -14,9 +14,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.db.DevelopmentKeys; @@ -37,372 +38,462 @@ public class QueryListening { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class); - final private QueryProcessor processor; - private THashSet scheduledListeners = new THashSet(); - private boolean firingListeners = false; - final THashMap> listeners = new THashMap>(10, 0.75f); - private BlockingQueue tasks = new ArrayBlockingQueue(2048); - private Map addedEntries = new HashMap<>(); - - QueryListening(QueryProcessor processor) { - this.processor = processor; - new DependencyManagementThread(processor, tasks).start(); - } - - public void sync() { - Semaphore s = new Semaphore(0); - try { - tasks.put(() -> { - s.release(); - }); - s.acquire(); - } catch (Throwable t) { - LOGGER.error("Error while waiting for query dependency management", t); - } - } - - static class DependencyManagementThread extends Thread { - - final private QueryProcessor processor; - final BlockingQueue tasks; - - DependencyManagementThread(QueryProcessor processor, BlockingQueue tasks) { - setName("Query Dependency Manager"); - this.processor = processor; - this.tasks = tasks; - } - - @Override - public void run() { - while(processor.isAlive()) { - try { - Runnable r = tasks.take(); - r.run(); - } catch (Throwable t) { - // Spurious problems? - LOGGER.error("Error while waiting for query dependency management tasks", t); - } - } - } - - } - - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - try { - tasks.put(() -> { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - LOGGER.error("Error while registering query dependencies", e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) - registerListener(child, listener, procedure); - - }); - } catch (InterruptedException e) { - LOGGER.error("Error while registering dependencies", e); - } - - } - - void registerFirstKnown(ListenerBase base, Object result) { - - tasks.offer(() -> { - - ListenerEntry entry = addedEntries.get(base); - if(entry != null) entry.setLastKnown(result); - - }); - - } - - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - /* - * Registers a listener and returns an entry iff the entry was added - */ - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = listeners.get(entry); - if (list == null) { - list = new ArrayList<>(1); - listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - // Equals is here based on base - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - addedEntries.put(base, result); - - return result; - - } - - void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - listeners.remove(entry.entry); - } - - boolean hasListener(CacheEntry entry) { - if(listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(listeners.get(entry) != null) { - ArrayList entries = listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(listeners.get(entry) != null) - return listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(processor)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!processor.isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } - - public void fireListeners(WriteGraphImpl graph) { - - assert (!processor.updating); - assert (!processor.cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); + final private QueryProcessor processor; + private THashSet scheduledListeners = new THashSet(); + private boolean firingListeners = false; + final THashMap> listeners = new THashMap>(10, 0.75f); + private EventBusImpl eventBus; + private Consumer consumer; + private Map addedEntries = new HashMap<>(); + + int execCount=0; + int queueCount = 0; + + static class TL extends ThreadLocal> { + + Map> allQueues = new HashMap<>(); + ArrayList> dispatchedQueues = new ArrayList<>(); + + @Override + protected synchronized ArrayList initialValue() { + ArrayList result = new ArrayList<>(); + allQueues.put(Thread.currentThread(), result); + return result; + } + + synchronized void sendToExecution() { + ArrayList rs = allQueues.remove(Thread.currentThread()); + dispatchedQueues.add(rs); + notify(); + } + + synchronized ArrayList> getDispatchedQueues() { + ArrayList> result = dispatchedQueues; + dispatchedQueues = new ArrayList<>(); + return result; + } + + } + + static class EventBusImpl { + + private static final int BUFFER_SIZE = 100; + + TL queues = new TL(); + Semaphore flush = null; + + public boolean isTerminated() { + return false; + } + + public Consumer newConsumer() { + return new Consumer() { + + @Override + public void accept(Runnable arg0) { + ArrayList l = queues.get(); + l.add(arg0); + if(l.size() == BUFFER_SIZE) { + queues.remove(); + queues.sendToExecution(); + } + } + }; + } + + public void shutdown() { + } + + public void start(ThreadFactory arg1) { + arg1.newThread(new Runnable() { + + @Override + public void run() { + synchronized(queues) { + while(!isTerminated()) { + try { + ArrayList> qs = queues.getDispatchedQueues(); + for(ArrayList queue : qs) { + for(Runnable r : queue) + r.run(); + } + if(flush != null) { + for(ArrayList queue : queues.allQueues.values()) { + for(Runnable r : queue) { + r.run(); + } + queue.clear(); + } + Semaphore s = flush; + flush = null; + s.release(); + } + queues.wait(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + }).start(); + } + + void flush() { + synchronized(queues) { + flush = new Semaphore(0); + queues.notify(); + } + try { + flush.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + QueryListening(QueryProcessor processor) { + this.processor = processor; + eventBus = new EventBusImpl(); + consumer = eventBus.newConsumer(); + eventBus.start(new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) { + System.err.println("new Thread " + r); + return new Thread(r, "QueryDependencyManager"); + } + }); + } + + public void sync() { + try { + eventBus.flush(); + } catch (Throwable t) { + LOGGER.error("Error while waiting for query dependency management", t); + } + } + + public boolean hasScheduledUpdates() { + return !scheduledListeners.isEmpty(); + } + + public synchronized void dispatch(Runnable r) { + queueCount++; + consumer.accept(r); + } + + void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + + dispatch(() -> { + + if (parent != null && !inferred) { + try { + if(!child.isImmutable(graph)) + child.addParent(parent); + } catch (DatabaseException e) { + LOGGER.error("Error while registering query dependencies", e); + } + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + if (listener != null) + registerListener(child, listener, procedure); + + }); + + } + + void registerFirstKnown(ListenerBase base, Object result) { + + if(base == null) return; + + dispatch(() -> { + + ListenerEntry entry = addedEntries.get(base); + if(entry != null) entry.setLastKnown(result); + + }); + + } + + public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { + + assert (entry != null); + + if (base.isDisposed()) + return null; + + return addListener(entry, base, procedure); + + } + + /* + * Registers a listener and returns an entry iff the entry was added + */ + private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = listeners.get(entry); + if (list == null) { + list = new ArrayList<>(1); + listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + // Equals is here based on base + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) return null; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + addedEntries.put(base, result); + + return result; + + } + + void scheduleListener(ListenerEntry entry) { + assert (entry != null); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Scheduled " + entry.procedure); + } + } + scheduledListeners.add(entry); + } + + private void removeListener(ListenerEntry entry) { + assert (entry != null); + ArrayList list = listeners.get(entry.entry); + if(list == null) return; + boolean success = list.remove(entry); + assert (success); + if (list.isEmpty()) + listeners.remove(entry.entry); + } + + boolean hasListener(CacheEntry entry) { + if(listeners.get(entry) != null) return true; + return false; + } + + boolean hasListenerAfterDisposing(CacheEntry entry) { + if(listeners.get(entry) != null) { + ArrayList entries = listeners.get(entry); + ArrayList list = null; + for (ListenerEntry e : entries) { + if (e.base.isDisposed()) { + if(list == null) list = new ArrayList(); + list.add(e); + } + } + if(list != null) { + for (ListenerEntry e : list) { + entries.remove(e); + } + } + if (entries.isEmpty()) { + listeners.remove(entry); + return false; + } + return true; + } + return false; + } + + List getListenerEntries(CacheEntry entry) { + hasListenerAfterDisposing(entry); + if(listeners.get(entry) != null) + return listeners.get(entry); + else + return Collections.emptyList(); + } + + void processListenerReport(CacheEntry entry, Map> workarea) { + + if(!workarea.containsKey(entry)) { + + HashSet ls = new HashSet(); + for(ListenerEntry e : getListenerEntries(entry)) + ls.add(e.base); + + workarea.put(entry, ls); + + for(CacheEntry parent : entry.getParents(processor)) { + processListenerReport(parent, workarea); + ls.addAll(workarea.get(parent)); + } + + } + + } + + public synchronized ListenerReport getListenerReport() throws IOException { + + class ListenerReportImpl implements ListenerReport { + + Map> workarea = new HashMap>(); + + @Override + public void print(PrintStream b) { + Map hist = new HashMap(); + for(Map.Entry> e : workarea.entrySet()) { + for(ListenerBase l : e.getValue()) { + Integer i = hist.get(l); + hist.put(l, i != null ? i-1 : -1); + } + } + + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + b.flush(); + } + + } + + ListenerReportImpl result = new ListenerReportImpl(); + + Collection all = processor.allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : all) { + hasListenerAfterDisposing(entry); + } + for(CacheEntryBase entry : all) { + processListenerReport(entry, result.workarea); + } + + return result; + + } + + public synchronized String reportListeners(File file) throws IOException { + + if (!processor.isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + ListenerReport report = getListenerReport(); + report.print(b); + + return "Done reporting listeners."; + + } + + public void fireListeners(WriteGraphImpl graph) { + + assert (!processor.updating); + assert (!processor.cache.collecting); + assert (!firingListeners); + + firingListeners = true; + + try { - ArrayList schedule = new ArrayList(); + // Performing may cause further events to be scheduled. + while (!scheduledListeners.isEmpty()) { - for (ListenerEntry listenerEntry : entries) { + // Clone current events to make new entries possible during + // firing. + THashSet entries = scheduledListeners; + scheduledListeners = new THashSet(); - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } + ArrayList schedule = new ArrayList(); - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); + for (ListenerEntry listenerEntry : entries) { - Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); + if (pruneListener(listenerEntry)) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Pruned " + listenerEntry.procedure); + } + } + continue; + } + + final CacheEntry entry = listenerEntry.entry; + assert (entry != null); + + Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown()); + + if (newValue != ListenerEntry.NOT_CHANGED) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); + } + } + schedule.add(listenerEntry); + listenerEntry.setLastKnown(entry.getResult()); + } + + } + + for(ListenerEntry listenerEntry : schedule) { + final CacheEntry entry = listenerEntry.entry; + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure); + } + } + try { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); + } + } + entry.performFromCache(graph, listenerEntry.procedure); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + } finally { + firingListeners = false; + } + + } + + void updateParents(int indent, CacheEntry entry, LinkedList todo) { + + Iterable oldParents = entry.getParents(processor); + for (CacheEntry parent : oldParents) { + if(!parent.isDiscarded()) + todo.push(new UpdateEntry(entry, parent, indent + 2)); + } + + } + + private boolean pruneListener(ListenerEntry entry) { + if (entry.base.isDisposed()) { + removeListener(entry); + return true; + } else { + return false; + } + } - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - } - - } finally { - firingListeners = false; - } - - } - - void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(processor); - for (CacheEntry parent : oldParents) { - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - }