1 package org.simantics.db.impl.query;
3 import java.io.BufferedOutputStream;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.function.Consumer;
20 import org.simantics.databoard.Bindings;
21 import org.simantics.db.DevelopmentKeys;
22 import org.simantics.db.debug.ListenerReport;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.impl.graph.ReadGraphImpl;
25 import org.simantics.db.impl.graph.WriteGraphImpl;
26 import org.simantics.db.procedure.ListenerBase;
27 import org.simantics.utils.Development;
28 import org.simantics.utils.datastructures.Pair;
29 import org.simantics.utils.datastructures.collections.CollectionUtils;
30 import org.slf4j.LoggerFactory;
32 import gnu.trove.map.hash.THashMap;
33 import gnu.trove.set.hash.THashSet;
35 public class QueryListening {
// Logger shared by QueryListening and its nested helper classes.
37 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
// Processor handed to the constructor; used in this class for getParents(...), compareTo(...),
// allCaches(...), isAlive() and for the state assertions in fireListeners(...).
39 private final QueryProcessor processor;
// Created in the constructor from the processor.
40 private final Scheduler scheduler;
// Sink obtained from scheduler.newConsumer(); the register* methods hand their Runnables to it.
41 private final Consumer<Runnable> consumer;
// Entries recorded by RegisterListenerRunnable, keyed by listener base. registerFirstKnown(...)
// removes the entry again and stores the first result on it.
42 private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
// Listeners waiting to be evaluated; fireListeners(...) swaps this set for a fresh one on every round.
44 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
// Set to true at the start of fireListeners(...) and back to false at its end; asserted false on entry.
45 private boolean firingListeners = false;
// Listener entries per cache entry. Not private: RegisterListenerRunnable reaches it through its
// QueryListening reference.
47 final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
// Thread-local buffer of pending Runnables. Besides the per-thread value it keeps a registry of
// every thread's buffer (allQueues) and a list of buffers already handed over (dispatchedQueues).
// All three methods below are synchronized on this instance.
// NOTE(review): this listing omits some lines of the class (closing braces, return statements and
// possibly other short statements); the notes describe only the statements that are visible.
49 private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
// Buffers currently owned by a producing thread, keyed by that thread.
51 private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
// Buffers handed over by sendToExecution() and not yet collected through getDispatchedQueues().
53 private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
// ThreadLocal hook: creates the calling thread's buffer and registers it in allQueues.
56 protected synchronized ArrayList<Runnable> initialValue() {
57 ArrayList<Runnable> result = new ArrayList<>();
58 allQueues.put(Thread.currentThread(), result);
// Moves the calling thread's buffer from allQueues to dispatchedQueues.
// NOTE(review): rs is null when the calling thread has no registered buffer, and null would then
// be appended to dispatchedQueues - confirm callers always obtain the buffer via get() first.
62 synchronized void sendToExecution() {
63 ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
64 dispatchedQueues.add(rs);
// Takes the accumulated dispatched buffers and installs a fresh empty list in their place.
// Presumably the old list is returned; the return statement is not visible in this excerpt.
68 synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
69 ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
70 dispatchedQueues = new ArrayList<>();
// Collects Runnables in per-thread buffers (ThreadQueue) and processes them on a dedicated thread
// named "QueryListening".
// NOTE(review): this listing omits several lines of the class (method headers, braces, loop bodies
// and other short statements); the notes below cover only what is visible and hedge the rest.
76 private static class Scheduler {
// A producing thread's buffer is sent to execution when its size equals this value.
78 private static final int BUFFER_SIZE = 100;
// Used in the visible code only for isDisposed(), which ends the worker loop.
80 private final QueryProcessor processor;
// Per-thread buffers; the monitor of this object is also taken by the worker loop and createFlush().
82 private final ThreadQueue queues = new ThreadQueue();
85 * Access to this field is synchronized using the monitor of 'queues'
86 * The method flush() shall be called by the single writing thread and the field has been nulled before the
87 * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
89 private Semaphore flush = null;
91 Scheduler(QueryProcessor processor) {
92 this.processor = processor;
// The worker loop keeps running until the processor reports itself disposed.
95 private boolean isTerminated() {
96 return processor.isDisposed();
// Produces the sink through which producers submit Runnables.
// Presumably the submitted task is appended to the calling thread's buffer in a line that is not
// visible here - TODO confirm.
99 Consumer<Runnable> newConsumer() {
102 ArrayList<Runnable> l = queues.get();
// A buffer that has reached BUFFER_SIZE is handed over to the dispatched list.
104 if(l.size() == BUFFER_SIZE) {
106 queues.sendToExecution();
// Worker thread. The loop below runs while holding the monitor of 'queues'.
114 Thread thread = new Thread() {
118 synchronized(queues) {
119 while(!isTerminated()) {
// First pass: the buffers that producers have already dispatched.
// The body of the inner loop is not visible; presumably each Runnable is run - confirm.
121 ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
122 for(ArrayList<Runnable> queue : qs) {
123 for(Runnable r : queue)
// Second pass: every buffer still registered to a producer thread.
// NOTE(review): the condition guarding this pass is not visible; presumably it is tied to a
// pending flush request (see the 'flush' field) - confirm.
127 for(ArrayList<Runnable> queue : queues.allQueues.values()) {
128 for(Runnable r : queue) {
// NOTE(review): the interrupt is only logged; the visible code does not restore the thread's
// interrupt status.
138 } catch (InterruptedException e) {
139 LOGGER.error("Unexpected interrupt", e);
148 thread.setName("QueryListening");
// Installs a fresh zero-permit semaphore as the flush request while holding the monitor of 'queues'.
153 private Semaphore createFlush() {
154 synchronized(queues) {
155 flush = new Semaphore(0);
// Fragment of the flush operation (its method header is not visible in this excerpt): blocks
// until a permit is released on the semaphore obtained from createFlush().
163 createFlush().acquire();
// NOTE(review): as above, the interrupt is logged and the interrupt status is not restored here.
164 } catch (InterruptedException e) {
165 LOGGER.error("Unexpected interrupt", e);
171 private static class RegisterParentRunnable implements Runnable {
173 private final CacheEntry parent;
174 private final CacheEntry child;
176 public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
177 this.parent = parent;
183 child.addParent(parent);
184 if (Development.DEVELOPMENT) {
185 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
186 System.out.println(child + " -> " + parent);
193 private static class RegisterListenerRunnable implements Runnable {
195 private final QueryListening queryListening;
196 private final ListenerBase base;
197 private final Object procedure;
198 private final CacheEntry parent;
199 private final CacheEntry entry;
201 public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
202 this.queryListening = queryListening;
204 this.procedure = procedure;
205 this.parent = parent;
212 assert (entry != null);
213 assert (procedure != null);
215 ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
217 list = new ArrayList<>(1);
218 queryListening.listeners.put(entry, list);
221 ListenerEntry result = new ListenerEntry(entry, base, procedure);
222 // Equals is here based on base
223 int currentIndex = list.indexOf(result);
224 // There was already a listener
225 if(currentIndex > -1) {
226 ListenerEntry current = list.get(currentIndex);
227 if(!current.base.isDisposed())
229 list.set(currentIndex, result);
234 if (Development.DEVELOPMENT) {
235 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
236 new Exception().printStackTrace();
237 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
241 queryListening.addedEntries.put(base, result);
// Creates the listening support for the given processor: builds the Scheduler for it and obtains
// from the scheduler the consumer that the register* methods feed.
// NOTE(review): whether the scheduler's worker thread is also started here is not visible in this
// excerpt - confirm.
248 QueryListening(QueryProcessor processor) {
250 this.processor = processor;
251 scheduler = new Scheduler(processor);
252 consumer = scheduler.newConsumer();
257 public boolean hasScheduledUpdates() {
258 return !scheduledListeners.isEmpty();
264 } catch (Throwable t) {
265 LOGGER.error("Error while waiting for query dependency management", t);
// Queues the bookkeeping for one query evaluation: the parent/child dependency and the listener
// registration. Both are handed to 'consumer' as Runnables rather than executed inline.
// NOTE(review): the conditions guarding the three sections below are not visible in this excerpt
// (presumably 'inferred', 'parent != null' and 'listener != null') - confirm before relying on them.
269 void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
// In this branch no listener is expected.
272 assert(listener == null);
// A child that reports itself immutable gets no parent registration.
278 if(!child.isImmutable(graph))
279 consumer.accept(new RegisterParentRunnable(parent, child));
// isImmutable(...) may fail; the failure is logged and the dependency is then not registered.
280 } catch (DatabaseException e) {
281 LOGGER.error("Error while registering query dependencies", e);
// Listeners that are already disposed are not registered.
286 if(!listener.isDisposed())
287 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
291 void registerFirstKnown(ListenerBase base, Object result) {
293 if(base == null) return;
295 consumer.accept(() -> {
296 ListenerEntry entry = addedEntries.remove(base);
297 if(entry != null) entry.setLastKnown(result);
302 void scheduleListener(ListenerEntry entry) {
304 assert (entry != null);
306 if (Development.DEVELOPMENT) {
307 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
308 System.err.println("Scheduled " + entry.procedure);
312 scheduledListeners.add(entry);
316 boolean hasListener(CacheEntry entry) {
317 if(listeners.get(entry) != null) return true;
321 boolean hasListenerAfterDisposing(CacheEntry entry) {
322 if(listeners.get(entry) != null) {
323 ArrayList<ListenerEntry> entries = listeners.get(entry);
324 ArrayList<ListenerEntry> list = null;
325 for (ListenerEntry e : entries) {
326 if (e.base.isDisposed()) {
327 if(list == null) list = new ArrayList<ListenerEntry>();
332 for (ListenerEntry e : list) {
336 if (entries.isEmpty()) {
337 listeners.remove(entry);
345 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
347 if(!workarea.containsKey(entry)) {
349 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
350 for(ListenerEntry e : getListenerEntries(entry))
353 workarea.put(entry, ls);
355 for(CacheEntry parent : entry.getParents(processor)) {
356 processListenerReport(parent, workarea);
357 ls.addAll(workarea.get(parent));
// Builds a report of the listener bases collected per cache entry. Disposed listeners are pruned
// from every cache entry first; then processListenerReport(...) fills the report's work area.
// NOTE(review): some lines of this method (closing braces, the return statement and possibly
// other short statements) are not visible in this excerpt.
364 public synchronized ListenerReport getListenerReport() throws IOException {
// Local report implementation; 'workarea' is filled by the second loop at the end of the method.
366 class ListenerReportImpl implements ListenerReport {
368 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener base: its occurrence count, a space, and the base.
371 public void print(PrintStream b) {
372 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
373 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
374 for(ListenerBase l : e.getValue()) {
375 Integer i = hist.get(l);
// Counts are kept negated (-1, -2, ...), presumably so that an ascending sort by value lists
// the most frequent listeners first - confirm against CollectionUtils.valueSortedEntries.
376 hist.put(l, i != null ? i-1 : -1);
380 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
// The stored count is negated back to a positive number for output.
381 b.print("" + -p.second + " " + p.first + "\n");
389 ListenerReportImpl result = new ListenerReportImpl();
// All cache entries reported by the processor.
391 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
// First pass: prune disposed listeners; the boolean result is ignored here.
392 for(CacheEntryBase entry : all) {
393 hasListenerAfterDisposing(entry);
// Second pass: collect the listener bases per cache entry into the report.
395 for(CacheEntryBase entry : all) {
396 processListenerReport(entry, result.workarea);
403 public synchronized String reportListeners(File file) throws IOException {
405 if (!processor.isAlive())
408 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
409 ListenerReport report = getListenerReport();
412 return "Done reporting listeners.";
// Evaluates all scheduled listeners and fires those whose query result has changed. Runs in
// rounds until no listener is scheduled any more, because firing may schedule further listeners.
// NOTE(review): several control-flow lines of this method (try/finally, continue, closing braces)
// are not visible in this excerpt; the notes below describe the visible statements only.
416 public void fireListeners(ReadGraphImpl graph) {
// Must not run while the processor is updating or collecting, and must not be re-entered.
418 assert (!processor.updating);
419 assert (!processor.cache.collecting);
420 assert (!firingListeners);
422 firingListeners = true;
426 // Performing may cause further events to be scheduled.
427 while (!scheduledListeners.isEmpty()) {
429 // Clone current events to make new entries possible during
// The current set is taken over and replaced by a fresh one, so listeners scheduled while this
// round is firing land in the new set and are handled by the next round.
431 THashSet<ListenerEntry> entries = scheduledListeners;
432 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners of this round whose value has changed; they are fired in the second loop below.
434 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
436 for (ListenerEntry listenerEntry : entries) {
// Disposed listeners are removed from the listener table by pruneListener(...).
// Presumably the loop then skips to the next entry; that statement is not visible here.
438 if (pruneListener(listenerEntry)) {
439 if (Development.DEVELOPMENT) {
440 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
441 new Exception().printStackTrace();
442 System.err.println("Pruned " + listenerEntry.procedure);
448 final CacheEntry entry = listenerEntry.entry;
449 assert (entry != null);
// Compare against the value last delivered to this listener; NOT_CHANGED means nothing to fire.
451 Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
453 if (newValue != ListenerEntry.NOT_CHANGED) {
454 if (Development.DEVELOPMENT) {
455 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
456 new Exception().printStackTrace();
457 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
// The remembered value is taken from entry.getResult(), not from newValue.
460 schedule.add(listenerEntry);
461 listenerEntry.setLastKnown(entry.getResult());
// Second phase of the round: deliver the cached result to every changed listener.
466 for(ListenerEntry listenerEntry : schedule) {
467 final CacheEntry entry = listenerEntry.entry;
468 if (Development.DEVELOPMENT) {
469 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
470 System.err.println("Firing " + listenerEntry.procedure);
474 if (Development.DEVELOPMENT) {
475 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
476 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
479 entry.performFromCache(graph, listenerEntry.procedure);
// A failing listener is logged and does not stop the remaining listeners from being fired.
480 } catch (Throwable t) {
481 LOGGER.error("Unexpected exception ", t);
// NOTE(review): whether this reset sits in a finally block is not visible in this excerpt.
488 firingListeners = false;
493 void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
495 Iterable<CacheEntry> oldParents = entry.getParents(processor);
496 for (CacheEntry parent : oldParents) {
497 if(!parent.isDiscarded())
498 todo.push(new UpdateEntry(entry, parent, indent + 2));
503 private boolean pruneListener(ListenerEntry entry) {
505 if (entry.base.isDisposed()) {
507 assert (entry != null);
508 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
510 boolean success = list.remove(entry);
513 listeners.remove(entry.entry);
525 private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
526 hasListenerAfterDisposing(entry);
527 if(listeners.get(entry) != null)
528 return listeners.get(entry);
530 return Collections.emptyList();