1 package org.simantics.db.impl.query;
3 import java.io.BufferedOutputStream;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.ThreadFactory;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.function.Consumer;
22 import org.simantics.databoard.Bindings;
23 import org.simantics.db.DevelopmentKeys;
24 import org.simantics.db.debug.ListenerReport;
25 import org.simantics.db.exception.DatabaseException;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.graph.WriteGraphImpl;
28 import org.simantics.db.procedure.ListenerBase;
29 import org.simantics.utils.Development;
30 import org.simantics.utils.datastructures.Pair;
31 import org.simantics.utils.datastructures.collections.CollectionUtils;
32 import org.slf4j.LoggerFactory;
34 import gnu.trove.map.hash.THashMap;
35 import gnu.trove.set.hash.THashSet;
37 public class QueryListening {
39 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
41 final private QueryProcessor processor;
42 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
43 private boolean firingListeners = false;
44 final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
45 private EventBusImpl eventBus;
46 private Consumer<Runnable> consumer;
47 private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
49 static class TL extends ThreadLocal<ArrayList<Runnable>> {
51 Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
52 ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
55 protected synchronized ArrayList<Runnable> initialValue() {
56 ArrayList<Runnable> result = new ArrayList<>();
57 allQueues.put(Thread.currentThread(), result);
61 synchronized void sendToExecution() {
62 ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
63 dispatchedQueues.add(rs);
67 synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
68 ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
69 dispatchedQueues = new ArrayList<>();
75 static class EventBusImpl {
77 private static final int BUFFER_SIZE = 100;
80 Semaphore flush = null;
82 public boolean isTerminated() {
86 public Consumer<Runnable> newConsumer() {
87 return new Consumer<Runnable>() {
90 public void accept(Runnable arg0) {
91 ArrayList<Runnable> l = queues.get();
93 if(l.size() == BUFFER_SIZE) {
95 queues.sendToExecution();
101 public void shutdown() {
104 public void start(ThreadFactory arg1) {
105 arg1.newThread(new Runnable() {
109 synchronized(queues) {
110 while(!isTerminated()) {
112 ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
113 for(ArrayList<Runnable> queue : qs) {
114 for(Runnable r : queue)
118 for(ArrayList<Runnable> queue : queues.allQueues.values()) {
119 for(Runnable r : queue) {
129 } catch (InterruptedException e) {
140 synchronized(queues) {
141 flush = new Semaphore(0);
146 } catch (InterruptedException e) {
153 QueryListening(QueryProcessor processor) {
154 this.processor = processor;
155 eventBus = new EventBusImpl();
156 consumer = eventBus.newConsumer();
157 eventBus.start(new ThreadFactory()
160 public Thread newThread(Runnable r) {
161 System.err.println("new Thread " + r);
162 return new Thread(r, "QueryDependencyManager");
170 } catch (Throwable t) {
171 LOGGER.error("Error while waiting for query dependency management", t);
175 public boolean hasScheduledUpdates() {
176 return !scheduledListeners.isEmpty();
179 static class RegisterParentRunnable implements Runnable {
181 final CacheEntry parent;
182 final CacheEntry child;
184 public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
185 this.parent = parent;
191 child.addParent(parent);
192 if (Development.DEVELOPMENT) {
193 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
194 System.out.println(child + " -> " + parent);
201 static class RegisterListenerRunnable implements Runnable {
203 final QueryListening queryListening;
204 final ListenerBase base;
205 final Object procedure;
206 final CacheEntry parent;
207 final CacheEntry entry;
209 public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
210 this.queryListening = queryListening;
212 this.procedure = procedure;
213 this.parent = parent;
219 queryListening.addListener(entry, base, procedure);
224 void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
227 assert(listener == null);
233 if(!child.isImmutable(graph))
234 consumer.accept(new RegisterParentRunnable(parent, child));
235 } catch (DatabaseException e) {
236 LOGGER.error("Error while registering query dependencies", e);
241 if(!listener.isDisposed())
242 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
246 void registerFirstKnown(ListenerBase base, Object result) {
248 if(base == null) return;
250 consumer.accept(() -> {
251 ListenerEntry entry = addedEntries.get(base);
252 if(entry != null) entry.setLastKnown(result);
258 * Registers a listener and returns an entry iff the entry was added
260 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
262 assert (entry != null);
263 assert (procedure != null);
265 ArrayList<ListenerEntry> list = listeners.get(entry);
267 list = new ArrayList<>(1);
268 listeners.put(entry, list);
271 ListenerEntry result = new ListenerEntry(entry, base, procedure);
272 // Equals is here based on base
273 int currentIndex = list.indexOf(result);
274 // There was already a listener
275 if(currentIndex > -1) {
276 ListenerEntry current = list.get(currentIndex);
277 if(!current.base.isDisposed()) return null;
278 list.set(currentIndex, result);
283 if (Development.DEVELOPMENT) {
284 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
285 new Exception().printStackTrace();
286 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
290 addedEntries.put(base, result);
296 void scheduleListener(ListenerEntry entry) {
297 assert (entry != null);
298 if (Development.DEVELOPMENT) {
299 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
300 System.err.println("Scheduled " + entry.procedure);
303 scheduledListeners.add(entry);
306 private void removeListener(ListenerEntry entry) {
307 assert (entry != null);
308 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
309 if(list == null) return;
310 boolean success = list.remove(entry);
313 listeners.remove(entry.entry);
316 boolean hasListener(CacheEntry entry) {
317 if(listeners.get(entry) != null) return true;
321 boolean hasListenerAfterDisposing(CacheEntry entry) {
322 if(listeners.get(entry) != null) {
323 ArrayList<ListenerEntry> entries = listeners.get(entry);
324 ArrayList<ListenerEntry> list = null;
325 for (ListenerEntry e : entries) {
326 if (e.base.isDisposed()) {
327 if(list == null) list = new ArrayList<ListenerEntry>();
332 for (ListenerEntry e : list) {
336 if (entries.isEmpty()) {
337 listeners.remove(entry);
345 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
346 hasListenerAfterDisposing(entry);
347 if(listeners.get(entry) != null)
348 return listeners.get(entry);
350 return Collections.emptyList();
353 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
355 if(!workarea.containsKey(entry)) {
357 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
358 for(ListenerEntry e : getListenerEntries(entry))
361 workarea.put(entry, ls);
363 for(CacheEntry parent : entry.getParents(processor)) {
364 processListenerReport(parent, workarea);
365 ls.addAll(workarea.get(parent));
372 public synchronized ListenerReport getListenerReport() throws IOException {
374 class ListenerReportImpl implements ListenerReport {
376 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
379 public void print(PrintStream b) {
380 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
381 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
382 for(ListenerBase l : e.getValue()) {
383 Integer i = hist.get(l);
384 hist.put(l, i != null ? i-1 : -1);
388 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
389 b.print("" + -p.second + " " + p.first + "\n");
397 ListenerReportImpl result = new ListenerReportImpl();
399 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
400 for(CacheEntryBase entry : all) {
401 hasListenerAfterDisposing(entry);
403 for(CacheEntryBase entry : all) {
404 processListenerReport(entry, result.workarea);
411 public synchronized String reportListeners(File file) throws IOException {
413 if (!processor.isAlive())
416 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
417 ListenerReport report = getListenerReport();
420 return "Done reporting listeners.";
424 public void fireListeners(WriteGraphImpl graph) {
426 assert (!processor.updating);
427 assert (!processor.cache.collecting);
428 assert (!firingListeners);
430 firingListeners = true;
434 // Performing may cause further events to be scheduled.
435 while (!scheduledListeners.isEmpty()) {
437 // Clone current events to make new entries possible during
439 THashSet<ListenerEntry> entries = scheduledListeners;
440 scheduledListeners = new THashSet<ListenerEntry>();
442 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
444 for (ListenerEntry listenerEntry : entries) {
446 if (pruneListener(listenerEntry)) {
447 if (Development.DEVELOPMENT) {
448 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
449 new Exception().printStackTrace();
450 System.err.println("Pruned " + listenerEntry.procedure);
456 final CacheEntry entry = listenerEntry.entry;
457 assert (entry != null);
459 Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
461 if (newValue != ListenerEntry.NOT_CHANGED) {
462 if (Development.DEVELOPMENT) {
463 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
464 new Exception().printStackTrace();
465 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
468 schedule.add(listenerEntry);
469 listenerEntry.setLastKnown(entry.getResult());
474 for(ListenerEntry listenerEntry : schedule) {
475 final CacheEntry entry = listenerEntry.entry;
476 if (Development.DEVELOPMENT) {
477 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
478 System.err.println("Firing " + listenerEntry.procedure);
482 if (Development.DEVELOPMENT) {
483 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
484 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
487 entry.performFromCache(graph, listenerEntry.procedure);
488 } catch (Throwable t) {
496 firingListeners = false;
501 void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
503 Iterable<CacheEntry> oldParents = entry.getParents(processor);
504 for (CacheEntry parent : oldParents) {
505 if(!parent.isDiscarded())
506 todo.push(new UpdateEntry(entry, parent, indent + 2));
511 private boolean pruneListener(ListenerEntry entry) {
512 if (entry.base.isDisposed()) {
513 removeListener(entry);