import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.function.Consumer;
import org.simantics.databoard.Bindings;
import org.simantics.db.DevelopmentKeys;
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
- final private QueryProcessor processor;
- private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
- private boolean firingListeners = false;
- final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(2048);
- private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
-
- QueryListening(QueryProcessor processor) {
- this.processor = processor;
- new DependencyManagementThread(processor, tasks).start();
- }
-
- public void sync() {
- Semaphore s = new Semaphore(0);
- try {
- tasks.put(() -> {
- s.release();
- });
- s.acquire();
- } catch (Throwable t) {
- LOGGER.error("Error while waiting for query dependency management", t);
- }
- }
-
- static class DependencyManagementThread extends Thread {
-
- final private QueryProcessor processor;
- final BlockingQueue<Runnable> tasks;
-
- DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
- setName("Query Dependency Manager");
- this.processor = processor;
- this.tasks = tasks;
- }
-
- @Override
- public void run() {
- while(processor.isAlive()) {
- try {
- Runnable r = tasks.take();
- r.run();
- } catch (Throwable t) {
- // Spurious problems?
- LOGGER.error("Error while waiting for query dependency management tasks", t);
- }
- }
- }
-
- }
-
- public boolean hasScheduledUpdates() {
- return !scheduledListeners.isEmpty();
- }
-
- void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
- try {
- tasks.put(() -> {
-
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- LOGGER.error("Error while registering query dependencies", e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
- }
-
- if (listener != null)
- registerListener(child, listener, procedure);
-
- });
- } catch (InterruptedException e) {
- LOGGER.error("Error while registering dependencies", e);
- }
-
- }
-
- void registerFirstKnown(ListenerBase base, Object result) {
-
- tasks.offer(() -> {
-
- ListenerEntry entry = addedEntries.get(base);
- if(entry != null) entry.setLastKnown(result);
-
- });
-
- }
-
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
- /*
- * Registers a listener and returns an entry iff the entry was added
- */
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = listeners.get(entry);
- if (list == null) {
- list = new ArrayList<>(1);
- listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- // Equals is here based on base
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed()) return null;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
- }
-
- addedEntries.put(base, result);
-
- return result;
-
- }
-
- void scheduleListener(ListenerEntry entry) {
- assert (entry != null);
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Scheduled " + entry.procedure);
- }
- }
- scheduledListeners.add(entry);
- }
-
- private void removeListener(ListenerEntry entry) {
- assert (entry != null);
- ArrayList<ListenerEntry> list = listeners.get(entry.entry);
- if(list == null) return;
- boolean success = list.remove(entry);
- assert (success);
- if (list.isEmpty())
- listeners.remove(entry.entry);
- }
-
- boolean hasListener(CacheEntry entry) {
- if(listeners.get(entry) != null) return true;
- return false;
- }
-
- boolean hasListenerAfterDisposing(CacheEntry entry) {
- if(listeners.get(entry) != null) {
- ArrayList<ListenerEntry> entries = listeners.get(entry);
- ArrayList<ListenerEntry> list = null;
- for (ListenerEntry e : entries) {
- if (e.base.isDisposed()) {
- if(list == null) list = new ArrayList<ListenerEntry>();
- list.add(e);
- }
- }
- if(list != null) {
- for (ListenerEntry e : list) {
- entries.remove(e);
- }
- }
- if (entries.isEmpty()) {
- listeners.remove(entry);
- return false;
- }
- return true;
- }
- return false;
- }
-
- List<ListenerEntry> getListenerEntries(CacheEntry entry) {
- hasListenerAfterDisposing(entry);
- if(listeners.get(entry) != null)
- return listeners.get(entry);
- else
- return Collections.emptyList();
- }
-
- void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
- if(!workarea.containsKey(entry)) {
-
- HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
- for(ListenerEntry e : getListenerEntries(entry))
- ls.add(e.base);
-
- workarea.put(entry, ls);
-
- for(CacheEntry parent : entry.getParents(processor)) {
- processListenerReport(parent, workarea);
- ls.addAll(workarea.get(parent));
- }
-
- }
-
- }
-
- public synchronized ListenerReport getListenerReport() throws IOException {
-
- class ListenerReportImpl implements ListenerReport {
-
- Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
- @Override
- public void print(PrintStream b) {
- Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
- for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
- for(ListenerBase l : e.getValue()) {
- Integer i = hist.get(l);
- hist.put(l, i != null ? i-1 : -1);
- }
- }
-
- for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
- b.print("" + -p.second + " " + p.first + "\n");
- }
-
- b.flush();
- }
-
- }
-
- ListenerReportImpl result = new ListenerReportImpl();
-
- Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
- for(CacheEntryBase entry : all) {
- hasListenerAfterDisposing(entry);
- }
- for(CacheEntryBase entry : all) {
- processListenerReport(entry, result.workarea);
- }
-
- return result;
-
- }
-
- public synchronized String reportListeners(File file) throws IOException {
-
- if (!processor.isAlive())
- return "Disposed!";
-
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
- ListenerReport report = getListenerReport();
- report.print(b);
-
- return "Done reporting listeners.";
-
- }
-
- public void fireListeners(WriteGraphImpl graph) {
-
- assert (!processor.updating);
- assert (!processor.cache.collecting);
- assert (!firingListeners);
-
- firingListeners = true;
-
- try {
-
- // Performing may cause further events to be scheduled.
- while (!scheduledListeners.isEmpty()) {
-
- // Clone current events to make new entries possible during
- // firing.
- THashSet<ListenerEntry> entries = scheduledListeners;
- scheduledListeners = new THashSet<ListenerEntry>();
-
- ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
-
- for (ListenerEntry listenerEntry : entries) {
-
- if (pruneListener(listenerEntry)) {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("Pruned " + listenerEntry.procedure);
- }
- }
- continue;
- }
-
- final CacheEntry entry = listenerEntry.entry;
- assert (entry != null);
-
- Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
-
- if (newValue != ListenerEntry.NOT_CHANGED) {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- new Exception().printStackTrace();
- System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
- }
- }
- schedule.add(listenerEntry);
- listenerEntry.setLastKnown(entry.getResult());
- }
-
- }
-
- for(ListenerEntry listenerEntry : schedule) {
- final CacheEntry entry = listenerEntry.entry;
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Firing " + listenerEntry.procedure);
- }
- }
- try {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
- System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
- }
- }
- entry.performFromCache(graph, listenerEntry.procedure);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
-
- }
-
- } finally {
- firingListeners = false;
- }
-
- }
-
- void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
-
- Iterable<CacheEntry> oldParents = entry.getParents(processor);
- for (CacheEntry parent : oldParents) {
- if(!parent.isDiscarded())
- todo.push(new UpdateEntry(entry, parent, indent + 2));
- }
-
- }
-
- private boolean pruneListener(ListenerEntry entry) {
- if (entry.base.isDisposed()) {
- removeListener(entry);
- return true;
- } else {
- return false;
- }
- }
-
+ private final QueryProcessor processor;
+ private final Scheduler scheduler;
+ private final Consumer<Runnable> consumer;
+ private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
+
+ private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+ private boolean firingListeners = false;
+
+ final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
+
+ private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
+
+ private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
+
+ private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
+
+ @Override
+ protected synchronized ArrayList<Runnable> initialValue() {
+ ArrayList<Runnable> result = new ArrayList<>();
+ allQueues.put(Thread.currentThread(), result);
+ return result;
+ }
+
+ synchronized void sendToExecution() {
+ ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
+ dispatchedQueues.add(rs);
+ notify();
+ }
+
+ synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
+ ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
+ dispatchedQueues = new ArrayList<>();
+ return result;
+ }
+
+ }
+
+ private static class Scheduler {
+
+ private static final int BUFFER_SIZE = 100;
+
+ private final QueryProcessor processor;
+
+ private final ThreadQueue queues = new ThreadQueue();
+
+ /*
+ * Access to this field is synchronized using the monitor of 'queues'
+ * The method flush() shall be called by the single writing thread and the field has been nulled before the
+ * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
+ */
+ private Semaphore flush = null;
+
+ Scheduler(QueryProcessor processor) {
+ this.processor = processor;
+ }
+
+ private boolean isTerminated() {
+ return processor.isDisposed();
+ }
+
+ Consumer<Runnable> newConsumer() {
+ return (task) -> {
+
+ ArrayList<Runnable> l = queues.get();
+ l.add(task);
+ if(l.size() == BUFFER_SIZE) {
+ queues.remove();
+ queues.sendToExecution();
+ }
+
+ };
+ }
+
+ void start() {
+
+ Thread thread = new Thread() {
+
+ public void run() {
+
+ synchronized(queues) {
+ while(!isTerminated()) {
+ try {
+ ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
+ for(ArrayList<Runnable> queue : qs) {
+ for(Runnable r : queue)
+ r.run();
+ }
+ if(flush != null) {
+ for(ArrayList<Runnable> queue : queues.allQueues.values()) {
+ for(Runnable r : queue) {
+ r.run();
+ }
+ queue.clear();
+ }
+ Semaphore s = flush;
+ flush = null;
+ s.release();
+ }
+ queues.wait(1000);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interrupt", e);
+ }
+ }
+ }
+
+ };
+
+ };
+
+ thread.setName("QueryListening");
+ thread.start();
+
+ }
+
+ private Semaphore createFlush() {
+ synchronized(queues) {
+ flush = new Semaphore(0);
+ queues.notify();
+ return flush;
+ }
+ }
+
+ void flush() {
+ try {
+ createFlush().acquire();
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interrupt", e);
+ }
+ }
+
+ }
+
+ private static class RegisterParentRunnable implements Runnable {
+
+ private final CacheEntry parent;
+ private final CacheEntry child;
+
+ public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+ this.parent = parent;
+ this.child = child;
+ }
+
+ @Override
+ public void run() {
+ child.addParent(parent);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ }
+
+ private static class RegisterListenerRunnable implements Runnable {
+
+ private final QueryListening queryListening;
+ private final ListenerBase base;
+ private final Object procedure;
+ private final CacheEntry parent;
+ private final CacheEntry entry;
+
+ public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+ this.queryListening = queryListening;
+ this.base = base;
+ this.procedure = procedure;
+ this.parent = parent;
+ this.entry = entry;
+ }
+
+ @Override
+ public void run() {
+
+ assert (entry != null);
+ assert (procedure != null);
+
+ ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
+ if (list == null) {
+ list = new ArrayList<>(1);
+ queryListening.listeners.put(entry, list);
+ }
+
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);
+ // Equals is here based on base
+ int currentIndex = list.indexOf(result);
+ // There was already a listener
+ if(currentIndex > -1) {
+ ListenerEntry current = list.get(currentIndex);
+ if(!current.base.isDisposed())
+ return;
+ list.set(currentIndex, result);
+ } else {
+ list.add(result);
+ }
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
+ }
+ }
+
+ queryListening.addedEntries.put(base, result);
+
+ }
+
+
+ }
+
+ QueryListening(QueryProcessor processor) {
+
+ this.processor = processor;
+ scheduler = new Scheduler(processor);
+ consumer = scheduler.newConsumer();
+ scheduler.start();
+
+ }
+
+ public boolean hasScheduledUpdates() {
+ return !scheduledListeners.isEmpty();
+ }
+
+ void sync() {
+ try {
+ scheduler.flush();
+ } catch (Throwable t) {
+ LOGGER.error("Error while waiting for query dependency management", t);
+ }
+ }
+
+ void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
+
+ if(inferred) {
+ assert(listener == null);
+ return;
+ }
+
+ if(parent != null) {
+ try {
+ if(!child.isImmutable(graph))
+ consumer.accept(new RegisterParentRunnable(parent, child));
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
+ }
+ }
+
+ if(listener != null)
+ if(!listener.isDisposed())
+ consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
+
+ }
+
+ void registerFirstKnown(ListenerBase base, Object result) {
+
+ if(base == null) return;
+
+ consumer.accept(() -> {
+ ListenerEntry entry = addedEntries.get(base);
+ if(entry != null) entry.setLastKnown(result);
+ });
+
+ }
+
+ void scheduleListener(ListenerEntry entry) {
+
+ assert (entry != null);
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Scheduled " + entry.procedure);
+ }
+ }
+
+ scheduledListeners.add(entry);
+
+ }
+
+ boolean hasListener(CacheEntry entry) {
+ if(listeners.get(entry) != null) return true;
+ return false;
+ }
+
+ boolean hasListenerAfterDisposing(CacheEntry entry) {
+ if(listeners.get(entry) != null) {
+ ArrayList<ListenerEntry> entries = listeners.get(entry);
+ ArrayList<ListenerEntry> list = null;
+ for (ListenerEntry e : entries) {
+ if (e.base.isDisposed()) {
+ if(list == null) list = new ArrayList<ListenerEntry>();
+ list.add(e);
+ }
+ }
+ if(list != null) {
+ for (ListenerEntry e : list) {
+ entries.remove(e);
+ }
+ }
+ if (entries.isEmpty()) {
+ listeners.remove(entry);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
+
+ if(!workarea.containsKey(entry)) {
+
+ HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
+ for(ListenerEntry e : getListenerEntries(entry))
+ ls.add(e.base);
+
+ workarea.put(entry, ls);
+
+ for(CacheEntry parent : entry.getParents(processor)) {
+ processListenerReport(parent, workarea);
+ ls.addAll(workarea.get(parent));
+ }
+
+ }
+
+ }
+
+ public synchronized ListenerReport getListenerReport() throws IOException {
+
+ class ListenerReportImpl implements ListenerReport {
+
+ Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
+
+ @Override
+ public void print(PrintStream b) {
+ Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
+ for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
+ for(ListenerBase l : e.getValue()) {
+ Integer i = hist.get(l);
+ hist.put(l, i != null ? i-1 : -1);
+ }
+ }
+
+ for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+ b.print("" + -p.second + " " + p.first + "\n");
+ }
+
+ b.flush();
+ }
+
+ }
+
+ ListenerReportImpl result = new ListenerReportImpl();
+
+ Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
+ for(CacheEntryBase entry : all) {
+ hasListenerAfterDisposing(entry);
+ }
+ for(CacheEntryBase entry : all) {
+ processListenerReport(entry, result.workarea);
+ }
+
+ return result;
+
+ }
+
+ public synchronized String reportListeners(File file) throws IOException {
+
+ if (!processor.isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+ ListenerReport report = getListenerReport();
+ report.print(b);
+
+ return "Done reporting listeners.";
+
+ }
+
+ public void fireListeners(WriteGraphImpl graph) {
+
+ assert (!processor.updating);
+ assert (!processor.cache.collecting);
+ assert (!firingListeners);
+
+ firingListeners = true;
+
+ try {
+
+ // Performing may cause further events to be scheduled.
+ while (!scheduledListeners.isEmpty()) {
+
+ // Clone current events to make new entries possible during
+ // firing.
+ THashSet<ListenerEntry> entries = scheduledListeners;
+ scheduledListeners = new THashSet<ListenerEntry>();
+
+ ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
+
+ for (ListenerEntry listenerEntry : entries) {
+
+ if (pruneListener(listenerEntry)) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Pruned " + listenerEntry.procedure);
+ }
+ }
+ continue;
+ }
+
+ final CacheEntry entry = listenerEntry.entry;
+ assert (entry != null);
+
+ Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
+
+ if (newValue != ListenerEntry.NOT_CHANGED) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ new Exception().printStackTrace();
+ System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
+ }
+ }
+ schedule.add(listenerEntry);
+ listenerEntry.setLastKnown(entry.getResult());
+ }
+
+ }
+
+ for(ListenerEntry listenerEntry : schedule) {
+ final CacheEntry entry = listenerEntry.entry;
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure);
+ }
+ }
+ try {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
+ System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
+ }
+ }
+ entry.performFromCache(graph, listenerEntry.procedure);
+ } catch (Throwable t) {
+ LOGGER.error("Unexpected exception ", t);
+ }
+ }
+
+ }
+
+ } finally {
+ firingListeners = false;
+ }
+
+ }
+
+ void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
+
+ Iterable<CacheEntry> oldParents = entry.getParents(processor);
+ for (CacheEntry parent : oldParents) {
+ if(!parent.isDiscarded())
+ todo.push(new UpdateEntry(entry, parent, indent + 2));
+ }
+
+ }
+
+ private boolean pruneListener(ListenerEntry entry) {
+
+ if (entry.base.isDisposed()) {
+
+ assert (entry != null);
+ ArrayList<ListenerEntry> list = listeners.get(entry.entry);
+ if(list != null) {
+ boolean success = list.remove(entry);
+ assert (success);
+ if (list.isEmpty())
+ listeners.remove(entry.entry);
+ }
+
+ return true;
+
+ } else {
+
+ return false;
+
+ }
+ }
+
+ private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
+ hasListenerAfterDisposing(entry);
+ if(listeners.get(entry) != null)
+ return listeners.get(entry);
+ else
+ return Collections.emptyList();
+ }
+
}