- final private QueryProcessor processor;
- private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
- private boolean firingListeners = false;
- final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(2048);
- private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
-
- QueryListening(QueryProcessor processor) {
- this.processor = processor;
- new DependencyManagementThread(processor, tasks).start();
- }
-
- public void sync() {
- Semaphore s = new Semaphore(0);
- try {
- tasks.put(() -> {
- s.release();
- });
- s.acquire();
- } catch (Throwable t) {
- LOGGER.error("Error while waiting for query dependency management", t);
- }
- }
-
- static class DependencyManagementThread extends Thread {
-
- final private QueryProcessor processor;
- final BlockingQueue<Runnable> tasks;
-
- DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
- setName("Query Dependency Manager");
- this.processor = processor;
- this.tasks = tasks;
- }
-
- @Override
- public void run() {
- while(processor.isAlive()) {
- try {
- Runnable r = tasks.take();
- r.run();
- } catch (Throwable t) {
- // Spurious problems?
- LOGGER.error("Error while waiting for query dependency management tasks", t);
- }
- }
- }
-
- }
-
- public boolean hasScheduledUpdates() {
- return !scheduledListeners.isEmpty();
- }
-
- void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
- try {
- tasks.put(() -> {
-
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- LOGGER.error("Error while registering query dependencies", e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- if (listener != null)
- registerListener(child, listener, procedure);
-
- });
- } catch (InterruptedException e) {
- LOGGER.error("Error while registering dependencies", e);
- }
-
- }
-
- void registerFirstKnown(ListenerBase base, Object result) {
-
- tasks.offer(() -> {
-
- ListenerEntry entry = addedEntries.get(base);
- if(entry != null) entry.setLastKnown(result);
-
- });
-
- }
-
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
- /*
- * Registers a listener and returns an entry iff the entry was added
- */
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = listeners.get(entry);
- if (list == null) {
- list = new ArrayList<>(1);
- listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- // Equals is here based on base
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed()) return null;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- addedEntries.put(base, result);
-
- return result;
-
- }
-
- void scheduleListener(ListenerEntry entry) {
- assert (entry != null);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Scheduled " + entry.procedure);
- }
- }
- scheduledListeners.add(entry);
- }
-
- private void removeListener(ListenerEntry entry) {
- assert (entry != null);
- ArrayList<ListenerEntry> list = listeners.get(entry.entry);
- if(list == null) return;
- boolean success = list.remove(entry);
- assert (success);
- if (list.isEmpty())
- listeners.remove(entry.entry);
- }
-
- boolean hasListener(CacheEntry entry) {
- if(listeners.get(entry) != null) return true;
- return false;
- }
-
- boolean hasListenerAfterDisposing(CacheEntry entry) {
- if(listeners.get(entry) != null) {
- ArrayList<ListenerEntry> entries = listeners.get(entry);
- ArrayList<ListenerEntry> list = null;
- for (ListenerEntry e : entries) {
- if (e.base.isDisposed()) {
- if(list == null) list = new ArrayList<ListenerEntry>();
- list.add(e);
- }
- }
- if(list != null) {
- for (ListenerEntry e : list) {
- entries.remove(e);
- }
- }
- if (entries.isEmpty()) {
- listeners.remove(entry);
- return false;
- }
- return true;
- }
- return false;
- }
-
- List<ListenerEntry> getListenerEntries(CacheEntry entry) {
- hasListenerAfterDisposing(entry);
- if(listeners.get(entry) != null)
- return listeners.get(entry);
- else
- return Collections.emptyList();
- }
-
- void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
- if(!workarea.containsKey(entry)) {
-
- HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
- for(ListenerEntry e : getListenerEntries(entry))
- ls.add(e.base);
-
- workarea.put(entry, ls);
-
- for(CacheEntry parent : entry.getParents(processor)) {
- processListenerReport(parent, workarea);
- ls.addAll(workarea.get(parent));
- }
-
- }
-
- }
-
- public synchronized ListenerReport getListenerReport() throws IOException {
-
- class ListenerReportImpl implements ListenerReport {
-
- Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
- @Override
- public void print(PrintStream b) {
- Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
- for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
- for(ListenerBase l : e.getValue()) {
- Integer i = hist.get(l);
- hist.put(l, i != null ? i-1 : -1);
- }
- }
-
- for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
- b.print("" + -p.second + " " + p.first + "\n");
- }
-
- b.flush();
- }
-
- }
-
- ListenerReportImpl result = new ListenerReportImpl();
-
- Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
- for(CacheEntryBase entry : all) {
- hasListenerAfterDisposing(entry);
- }
- for(CacheEntryBase entry : all) {
- processListenerReport(entry, result.workarea);
- }
-
- return result;
-
- }
-
- public synchronized String reportListeners(File file) throws IOException {
-
- if (!processor.isAlive())
- return "Disposed!";
-
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
- ListenerReport report = getListenerReport();
- report.print(b);
-
- return "Done reporting listeners.";
-
- }
-
- public void fireListeners(WriteGraphImpl graph) {
-
- assert (!processor.updating);
- assert (!processor.cache.collecting);
- assert (!firingListeners);
-
- firingListeners = true;
-
- try {
-
- // Performing may cause further events to be scheduled.
- while (!scheduledListeners.isEmpty()) {
-
- // Clone current events to make new entries possible during
- // firing.
- THashSet<ListenerEntry> entries = scheduledListeners;
- scheduledListeners = new THashSet<ListenerEntry>();
+ final private QueryProcessor processor;
+ private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+ private boolean firingListeners = false;
+ final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+ private EventBusImpl eventBus;
+ private Consumer<Runnable> consumer;
+ private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
+
+ int execCount=0;
+ int queueCount = 0;
+
+ static class TL extends ThreadLocal<ArrayList<Runnable>> {
+
+ Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
+ ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
+
+ @Override
+ protected synchronized ArrayList<Runnable> initialValue() {
+ ArrayList<Runnable> result = new ArrayList<>();
+ allQueues.put(Thread.currentThread(), result);
+ return result;
+ }
+
+ synchronized void sendToExecution() {
+ ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
+ dispatchedQueues.add(rs);
+ notify();
+ }
+
+ synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
+ ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
+ dispatchedQueues = new ArrayList<>();
+ return result;
+ }
+
+ }
+
+ static class EventBusImpl {
+
+ private static final int BUFFER_SIZE = 100;
+
+ TL queues = new TL();
+ Semaphore flush = null;
+
+ public boolean isTerminated() {
+ return false;
+ }
+
+ public Consumer<Runnable> newConsumer() {
+ return new Consumer<Runnable>() {
+
+ @Override
+ public void accept(Runnable arg0) {
+ ArrayList<Runnable> l = queues.get();
+ l.add(arg0);
+ if(l.size() == BUFFER_SIZE) {
+ queues.remove();
+ queues.sendToExecution();
+ }
+ }
+ };
+ }
+
+ public void shutdown() {
+ }
+
+ public void start(ThreadFactory arg1) {
+ arg1.newThread(new Runnable() {
+
+ @Override
+ public void run() {
+ synchronized(queues) {
+ while(!isTerminated()) {
+ try {
+ ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
+ for(ArrayList<Runnable> queue : qs) {
+ for(Runnable r : queue)
+ r.run();
+ }
+ if(flush != null) {
+ for(ArrayList<Runnable> queue : queues.allQueues.values()) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ queue.clear();
+ }
+ Semaphore s = flush;
+ flush = null;
+ s.release();
+ }
+ queues.wait(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ }).start();
+ }
+
+ void flush() {
+ synchronized(queues) {
+ flush = new Semaphore(0);
+ queues.notify();
+ }
+ try {
+ flush.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ QueryListening(QueryProcessor processor) {
+ this.processor = processor;
+ eventBus = new EventBusImpl();
+ consumer = eventBus.newConsumer();
+ eventBus.start(new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(Runnable r) {
+ System.err.println("new Thread " + r);
+ return new Thread(r, "QueryDependencyManager");
+ }
+ });
+ }
+
+ public void sync() {
+ try {
+ eventBus.flush();
+ } catch (Throwable t) {
+ LOGGER.error("Error while waiting for query dependency management", t);
+ }
+ }
+
+ public boolean hasScheduledUpdates() {
+ return !scheduledListeners.isEmpty();
+ }
+
+ public synchronized void dispatch(Runnable r) {
+ queueCount++;
+ consumer.accept(r);
+ }
+
+ void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+
+ dispatch(() -> {
+
+ if (parent != null && !inferred) {
+ try {
+ if(!child.isImmutable(graph))
+ child.addParent(parent);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
+ }
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ if (listener != null)
+ registerListener(child, listener, procedure);
+
+ });
+
+ }
+
+ void registerFirstKnown(ListenerBase base, Object result) {
+
+ if(base == null) return;
+
+ dispatch(() -> {
+
+ ListenerEntry entry = addedEntries.get(base);
+ if(entry != null) entry.setLastKnown(result);
+
+ });
+
+ }
+
+ public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
+
+ assert (entry != null);
+
+ if (base.isDisposed())
+ return null;
+
+ return addListener(entry, base, procedure);
+
+ }
+
+ /*
+ * Registers a listener and returns an entry iff the entry was added
+ */
+ private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed()) return null;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ addedEntries.put(base, result);
+
+ return result;
+
+ }
+
+ void scheduleListener(ListenerEntry entry) {
+ assert (entry != null);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Scheduled " + entry.procedure);
+ }
+ }
+ scheduledListeners.add(entry);
+ }
+
+ private void removeListener(ListenerEntry entry) {
+ assert (entry != null);
+ ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+ if(list == null) return;
+ boolean success = list.remove(entry);
+ assert (success);
+ if (list.isEmpty())
+ listeners.remove(entry.entry);
+ }
+
+ boolean hasListener(CacheEntry entry) {
+ if(listeners.get(entry) != null) return true;
+ return false;
+ }
+
+ boolean hasListenerAfterDisposing(CacheEntry entry) {
+ if(listeners.get(entry) != null) {
+ ArrayList<ListenerEntry> entries = listeners.get(entry);
+ ArrayList<ListenerEntry> list = null;
+ for (ListenerEntry e : entries) {
+ if (e.base.isDisposed()) {
+ if(list == null) list = new ArrayList<ListenerEntry>();
+ list.add(e);
+ }
+ }
+ if(list != null) {
+ for (ListenerEntry e : list) {
+ entries.remove(e);
+ }
+ }
+ if (entries.isEmpty()) {
+ listeners.remove(entry);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+ hasListenerAfterDisposing(entry);
+ if(listeners.get(entry) != null)
+ return listeners.get(entry);
+ else
+ return Collections.emptyList();
+ }
+
+ void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+ if(!workarea.containsKey(entry)) {
+
+ HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+ for(ListenerEntry e : getListenerEntries(entry))
+ ls.add(e.base);
+
+ workarea.put(entry, ls);
+
+ for(CacheEntry parent : entry.getParents(processor)) {
+ processListenerReport(parent, workarea);
+ ls.addAll(workarea.get(parent));
+ }
+
+ }
+
+ }
+
+ public synchronized ListenerReport getListenerReport() throws IOException {
+
+ class ListenerReportImpl implements ListenerReport {
+
+ Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+ @Override
+ public void print(PrintStream b) {
+ Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+ for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+ for(ListenerBase l : e.getValue()) {
+ Integer i = hist.get(l);
+ hist.put(l, i != null ? i-1 : -1);
+ }
+ }
+
+ for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+ b.print("" + -p.second + " " + p.first + "\n");
+ }
+
+ b.flush();
+ }
+
+ }
+
+ ListenerReportImpl result = new ListenerReportImpl();
+
+ Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+ for(CacheEntryBase entry : all) {
+ hasListenerAfterDisposing(entry);
+ }
+ for(CacheEntryBase entry : all) {
+ processListenerReport(entry, result.workarea);
+ }
+
+ return result;
+
+ }
+
+ public synchronized String reportListeners(File file) throws IOException {
+
+ if (!processor.isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+ ListenerReport report = getListenerReport();
+ report.print(b);
+
+ return "Done reporting listeners.";
+
+ }
+
+ public void fireListeners(WriteGraphImpl graph) {
+
+ assert (!processor.updating);
+ assert (!processor.cache.collecting);
+ assert (!firingListeners);
+
+ firingListeners = true;
+
+ try {