1 package org.simantics.db.impl.query;
3 import java.io.BufferedOutputStream;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
18 import org.simantics.databoard.Bindings;
19 import org.simantics.db.DevelopmentKeys;
20 import org.simantics.db.debug.ListenerReport;
21 import org.simantics.db.exception.DatabaseException;
22 import org.simantics.db.impl.graph.ReadGraphImpl;
23 import org.simantics.db.procedure.ListenerBase;
24 import org.simantics.utils.Development;
25 import org.simantics.utils.datastructures.Pair;
26 import org.simantics.utils.datastructures.collections.CollectionUtils;
27 import org.slf4j.LoggerFactory;
29 import gnu.trove.map.hash.THashMap;
30 import gnu.trove.set.hash.THashSet;
32 public class QueryListening {
34 static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
36 private final QueryProcessor processor;
37 private final Scheduler scheduler;
38 private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
40 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
41 private boolean firingListeners = false;
43 final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
46 QueryListening(QueryProcessor processor) {
48 this.processor = processor;
49 scheduler = new Scheduler(processor);
54 public boolean hasScheduledUpdates() {
55 return !scheduledListeners.isEmpty();
61 } catch (Throwable t) {
62 LOGGER.error("Error while waiting for query dependency management", t);
66 void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
69 assert(listener == null);
75 if(!child.isImmutable(graph))
76 scheduler.accept(new RegisterParentRunnable(parent, child));
77 } catch (DatabaseException e) {
78 LOGGER.error("Error while registering query dependencies", e);
83 if(!listener.isDisposed())
84 scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
88 void registerFirstKnown(ListenerBase base, Object result) {
90 if(base == null) return;
92 scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
96 void scheduleListener(ListenerEntry entry) {
98 assert (entry != null);
100 if (Development.DEVELOPMENT) {
101 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
102 System.err.println("Scheduled " + entry.procedure);
106 scheduledListeners.add(entry);
110 boolean hasListener(CacheEntry entry) {
111 if(listeners.get(entry) != null) return true;
115 boolean hasListenerAfterDisposing(CacheEntry entry) {
116 if(listeners.get(entry) != null) {
117 ArrayList<ListenerEntry> entries = listeners.get(entry);
118 ArrayList<ListenerEntry> list = null;
119 for (ListenerEntry e : entries) {
120 if (e.base.isDisposed()) {
121 if(list == null) list = new ArrayList<ListenerEntry>();
126 for (ListenerEntry e : list) {
130 if (entries.isEmpty()) {
131 listeners.remove(entry);
139 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
141 if(!workarea.containsKey(entry)) {
143 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
144 for(ListenerEntry e : getListenerEntries(entry))
147 workarea.put(entry, ls);
149 for(CacheEntry parent : entry.getParents(processor)) {
150 processListenerReport(parent, workarea);
151 ls.addAll(workarea.get(parent));
158 public synchronized ListenerReport getListenerReport() throws IOException {
160 class ListenerReportImpl implements ListenerReport {
162 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
165 public void print(PrintStream b) {
166 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
167 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
168 for(ListenerBase l : e.getValue()) {
169 Integer i = hist.get(l);
170 hist.put(l, i != null ? i-1 : -1);
174 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
175 b.print("" + -p.second + " " + p.first + "\n");
183 ListenerReportImpl result = new ListenerReportImpl();
185 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
186 for(CacheEntryBase entry : all) {
187 hasListenerAfterDisposing(entry);
189 for(CacheEntryBase entry : all) {
190 processListenerReport(entry, result.workarea);
197 public synchronized String reportListeners(File file) throws IOException {
199 if (!processor.isAlive())
202 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
203 ListenerReport report = getListenerReport();
206 return "Done reporting listeners.";
210 public void fireListeners(ReadGraphImpl graph) {
212 assert (!processor.updating);
213 assert (!processor.cache.collecting);
214 assert (!firingListeners);
216 firingListeners = true;
220 // Performing may cause further events to be scheduled.
221 while (!scheduledListeners.isEmpty()) {
223 // Clone current events to make new entries possible during
225 THashSet<ListenerEntry> entries = scheduledListeners;
226 scheduledListeners = new THashSet<ListenerEntry>();
228 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
230 for (ListenerEntry listenerEntry : entries) {
232 if (pruneListener(listenerEntry)) {
233 if (Development.DEVELOPMENT) {
234 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
235 new Exception().printStackTrace();
236 System.err.println("Pruned " + listenerEntry.procedure);
242 final CacheEntry entry = listenerEntry.entry;
243 assert (entry != null);
245 Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
247 if (newValue != ListenerEntry.NOT_CHANGED) {
248 if (Development.DEVELOPMENT) {
249 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
250 new Exception().printStackTrace();
251 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
254 schedule.add(listenerEntry);
255 listenerEntry.setLastKnown(entry.getResult());
260 for(ListenerEntry listenerEntry : schedule) {
261 final CacheEntry entry = listenerEntry.entry;
262 if (Development.DEVELOPMENT) {
263 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
264 System.err.println("Firing " + listenerEntry.procedure);
268 if (Development.DEVELOPMENT) {
269 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
270 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
273 entry.performFromCache(graph, listenerEntry.procedure);
274 } catch (Throwable t) {
275 LOGGER.error("Unexpected exception ", t);
282 firingListeners = false;
287 void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
289 Iterable<CacheEntry> oldParents = entry.getParents(processor);
290 for (CacheEntry parent : oldParents) {
291 if(!parent.isDiscarded())
292 todo.push(new UpdateEntry(entry, parent, indent + 2));
297 private boolean pruneListener(ListenerEntry entry) {
299 if (entry.base.isDisposed()) {
301 assert (entry != null);
302 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
304 boolean success = list.remove(entry);
307 listeners.remove(entry.entry);
319 private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
320 hasListenerAfterDisposing(entry);
321 if(listeners.get(entry) != null)
322 return listeners.get(entry);
324 return Collections.emptyList();
327 private static class RegisterParentRunnable implements Runnable {
329 private final CacheEntry parent;
330 private final CacheEntry child;
332 public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
333 this.parent = parent;
339 child.addParent(parent);
340 if (Development.DEVELOPMENT) {
341 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
342 System.out.println(child + " -> " + parent);
349 private static class RegisterListenerRunnable implements Runnable {
351 private final QueryListening queryListening;
352 private final ListenerBase base;
353 private final Object procedure;
354 private final CacheEntry parent;
355 private final CacheEntry entry;
357 public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
358 this.queryListening = queryListening;
360 this.procedure = procedure;
361 this.parent = parent;
368 assert (entry != null);
369 assert (procedure != null);
371 ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
373 list = new ArrayList<>(1);
374 queryListening.listeners.put(entry, list);
377 ListenerEntry result = new ListenerEntry(entry, base, procedure);
378 // Equals is here based on base
379 int currentIndex = list.indexOf(result);
380 // There was already a listener
381 if(currentIndex > -1) {
382 ListenerEntry current = list.get(currentIndex);
383 if(!current.base.isDisposed())
385 list.set(currentIndex, result);
390 if (Development.DEVELOPMENT) {
391 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
392 new Exception().printStackTrace();
393 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
397 queryListening.addedEntries.put(base, result);
404 private static class RegisterFirstKnownRunnable implements Runnable {
406 private final Map<ListenerBase,ListenerEntry> addedEntries;
407 private final ListenerBase base;
408 private final Object result;
410 public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
411 this.addedEntries = addedEntries;
413 this.result = result;
418 ListenerEntry entry = addedEntries.remove(base);
419 if(entry != null) entry.setLastKnown(result);