1 package org.simantics.db.impl.query;
3 import java.io.BufferedOutputStream;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.Semaphore;
21 import org.simantics.databoard.Bindings;
22 import org.simantics.db.DevelopmentKeys;
23 import org.simantics.db.debug.ListenerReport;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.impl.graph.ReadGraphImpl;
26 import org.simantics.db.impl.graph.WriteGraphImpl;
27 import org.simantics.db.procedure.ListenerBase;
28 import org.simantics.utils.Development;
29 import org.simantics.utils.datastructures.Pair;
30 import org.simantics.utils.datastructures.collections.CollectionUtils;
31 import org.slf4j.LoggerFactory;
33 import gnu.trove.map.hash.THashMap;
34 import gnu.trove.set.hash.THashSet;
36 public class QueryListening {
38 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
40 final private QueryProcessor processor;
41 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
42 private boolean firingListeners = false;
43 final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
44 private BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(2048);
45 private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
47 QueryListening(QueryProcessor processor) {
48 this.processor = processor;
49 new DependencyManagementThread(processor, tasks).start();
53 Semaphore s = new Semaphore(0);
59 } catch (Throwable t) {
60 LOGGER.error("Error while waiting for query dependency management", t);
64 static class DependencyManagementThread extends Thread {
66 final private QueryProcessor processor;
67 final BlockingQueue<Runnable> tasks;
69 DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
70 setName("Query Dependency Manager");
71 this.processor = processor;
77 while(processor.isAlive()) {
79 Runnable r = tasks.take();
81 } catch (Throwable t) {
83 LOGGER.error("Error while waiting for query dependency management tasks", t);
90 public boolean hasScheduledUpdates() {
91 return !scheduledListeners.isEmpty();
94 void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
99 if (parent != null && !inferred) {
101 if(!child.isImmutable(graph))
102 child.addParent(parent);
103 } catch (DatabaseException e) {
104 LOGGER.error("Error while registering query dependencies", e);
106 if (Development.DEVELOPMENT) {
107 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
108 System.out.println(child + " -> " + parent);
113 if (listener != null)
114 registerListener(child, listener, procedure);
117 } catch (InterruptedException e) {
118 LOGGER.error("Error while registering dependencies", e);
123 void registerFirstKnown(ListenerBase base, Object result) {
127 ListenerEntry entry = addedEntries.get(base);
128 if(entry != null) entry.setLastKnown(result);
134 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
136 assert (entry != null);
138 if (base.isDisposed())
141 return addListener(entry, base, procedure);
146 * Registers a listener and returns an entry iff the entry was added
148 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
150 assert (entry != null);
151 assert (procedure != null);
153 ArrayList<ListenerEntry> list = listeners.get(entry);
155 list = new ArrayList<>(1);
156 listeners.put(entry, list);
159 ListenerEntry result = new ListenerEntry(entry, base, procedure);
160 // Equals is here based on base
161 int currentIndex = list.indexOf(result);
162 // There was already a listener
163 if(currentIndex > -1) {
164 ListenerEntry current = list.get(currentIndex);
165 if(!current.base.isDisposed()) return null;
166 list.set(currentIndex, result);
171 if (Development.DEVELOPMENT) {
172 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
173 new Exception().printStackTrace();
174 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
178 addedEntries.put(base, result);
184 void scheduleListener(ListenerEntry entry) {
185 assert (entry != null);
186 if (Development.DEVELOPMENT) {
187 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
188 System.err.println("Scheduled " + entry.procedure);
191 scheduledListeners.add(entry);
194 private void removeListener(ListenerEntry entry) {
195 assert (entry != null);
196 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
197 if(list == null) return;
198 boolean success = list.remove(entry);
201 listeners.remove(entry.entry);
204 boolean hasListener(CacheEntry entry) {
205 if(listeners.get(entry) != null) return true;
209 boolean hasListenerAfterDisposing(CacheEntry entry) {
210 if(listeners.get(entry) != null) {
211 ArrayList<ListenerEntry> entries = listeners.get(entry);
212 ArrayList<ListenerEntry> list = null;
213 for (ListenerEntry e : entries) {
214 if (e.base.isDisposed()) {
215 if(list == null) list = new ArrayList<ListenerEntry>();
220 for (ListenerEntry e : list) {
224 if (entries.isEmpty()) {
225 listeners.remove(entry);
233 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
234 hasListenerAfterDisposing(entry);
235 if(listeners.get(entry) != null)
236 return listeners.get(entry);
238 return Collections.emptyList();
241 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
243 if(!workarea.containsKey(entry)) {
245 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
246 for(ListenerEntry e : getListenerEntries(entry))
249 workarea.put(entry, ls);
251 for(CacheEntry parent : entry.getParents(processor)) {
252 processListenerReport(parent, workarea);
253 ls.addAll(workarea.get(parent));
260 public synchronized ListenerReport getListenerReport() throws IOException {
262 class ListenerReportImpl implements ListenerReport {
264 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
267 public void print(PrintStream b) {
268 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
269 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
270 for(ListenerBase l : e.getValue()) {
271 Integer i = hist.get(l);
272 hist.put(l, i != null ? i-1 : -1);
276 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
277 b.print("" + -p.second + " " + p.first + "\n");
285 ListenerReportImpl result = new ListenerReportImpl();
287 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
288 for(CacheEntryBase entry : all) {
289 hasListenerAfterDisposing(entry);
291 for(CacheEntryBase entry : all) {
292 processListenerReport(entry, result.workarea);
299 public synchronized String reportListeners(File file) throws IOException {
301 if (!processor.isAlive())
304 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
305 ListenerReport report = getListenerReport();
308 return "Done reporting listeners.";
312 public void fireListeners(WriteGraphImpl graph) {
314 assert (!processor.updating);
315 assert (!processor.cache.collecting);
316 assert (!firingListeners);
318 firingListeners = true;
322 // Performing may cause further events to be scheduled.
323 while (!scheduledListeners.isEmpty()) {
325 // Clone current events to make new entries possible during
327 THashSet<ListenerEntry> entries = scheduledListeners;
328 scheduledListeners = new THashSet<ListenerEntry>();
330 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
332 for (ListenerEntry listenerEntry : entries) {
334 if (pruneListener(listenerEntry)) {
335 if (Development.DEVELOPMENT) {
336 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
337 new Exception().printStackTrace();
338 System.err.println("Pruned " + listenerEntry.procedure);
344 final CacheEntry entry = listenerEntry.entry;
345 assert (entry != null);
347 Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
349 if (newValue != ListenerEntry.NOT_CHANGED) {
350 if (Development.DEVELOPMENT) {
351 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
352 new Exception().printStackTrace();
353 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
356 schedule.add(listenerEntry);
357 listenerEntry.setLastKnown(entry.getResult());
362 for(ListenerEntry listenerEntry : schedule) {
363 final CacheEntry entry = listenerEntry.entry;
364 if (Development.DEVELOPMENT) {
365 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
366 System.err.println("Firing " + listenerEntry.procedure);
370 if (Development.DEVELOPMENT) {
371 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
372 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
375 entry.performFromCache(graph, listenerEntry.procedure);
376 } catch (Throwable t) {
384 firingListeners = false;
389 void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
391 Iterable<CacheEntry> oldParents = entry.getParents(processor);
392 for (CacheEntry parent : oldParents) {
393 if(!parent.isDiscarded())
394 todo.push(new UpdateEntry(entry, parent, indent + 2));
399 private boolean pruneListener(ListenerEntry entry) {
400 if (entry.base.isDisposed()) {
401 removeListener(entry);