private Consumer<Runnable> consumer;
private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
- int execCount=0;
- int queueCount = 0;
-
static class TL extends ThreadLocal<ArrayList<Runnable>> {
Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
return !scheduledListeners.isEmpty();
}
- public synchronized void dispatch(Runnable r) {
- queueCount++;
- consumer.accept(r);
+ static class RegisterParentRunnable implements Runnable {
+
+ final CacheEntry parent;
+ final CacheEntry child;
+
+ public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
+ this.parent = parent;
+ this.child = child;
+ }
+
+ @Override
+ public void run() {
+ child.addParent(parent);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
+ }
+ }
+
+ }
+
+ static class RegisterListenerRunnable implements Runnable {
+
+ final QueryListening queryListening;
+ final ListenerBase base;
+ final Object procedure;
+ final CacheEntry parent;
+ final CacheEntry entry;
+
+ public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
+ this.queryListening = queryListening;
+ this.base = base;;
+ this.procedure = procedure;
+ this.parent = parent;
+ this.entry = entry;
+ }
+
+ @Override
+ public void run() {
+ queryListening.addListener(entry, base, procedure);
+ }
+
}
void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
- dispatch(() -> {
+ if(inferred) {
+ assert(listener == null);
+ return;
+ }
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- LOGGER.error("Error while registering query dependencies", e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
- }
- }
+ if(parent != null) {
+ try {
+ if(!child.isImmutable(graph))
+ consumer.accept(new RegisterParentRunnable(parent, child));
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
}
+ }
- if (listener != null)
- registerListener(child, listener, procedure);
-
- });
+ if(listener != null)
+ if(!listener.isDisposed())
+ consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
}
if(base == null) return;
- dispatch(() -> {
-
+ consumer.accept(() -> {
ListenerEntry entry = addedEntries.get(base);
if(entry != null) entry.setLastKnown(result);
-
});
}
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
/*
* Registers a listener and returns an entry iff the entry was added
*/