e3bd437416af310936e66533746b270d8228aba8
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Semaphore;
18 import java.util.function.Consumer;
19
20 import org.simantics.databoard.Bindings;
21 import org.simantics.db.DevelopmentKeys;
22 import org.simantics.db.debug.ListenerReport;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.impl.graph.ReadGraphImpl;
25 import org.simantics.db.impl.graph.WriteGraphImpl;
26 import org.simantics.db.procedure.ListenerBase;
27 import org.simantics.utils.Development;
28 import org.simantics.utils.datastructures.Pair;
29 import org.simantics.utils.datastructures.collections.CollectionUtils;
30 import org.slf4j.LoggerFactory;
31
32 import gnu.trove.map.hash.THashMap;
33 import gnu.trove.set.hash.THashSet;
34
35 public class QueryListening {
36
37     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
38
39     private final QueryProcessor                                    processor;
40     private final Scheduler                                         scheduler;
41     private final Consumer<Runnable>                                consumer;
42     private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
43
44     private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
45     private boolean                                                 firingListeners       = false;
46
47     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
48
49     private static class ThreadQueue extends ThreadLocal<ArrayList<Runnable>> {
50
51         private final Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
52         
53         private ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
54
55         @Override
56         protected synchronized ArrayList<Runnable> initialValue() {
57             ArrayList<Runnable> result = new ArrayList<>();
58             allQueues.put(Thread.currentThread(), result);
59             return result;
60         }
61
62         synchronized void sendToExecution() {
63             ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
64             dispatchedQueues.add(rs);
65             notify();
66         }
67
68         synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
69             ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
70             dispatchedQueues = new ArrayList<>();
71             return result;
72         }
73
74     }
75
76     private static class Scheduler {
77
78         private static final int BUFFER_SIZE = 100;
79
80         private final QueryProcessor processor;
81         
82         private final ThreadQueue queues = new ThreadQueue();
83
84         /*
85          * Access to this field is synchronized using the monitor of 'queues'
86          * The method flush() shall be called by the single writing thread and the field has been nulled before the
87          * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
88          */
89         private Semaphore flush = null;
90
91         Scheduler(QueryProcessor processor) {
92             this.processor = processor;
93         }
94         
95         private boolean isTerminated() {
96             return processor.isDisposed();
97         }
98
99         Consumer<Runnable> newConsumer() {
100             return (task) -> {
101
102                 ArrayList<Runnable> l = queues.get();
103                 l.add(task);
104                 if(l.size() == BUFFER_SIZE) {
105                     queues.remove();
106                     queues.sendToExecution();
107                 }
108                 
109             };
110         }
111
112         void start() {
113             
114             Thread thread = new Thread() {
115                 
116                 public void run() {
117
118                     synchronized(queues) {
119                         while(!isTerminated()) {
120                             try {
121                                 ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
122                                 for(ArrayList<Runnable> queue : qs) {
123                                     for(Runnable r : queue)
124                                         r.run();
125                                 }
126                                 if(flush != null) {
127                                     for(ArrayList<Runnable> queue : queues.allQueues.values()) {
128                                         for(Runnable r : queue) {
129                                             r.run();   
130                                         }
131                                         queue.clear();
132                                     }
133                                     Semaphore s = flush;
134                                     flush = null;
135                                     s.release();
136                                 }
137                                 queues.wait(1000);
138                             } catch (InterruptedException e) {
139                                 LOGGER.error("Unexpected interrupt", e);
140                             }
141                         }
142                     }
143
144                 };
145                 
146             };
147             
148             thread.setName("QueryListening");
149             thread.start();
150             
151         }
152
153         private Semaphore createFlush() {
154             synchronized(queues) {
155                 flush = new Semaphore(0);
156                 queues.notify();
157                 return flush;
158             }
159         }
160         
161         void flush() {
162             try {
163                 createFlush().acquire();
164             } catch (InterruptedException e) {
165                 LOGGER.error("Unexpected interrupt", e);
166             }
167         }
168
169     }
170
171     private static class RegisterParentRunnable implements Runnable {
172         
173         private final CacheEntry parent;
174         private final CacheEntry child;
175         
176         public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
177             this.parent = parent;
178             this.child = child;
179         }
180
181         @Override
182         public void run() {
183             child.addParent(parent);
184             if (Development.DEVELOPMENT) {
185                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
186                     System.out.println(child + " -> " + parent);
187                 }
188             }
189         }
190         
191     }
192     
193     private static class RegisterListenerRunnable implements Runnable {
194
195         private final QueryListening queryListening;
196         private final ListenerBase base;
197         private final Object procedure;
198         private final CacheEntry parent;
199         private final CacheEntry entry;
200
201         public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
202             this.queryListening = queryListening;
203             this.base = base;
204             this.procedure = procedure;
205             this.parent = parent;
206             this.entry = entry;
207         }
208
209         @Override
210         public void run() {
211
212             assert (entry != null);
213             assert (procedure != null);
214
215             ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
216             if (list == null) {
217                 list = new ArrayList<>(1);
218                 queryListening.listeners.put(entry, list);
219             }
220
221             ListenerEntry result = new ListenerEntry(entry, base, procedure);
222             // Equals is here based on base
223             int currentIndex = list.indexOf(result);
224             // There was already a listener
225             if(currentIndex > -1) {
226                 ListenerEntry current = list.get(currentIndex);
227                 if(!current.base.isDisposed())
228                     return;
229                 list.set(currentIndex, result);
230             } else {
231                 list.add(result);
232             }
233
234             if (Development.DEVELOPMENT) {
235                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
236                     new Exception().printStackTrace();
237                     System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
238                 }
239             }
240
241             queryListening.addedEntries.put(base, result);
242
243         }
244
245         
246     }
247     
248     QueryListening(QueryProcessor processor) {
249         
250         this.processor = processor;
251         scheduler = new Scheduler(processor);
252         consumer = scheduler.newConsumer();
253         scheduler.start();
254         
255     }
256
257     public boolean hasScheduledUpdates() {
258         return !scheduledListeners.isEmpty();
259     }
260     
261     void sync() {
262         try {
263             scheduler.flush();
264         } catch (Throwable t) {
265             LOGGER.error("Error while waiting for query dependency management", t);
266         }
267     }
268
269     void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
270
271         if(inferred) {
272             assert(listener == null);
273             return;
274         }
275
276         if(parent != null) {
277             try {
278                 if(!child.isImmutable(graph))
279                     consumer.accept(new RegisterParentRunnable(parent, child));
280             } catch (DatabaseException e) {
281                 LOGGER.error("Error while registering query dependencies", e);
282             }
283         }
284
285         if(listener != null)
286             if(!listener.isDisposed())
287                 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
288
289     }
290
291     void registerFirstKnown(ListenerBase base, Object result) {
292         
293         if(base == null) return;
294
295         consumer.accept(() -> {
296             ListenerEntry entry = addedEntries.get(base);
297             if(entry != null) entry.setLastKnown(result);
298         });
299
300     }
301
302     void scheduleListener(ListenerEntry entry) {
303         
304         assert (entry != null);
305         
306         if (Development.DEVELOPMENT) {
307             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
308                 System.err.println("Scheduled " + entry.procedure);
309             }
310         }
311         
312         scheduledListeners.add(entry);
313         
314     }
315
316     boolean hasListener(CacheEntry entry) {
317         if(listeners.get(entry) != null) return true;
318         return false;
319     }
320
321     boolean hasListenerAfterDisposing(CacheEntry entry) {
322         if(listeners.get(entry) != null) {
323             ArrayList<ListenerEntry> entries = listeners.get(entry);
324             ArrayList<ListenerEntry> list = null;
325             for (ListenerEntry e : entries) {
326                 if (e.base.isDisposed()) {
327                     if(list == null) list = new ArrayList<ListenerEntry>();
328                     list.add(e);
329                 }
330             }
331             if(list != null) {
332                 for (ListenerEntry e : list) {
333                     entries.remove(e);
334                 }
335             }
336             if (entries.isEmpty()) {
337                 listeners.remove(entry);
338                 return false;
339             }
340             return true;
341         }
342         return false;
343     }
344
345     void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
346
347         if(!workarea.containsKey(entry)) {
348
349             HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
350             for(ListenerEntry e : getListenerEntries(entry))
351                 ls.add(e.base);
352
353             workarea.put(entry, ls);
354
355             for(CacheEntry parent : entry.getParents(processor)) {
356                 processListenerReport(parent, workarea);
357                 ls.addAll(workarea.get(parent));
358             }
359
360         }
361
362     }
363
364     public synchronized ListenerReport getListenerReport() throws IOException {
365
366         class ListenerReportImpl implements ListenerReport {
367
368             Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
369
370             @Override
371             public void print(PrintStream b) {
372                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
373                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
374                     for(ListenerBase l : e.getValue()) {
375                         Integer i = hist.get(l);
376                         hist.put(l, i != null ? i-1 : -1);
377                     }
378                 }
379
380                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
381                     b.print("" + -p.second + " " + p.first + "\n");
382                 }
383
384                 b.flush();
385             }
386
387         }
388
389         ListenerReportImpl result = new ListenerReportImpl();
390
391         Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
392         for(CacheEntryBase entry : all) {
393             hasListenerAfterDisposing(entry);
394         }
395         for(CacheEntryBase entry : all) {
396             processListenerReport(entry, result.workarea);
397         }
398
399         return result;
400
401     }
402
403     public synchronized String reportListeners(File file) throws IOException {
404
405         if (!processor.isAlive())
406             return "Disposed!";
407
408         PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
409         ListenerReport report = getListenerReport();
410         report.print(b);
411
412         return "Done reporting listeners.";
413
414     }
415
416     public void fireListeners(ReadGraphImpl graph) {
417
418         assert (!processor.updating);
419         assert (!processor.cache.collecting);
420         assert (!firingListeners);
421
422         firingListeners = true;
423
424         try {
425
426             // Performing may cause further events to be scheduled.
427             while (!scheduledListeners.isEmpty()) {
428
429                 // Clone current events to make new entries possible during
430                 // firing.
431                 THashSet<ListenerEntry> entries = scheduledListeners;
432                 scheduledListeners = new THashSet<ListenerEntry>();
433
434                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
435
436                 for (ListenerEntry listenerEntry : entries) {
437
438                     if (pruneListener(listenerEntry)) {
439                         if (Development.DEVELOPMENT) {
440                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
441                                 new Exception().printStackTrace();
442                                 System.err.println("Pruned " + listenerEntry.procedure);
443                             }
444                         }
445                         continue;
446                     }
447
448                     final CacheEntry entry = listenerEntry.entry;
449                     assert (entry != null);
450
451                     Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
452
453                     if (newValue != ListenerEntry.NOT_CHANGED) {
454                         if (Development.DEVELOPMENT) {
455                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
456                                 new Exception().printStackTrace();
457                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
458                             }
459                         }
460                         schedule.add(listenerEntry);
461                         listenerEntry.setLastKnown(entry.getResult());
462                     }
463
464                 }
465
466                 for(ListenerEntry listenerEntry : schedule) {
467                     final CacheEntry entry = listenerEntry.entry;
468                     if (Development.DEVELOPMENT) {
469                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
470                             System.err.println("Firing " + listenerEntry.procedure);
471                         }
472                     }
473                     try {
474                         if (Development.DEVELOPMENT) {
475                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
476                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
477                             }
478                         }
479                         entry.performFromCache(graph, listenerEntry.procedure);
480                     } catch (Throwable t) {
481                         LOGGER.error("Unexpected exception ", t);
482                     }
483                 }
484
485             }
486
487         } finally {
488             firingListeners = false;
489         }
490
491     }
492
493     void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
494
495         Iterable<CacheEntry> oldParents = entry.getParents(processor);
496         for (CacheEntry parent : oldParents) {
497             if(!parent.isDiscarded())
498                 todo.push(new UpdateEntry(entry, parent, indent + 2));
499         }
500
501     }
502
503     private boolean pruneListener(ListenerEntry entry) {
504         
505         if (entry.base.isDisposed()) {
506             
507             assert (entry != null);
508             ArrayList<ListenerEntry> list = listeners.get(entry.entry);
509             if(list != null) {
510                 boolean success = list.remove(entry);
511                 assert (success);
512                 if (list.isEmpty())
513                     listeners.remove(entry.entry);
514             }
515
516             return true;
517             
518         } else {
519             
520             return false;
521             
522         }
523     }
524
525     private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
526         hasListenerAfterDisposing(entry);
527         if(listeners.get(entry) != null)
528             return listeners.get(entry);
529         else 
530             return Collections.emptyList();
531     }
532     
533 }