1 package org.simantics.db.impl.query;
3 import java.io.BufferedOutputStream;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.function.Consumer;
20 import org.simantics.databoard.Bindings;
21 import org.simantics.db.DevelopmentKeys;
22 import org.simantics.db.debug.ListenerReport;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.impl.graph.ReadGraphImpl;
25 import org.simantics.db.impl.graph.WriteGraphImpl;
26 import org.simantics.db.procedure.ListenerBase;
27 import org.simantics.utils.Development;
28 import org.simantics.utils.datastructures.Pair;
29 import org.simantics.utils.datastructures.collections.CollectionUtils;
30 import org.slf4j.LoggerFactory;
32 import gnu.trove.map.hash.THashMap;
33 import gnu.trove.set.hash.THashSet;
/**
 * Bookkeeping for query listeners: keeps the listener entries registered per cache entry,
 * the set of listeners scheduled for firing, and a scheduler through which registrations are queued.
 */
35 public class QueryListening {
37 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
// Processor this instance belongs to; also handed to the Scheduler in the constructor.
39 private final QueryProcessor processor;
// Created in the constructor from the processor.
40 private final Scheduler scheduler;
// Task sink obtained from scheduler.newConsumer(); registration tasks are passed to it.
41 private final Consumer<Runnable> consumer;
// Entries stored by RegisterListenerRunnable keyed by listener base; removed again by registerFirstKnown.
42 private final Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
// Filled by scheduleListener; fireListeners replaces it with a fresh set on every pass.
44 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
// Set while fireListeners is running; fireListeners asserts that it is false on entry.
45 private boolean firingListeners = false;
// Listener entries registered for each cache entry.
47 final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
/**
 * Thread-local list of pending tasks that additionally records the list of every thread in
 * allQueues, so that the lists can also be reached from a thread other than the owner.
 */
49 private static class ThreadQueue extends ThreadLocal<List<Runnable>> {
// Task list of each thread that has created its thread-local value and not yet dispatched it.
51 private final Map<Thread,List<Runnable>> allQueues = new HashMap<>();
// Lists handed over by sendToExecution() and not yet collected by getDispatchedQueues().
53 private ArrayList<List<Runnable>> dispatchedQueues = new ArrayList<>();
// Creates the list of the calling thread as a synchronized list and registers it in allQueues.
// NOTE(review): the return statement is not visible in this excerpt; presumably the new list is returned.
56 protected synchronized List<Runnable> initialValue() {
57 List<Runnable> result = Collections.synchronizedList(new ArrayList<>());
58 allQueues.put(Thread.currentThread(), result);
// Moves the list of the calling thread from allQueues to dispatchedQueues.
// NOTE(review): Map.remove yields null when the calling thread has no registered list, and that
// null would then be appended to dispatchedQueues; confirm that callers always register a list first.
62 synchronized void sendToExecution() {
63 List<Runnable> rs = allQueues.remove(Thread.currentThread());
64 dispatchedQueues.add(rs);
// Takes the lists dispatched so far and installs a fresh, empty list for later dispatches.
// NOTE(review): the return statement is not visible in this excerpt; presumably the captured list is returned.
68 synchronized ArrayList<List<Runnable>> getDispatchedQueues() {
69 ArrayList<List<Runnable>> result = dispatchedQueues;
70 dispatchedQueues = new ArrayList<>();
// Buffers tasks per calling thread and executes them on a thread named QueryListening.
// NOTE(review): several statements of this class are not visible in this excerpt; the comments
// below describe only what the visible lines establish.
76 private static class Scheduler {
// Buffer length at which a thread hands its task list over for execution.
78 private static final int BUFFER_SIZE = 100;
// Queried by isTerminated() to detect disposal.
80 private final QueryProcessor processor;
// Per-thread task buffers. The monitor of this object guards the worker loop and createFlush().
82 private final ThreadQueue queues = new ThreadQueue();
85 * Access to this field is synchronized using the monitor of 'queues'
86 * The method flush() shall be called by the single writing thread and the field has been nulled before the
87 * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
89 private Semaphore flush = null;
// Stores the processor whose disposal state ends the worker loop.
91 Scheduler(QueryProcessor processor) {
92 this.processor = processor;
// True once the processor reports that it has been disposed.
95 private boolean isTerminated() {
96 return processor.isDisposed();
// Builds the task sink that callers use to submit tasks.
// NOTE(review): the lambda header and the statement that stores the task are not visible here.
99 Consumer<Runnable> newConsumer() {
// Buffer of the calling thread; ThreadLocal creates it on first access through initialValue().
102 List<Runnable> l = queues.get();
// A buffer that has reached BUFFER_SIZE is moved to the dispatched lists.
104 if(l.size() == BUFFER_SIZE) {
106 queues.sendToExecution();
// Thread that executes the buffered tasks.
114 Thread thread = new Thread() {
// The loop below runs while holding the monitor of queues.
118 synchronized(queues) {
119 while(!isTerminated()) {
// Collect every list dispatched so far and run its tasks in list order.
121 ArrayList<List<Runnable>> qs = queues.getDispatchedQueues();
122 for(List<Runnable> queue : qs) {
123 queue.forEach(r -> r.run());
// Also run the tasks that still sit in the per-thread buffers.
// NOTE(review): the condition guarding this loop is not visible; presumably it runs only when a flush was requested.
126 for(List<Runnable> queue : queues.allQueues.values()) {
127 // FIXME : using synchronized list is not enough, the list may change between forEach and clear!
128 queue.forEach(r -> r.run());
// An interrupt is only logged here; the visible code does not restore the interrupt status.
136 } catch (InterruptedException e) {
137 LOGGER.error("Unexpected interrupt", e);
146 thread.setName("QueryListening");
// Installs a new semaphore with zero permits in the flush field while holding the monitor of queues.
151 private Semaphore createFlush() {
152 synchronized(queues) {
153 flush = new Semaphore(0);
// Blocks on the semaphore obtained from createFlush() until a permit becomes available.
// NOTE(review): the header of the enclosing method and the matching release call are not visible in this excerpt.
161 createFlush().acquire();
162 } catch (InterruptedException e) {
163 LOGGER.error("Unexpected interrupt", e);
/**
 * Deferred task that registers a parent cache entry on a child cache entry.
 */
169 private static class RegisterParentRunnable implements Runnable {
// Entry that is added as a parent of child.
171 private final CacheEntry parent;
// Entry that receives the parent.
172 private final CacheEntry child;
// Captures the two cache entries for the deferred registration.
174 public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
175 this.parent = parent;
// NOTE(review): the header of the method containing the following statements is not visible; presumably run().
181 child.addParent(parent);
// Development-only trace of the dependency, printed when the QUERYPROCESSOR_DEPENDENCIES property is set.
182 if (Development.DEVELOPMENT) {
183 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
184 System.out.println(child + " -> " + parent);
/**
 * Deferred task that adds a listener entry for a cache entry to the listeners map of the
 * owning QueryListening and records it in addedEntries.
 */
191 private static class RegisterListenerRunnable implements Runnable {
// Instance whose listeners and addedEntries maps are updated.
193 private final QueryListening queryListening;
// Listener base; identifies the listener and is asked whether it has been disposed.
194 private final ListenerBase base;
// Procedure stored in the created ListenerEntry.
195 private final Object procedure;
// NOTE(review): parent is stored but not read by any statement visible in this excerpt.
196 private final CacheEntry parent;
// Cache entry that the listener is registered for.
197 private final CacheEntry entry;
// Captures everything needed for the deferred registration.
199 public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
200 this.queryListening = queryListening;
202 this.procedure = procedure;
203 this.parent = parent;
// NOTE(review): the header of the method containing the following statements is not visible; presumably run().
210 assert (entry != null);
211 assert (procedure != null);
// Look up the listener list of the entry.
213 ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
// A new list is created and stored for the entry.
// NOTE(review): the guarding condition is not visible; presumably this happens only when no list exists yet.
215 list = new ArrayList<>(1);
216 queryListening.listeners.put(entry, list);
219 ListenerEntry result = new ListenerEntry(entry, base, procedure);
220 // Equals is here based on base
221 int currentIndex = list.indexOf(result);
222 // There was already a listener
223 if(currentIndex > -1) {
224 ListenerEntry current = list.get(currentIndex);
// NOTE(review): the statement executed when the existing listener is not disposed is not visible here.
225 if(!current.base.isDisposed())
// The existing slot is overwritten with the new entry.
227 list.set(currentIndex, result);
// Development-only trace with a stack trace, printed when the QUERYPROCESSOR_LISTENERS property is set.
232 if (Development.DEVELOPMENT) {
233 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
234 new Exception().printStackTrace();
235 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
// Remember the entry by its base so that registerFirstKnown can find it later.
239 queryListening.addedEntries.put(base, result);
/**
 * Creates the listening support for the given processor: builds a Scheduler for it and
 * obtains the task consumer from that scheduler.
 *
 * @param processor processor that owns this instance
 */
246 QueryListening(QueryProcessor processor) {
248 this.processor = processor;
249 scheduler = new Scheduler(processor);
250 consumer = scheduler.newConsumer();
/**
 * Tells whether any listener is currently waiting to be fired.
 *
 * @return true when the set of scheduled listeners is not empty
 */
255 public boolean hasScheduledUpdates() {
256 return !scheduledListeners.isEmpty();
262 } catch (Throwable t) {
263 LOGGER.error("Error while waiting for query dependency management", t);
/**
 * Queues the bookkeeping for a freshly evaluated query: a parent registration for child and a
 * listener registration for child. Both are submitted to the consumer rather than applied directly.
 * NOTE(review): the conditions guarding the individual steps are only partly visible in this excerpt.
 */
267 void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
// NOTE(review): the condition under which a listener must be absent is not visible; presumably the inferred case.
270 assert(listener == null);
// Immutable children get no parent registration.
276 if(!child.isImmutable(graph))
277 consumer.accept(new RegisterParentRunnable(parent, child));
// A failure of the immutability check is logged and not propagated by the visible code.
278 } catch (DatabaseException e) {
279 LOGGER.error("Error while registering query dependencies", e);
// Listeners that are already disposed are not registered.
284 if(!listener.isDisposed())
285 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
/**
 * Queues a task that stores result as the last known value of the listener entry recorded
 * for base, and removes that entry from addedEntries.
 *
 * @param base   listener base; nothing is queued when it is null
 * @param result value to remember as last known
 */
289 void registerFirstKnown(ListenerBase base, Object result) {
291 if(base == null) return;
// The lookup happens inside the queued task, not at call time.
293 consumer.accept(() -> {
294 ListenerEntry entry = addedEntries.remove(base);
// No entry recorded for this base: nothing to update.
295 if(entry != null) entry.setLastKnown(result);
/**
 * Adds a listener entry to the set that fireListeners processes.
 *
 * @param entry listener entry to schedule; must not be null
 */
300 void scheduleListener(ListenerEntry entry) {
302 assert (entry != null);
// Development-only trace, printed when the QUERYPROCESSOR_LISTENERS property is set.
304 if (Development.DEVELOPMENT) {
305 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
306 System.err.println("Scheduled " + entry.procedure);
310 scheduledListeners.add(entry);
/**
 * Tells whether a listener list is stored for the given cache entry.
 * NOTE(review): the fall-through return is not visible in this excerpt; presumably false.
 */
314 boolean hasListener(CacheEntry entry) {
315 if(listeners.get(entry) != null) return true;
/**
 * Examines the listener list of a cache entry for listeners whose base has been disposed and
 * drops the map entry once the list is empty.
 * NOTE(review): the loop bodies and the return statements are not visible in this excerpt;
 * presumably disposed entries are removed from the list and the result tells whether any listener remains.
 */
319 boolean hasListenerAfterDisposing(CacheEntry entry) {
320 if(listeners.get(entry) != null) {
321 ArrayList<ListenerEntry> entries = listeners.get(entry);
// Allocated lazily, only when a disposed listener is encountered.
322 ArrayList<ListenerEntry> list = null;
323 for (ListenerEntry e : entries) {
324 if (e.base.isDisposed()) {
325 if(list == null) list = new ArrayList<ListenerEntry>();
// Second pass over the collected entries, separate from the iteration over entries.
330 for (ListenerEntry e : list) {
// An entry without remaining listeners is removed from the map.
334 if (entries.isEmpty()) {
335 listeners.remove(entry);
/**
 * Fills workarea with a set of listener bases for entry, including the sets computed
 * for all of its parents. Entries already present in workarea are not processed again.
 *
 * @param entry    cache entry to process
 * @param workarea accumulated result, keyed by cache entry
 */
343 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
345 if(!workarea.containsKey(entry)) {
347 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
// NOTE(review): the loop body is not visible here; presumably it adds the base of each listener entry to ls.
348 for(ListenerEntry e : getListenerEntries(entry))
// Stored before recursing, so a parent chain that leads back to this entry stops at the containsKey check.
351 workarea.put(entry, ls);
353 for(CacheEntry parent : entry.getParents(processor)) {
354 processListenerReport(parent, workarea);
// Merge what was collected for the parent into the set of this entry.
355 ls.addAll(workarea.get(parent));
/**
 * Builds a listener report over all cache entries of the processor.
 * NOTE(review): the return statement is not visible in this excerpt; presumably the report built here is returned.
 */
362 public synchronized ListenerReport getListenerReport() throws IOException {
// Local report implementation backed by the work area filled by processListenerReport.
364 class ListenerReportImpl implements ListenerReport {
366 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener base: the number of work area sets containing it, then the base.
369 public void print(PrintStream b) {
370 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
371 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
372 for(ListenerBase l : e.getValue()) {
373 Integer i = hist.get(l);
// Counts are kept as negative numbers: first occurrence is -1, each further one subtracts 1.
374 hist.put(l, i != null ? i-1 : -1);
// NOTE(review): presumably valueSortedEntries sorts ascending by value, so the negated counts list the most frequent first.
378 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
// The sign is flipped back for output.
379 b.print("" + -p.second + " " + p.first + "\n");
387 ListenerReportImpl result = new ListenerReportImpl();
389 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
// First pass: run the disposed-listener check on every cache entry.
390 for(CacheEntryBase entry : all) {
391 hasListenerAfterDisposing(entry);
// Second pass: collect the listener bases per entry into the work area of the report.
393 for(CacheEntryBase entry : all) {
394 processListenerReport(entry, result.workarea);
/**
 * Produces a listener report with the given file as the output target.
 *
 * @param file file opened for writing through a buffered PrintStream
 * @return the fixed completion message when the report was produced
 */
401 public synchronized String reportListeners(File file) throws IOException {
// NOTE(review): the statement executed when the processor is not alive is not visible in this excerpt.
403 if (!processor.isAlive())
406 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
407 ListenerReport report = getListenerReport();
// NOTE(review): the statements that print the report and close the stream are not visible here;
// confirm that the stream is also closed when getListenerReport() throws.
410 return "Done reporting listeners.";
/**
 * Processes the scheduled listeners: each one is compared against its last known value and the
 * procedures of those that changed are performed from the cache. Repeats until nothing is scheduled.
 *
 * @param graph graph used for comparing and for performing the procedures
 */
414 public void fireListeners(ReadGraphImpl graph) {
// Must not run during a processor update, during cache collection, or while already firing.
416 assert (!processor.updating);
417 assert (!processor.cache.collecting);
418 assert (!firingListeners);
420 firingListeners = true;
424 // Performing may cause further events to be scheduled.
425 while (!scheduledListeners.isEmpty()) {
427 // Clone current events to make new entries possible during
// The field is swapped for a fresh set, so listeners scheduled during this pass land in the next one.
429 THashSet<ListenerEntry> entries = scheduledListeners;
430 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners found to have changed in this pass.
432 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
434 for (ListenerEntry listenerEntry : entries) {
// pruneListener is consulted first.
// NOTE(review): the statement after the trace is not visible; presumably pruned listeners are skipped.
436 if (pruneListener(listenerEntry)) {
437 if (Development.DEVELOPMENT) {
438 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
439 new Exception().printStackTrace();
440 System.err.println("Pruned " + listenerEntry.procedure);
446 final CacheEntry entry = listenerEntry.entry;
447 assert (entry != null);
// Compare the entry against the value last reported to this listener.
449 Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
// Anything other than ListenerEntry.NOT_CHANGED marks the listener for firing.
451 if (newValue != ListenerEntry.NOT_CHANGED) {
452 if (Development.DEVELOPMENT) {
453 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
454 new Exception().printStackTrace();
455 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
458 schedule.add(listenerEntry);
// The current result of the entry becomes the new last known value.
459 listenerEntry.setLastKnown(entry.getResult());
// Second phase: perform the procedure of every changed listener from the cache.
464 for(ListenerEntry listenerEntry : schedule) {
465 final CacheEntry entry = listenerEntry.entry;
466 if (Development.DEVELOPMENT) {
467 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
468 System.err.println("Firing " + listenerEntry.procedure);
472 if (Development.DEVELOPMENT) {
473 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
474 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
477 entry.performFromCache(graph, listenerEntry.procedure);
// Any failure is logged.
// NOTE(review): the matching try is not visible; presumably it is per listener so the remaining ones still fire.
478 } catch (Throwable t) {
479 LOGGER.error("Unexpected exception ", t);
// Clears the flag set at the start.
// NOTE(review): whether this sits in a finally block is not visible in this excerpt.
486 firingListeners = false;
/**
 * Pushes an UpdateEntry onto todo for every parent of entry that has not been discarded.
 *
 * @param indent current indent value; the pushed entries carry indent + 2
 * @param entry  cache entry whose parents are visited
 * @param todo   work list that receives the new update entries
 */
491 void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
493 Iterable<CacheEntry> oldParents = entry.getParents(processor);
494 for (CacheEntry parent : oldParents) {
// Discarded parents are skipped.
495 if(!parent.isDiscarded())
496 todo.push(new UpdateEntry(entry, parent, indent + 2));
/**
 * Removes a listener entry whose base has been disposed from the listener list of its cache entry.
 * NOTE(review): the return statements and some conditions are not visible in this excerpt;
 * presumably true is returned when the listener was disposed and false otherwise.
 */
501 private boolean pruneListener(ListenerEntry entry) {
503 if (entry.base.isDisposed()) {
// NOTE(review): entry.base was already dereferenced above, so this assertion cannot fail.
505 assert (entry != null);
506 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
// NOTE(review): no null check on list is visible before this call; confirm that a list always exists here.
508 boolean success = list.remove(entry);
// NOTE(review): the guarding condition is not visible; presumably the map entry is dropped only when the list became empty.
511 listeners.remove(entry.entry);
/**
 * Returns the listener list stored for a cache entry, after running the disposed-listener check on it.
 *
 * @param entry cache entry to look up
 * @return the stored list, or an empty list when none is stored
 */
523 private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
// Called for its side effect on the listeners map; the boolean result is ignored.
524 hasListenerAfterDisposing(entry);
525 if(listeners.get(entry) != null)
526 return listeners.get(entry);
528 return Collections.emptyList();