]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
075281035a7802842a2b6ab285700b2a22f29d6b
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Semaphore;
18 import java.util.function.Consumer;
19
20 import org.simantics.databoard.Bindings;
21 import org.simantics.db.DevelopmentKeys;
22 import org.simantics.db.debug.ListenerReport;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.impl.graph.ReadGraphImpl;
25 import org.simantics.db.impl.graph.WriteGraphImpl;
26 import org.simantics.db.procedure.ListenerBase;
27 import org.simantics.utils.Development;
28 import org.simantics.utils.datastructures.Pair;
29 import org.simantics.utils.datastructures.collections.CollectionUtils;
30 import org.slf4j.LoggerFactory;
31
32 import gnu.trove.map.hash.THashMap;
33 import gnu.trove.set.hash.THashSet;
34
35 public class QueryListening {
36
37     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
38
39     private final QueryProcessor                                    processor;
40     private final Scheduler                                         scheduler;
41     private final Consumer<Runnable>                                consumer;
42     private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
43
44     private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
45     private boolean                                                 firingListeners       = false;
46
47     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
48
49     private static class ThreadQueue extends ThreadLocal<List<Runnable>> {
50
51         private final Map<Thread,List<Runnable>> allQueues = new HashMap<>();
52         
53         private ArrayList<List<Runnable>> dispatchedQueues = new ArrayList<>();
54
55         @Override
56         protected synchronized List<Runnable> initialValue() {
57             List<Runnable> result = Collections.synchronizedList(new ArrayList<>());
58             allQueues.put(Thread.currentThread(), result);
59             return result;
60         }
61
62         synchronized void sendToExecution() {
63             List<Runnable> rs = allQueues.remove(Thread.currentThread());
64             dispatchedQueues.add(rs);
65             notify();
66         }
67
68         synchronized ArrayList<List<Runnable>> getDispatchedQueues() {
69             ArrayList<List<Runnable>> result = dispatchedQueues;
70             dispatchedQueues = new ArrayList<>();
71             return result;
72         }
73
74     }
75
76     private static class Scheduler {
77
78         private static final int BUFFER_SIZE = 100;
79
80         private final QueryProcessor processor;
81         
82         private final ThreadQueue queues = new ThreadQueue();
83
84         /*
85          * Access to this field is synchronized using the monitor of 'queues'
86          * The method flush() shall be called by the single writing thread and the field has been nulled before the
87          * method can exit. Thus it is not possible for multiple instances of flush Semaphore to exist at the same time.
88          */
89         private Semaphore flush = null;
90
91         Scheduler(QueryProcessor processor) {
92             this.processor = processor;
93         }
94         
95         private boolean isTerminated() {
96             return processor.isDisposed();
97         }
98
99         Consumer<Runnable> newConsumer() {
100             return (task) -> {
101
102                 List<Runnable> l = queues.get();
103                 l.add(task);
104                 if(l.size() == BUFFER_SIZE) {
105                     queues.remove();
106                     queues.sendToExecution();
107                 }
108                 
109             };
110         }
111
112         void start() {
113             
114             Thread thread = new Thread() {
115                 
116                 public void run() {
117
118                     synchronized(queues) {
119                         while(!isTerminated()) {
120                             try {
121                                 ArrayList<List<Runnable>> qs = queues.getDispatchedQueues();
122                                 for(List<Runnable> queue : qs) {
123                                         queue.forEach(r -> r.run());
124                                 }
125                                 if(flush != null) {
126                                     for(List<Runnable> queue : queues.allQueues.values()) {
127                                         // FIXME : using synchronized list is not enough, the list may change between forEach and clear!
128                                         queue.forEach(r -> r.run());
129                                         queue.clear();
130                                     }
131                                     Semaphore s = flush;
132                                     flush = null;
133                                     s.release();
134                                 }
135                                 queues.wait(1000);
136                             } catch (InterruptedException e) {
137                                 LOGGER.error("Unexpected interrupt", e);
138                             }
139                         }
140                     }
141
142                 };
143                 
144             };
145             
146             thread.setName("QueryListening");
147             thread.start();
148             
149         }
150
151         private Semaphore createFlush() {
152             synchronized(queues) {
153                 flush = new Semaphore(0);
154                 queues.notify();
155                 return flush;
156             }
157         }
158         
159         void flush() {
160             try {
161                 createFlush().acquire();
162             } catch (InterruptedException e) {
163                 LOGGER.error("Unexpected interrupt", e);
164             }
165         }
166
167     }
168
169     private static class RegisterParentRunnable implements Runnable {
170         
171         private final CacheEntry parent;
172         private final CacheEntry child;
173         
174         public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
175             this.parent = parent;
176             this.child = child;
177         }
178
179         @Override
180         public void run() {
181             child.addParent(parent);
182             if (Development.DEVELOPMENT) {
183                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
184                     System.out.println(child + " -> " + parent);
185                 }
186             }
187         }
188         
189     }
190     
191     private static class RegisterListenerRunnable implements Runnable {
192
193         private final QueryListening queryListening;
194         private final ListenerBase base;
195         private final Object procedure;
196         private final CacheEntry parent;
197         private final CacheEntry entry;
198
199         public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
200             this.queryListening = queryListening;
201             this.base = base;
202             this.procedure = procedure;
203             this.parent = parent;
204             this.entry = entry;
205         }
206
207         @Override
208         public void run() {
209
210             assert (entry != null);
211             assert (procedure != null);
212
213             ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
214             if (list == null) {
215                 list = new ArrayList<>(1);
216                 queryListening.listeners.put(entry, list);
217             }
218
219             ListenerEntry result = new ListenerEntry(entry, base, procedure);
220             // Equals is here based on base
221             int currentIndex = list.indexOf(result);
222             // There was already a listener
223             if(currentIndex > -1) {
224                 ListenerEntry current = list.get(currentIndex);
225                 if(!current.base.isDisposed())
226                     return;
227                 list.set(currentIndex, result);
228             } else {
229                 list.add(result);
230             }
231
232             if (Development.DEVELOPMENT) {
233                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
234                     new Exception().printStackTrace();
235                     System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
236                 }
237             }
238
239             queryListening.addedEntries.put(base, result);
240
241         }
242
243         
244     }
245     
246     QueryListening(QueryProcessor processor) {
247         
248         this.processor = processor;
249         scheduler = new Scheduler(processor);
250         consumer = scheduler.newConsumer();
251         scheduler.start();
252         
253     }
254
255     public boolean hasScheduledUpdates() {
256         return !scheduledListeners.isEmpty();
257     }
258     
259     void sync() {
260         try {
261             scheduler.flush();
262         } catch (Throwable t) {
263             LOGGER.error("Error while waiting for query dependency management", t);
264         }
265     }
266
267     void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
268
269         if(inferred) {
270             assert(listener == null);
271             return;
272         }
273
274         if(parent != null) {
275             try {
276                 if(!child.isImmutable(graph))
277                     consumer.accept(new RegisterParentRunnable(parent, child));
278             } catch (DatabaseException e) {
279                 LOGGER.error("Error while registering query dependencies", e);
280             }
281         }
282
283         if(listener != null)
284             if(!listener.isDisposed())
285                 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
286
287     }
288
289     void registerFirstKnown(ListenerBase base, Object result) {
290         
291         if(base == null) return;
292
293         consumer.accept(() -> {
294             ListenerEntry entry = addedEntries.remove(base);
295             if(entry != null) entry.setLastKnown(result);
296         });
297
298     }
299
300     void scheduleListener(ListenerEntry entry) {
301         
302         assert (entry != null);
303         
304         if (Development.DEVELOPMENT) {
305             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
306                 System.err.println("Scheduled " + entry.procedure);
307             }
308         }
309         
310         scheduledListeners.add(entry);
311         
312     }
313
314     boolean hasListener(CacheEntry entry) {
315         if(listeners.get(entry) != null) return true;
316         return false;
317     }
318
319     boolean hasListenerAfterDisposing(CacheEntry entry) {
320         if(listeners.get(entry) != null) {
321             ArrayList<ListenerEntry> entries = listeners.get(entry);
322             ArrayList<ListenerEntry> list = null;
323             for (ListenerEntry e : entries) {
324                 if (e.base.isDisposed()) {
325                     if(list == null) list = new ArrayList<ListenerEntry>();
326                     list.add(e);
327                 }
328             }
329             if(list != null) {
330                 for (ListenerEntry e : list) {
331                     entries.remove(e);
332                 }
333             }
334             if (entries.isEmpty()) {
335                 listeners.remove(entry);
336                 return false;
337             }
338             return true;
339         }
340         return false;
341     }
342
343     void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
344
345         if(!workarea.containsKey(entry)) {
346
347             HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
348             for(ListenerEntry e : getListenerEntries(entry))
349                 ls.add(e.base);
350
351             workarea.put(entry, ls);
352
353             for(CacheEntry parent : entry.getParents(processor)) {
354                 processListenerReport(parent, workarea);
355                 ls.addAll(workarea.get(parent));
356             }
357
358         }
359
360     }
361
362     public synchronized ListenerReport getListenerReport() throws IOException {
363
364         class ListenerReportImpl implements ListenerReport {
365
366             Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
367
368             @Override
369             public void print(PrintStream b) {
370                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
371                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
372                     for(ListenerBase l : e.getValue()) {
373                         Integer i = hist.get(l);
374                         hist.put(l, i != null ? i-1 : -1);
375                     }
376                 }
377
378                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
379                     b.print("" + -p.second + " " + p.first + "\n");
380                 }
381
382                 b.flush();
383             }
384
385         }
386
387         ListenerReportImpl result = new ListenerReportImpl();
388
389         Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
390         for(CacheEntryBase entry : all) {
391             hasListenerAfterDisposing(entry);
392         }
393         for(CacheEntryBase entry : all) {
394             processListenerReport(entry, result.workarea);
395         }
396
397         return result;
398
399     }
400
401     public synchronized String reportListeners(File file) throws IOException {
402
403         if (!processor.isAlive())
404             return "Disposed!";
405
406         PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
407         ListenerReport report = getListenerReport();
408         report.print(b);
409
410         return "Done reporting listeners.";
411
412     }
413
414     public void fireListeners(ReadGraphImpl graph) {
415
416         assert (!processor.updating);
417         assert (!processor.cache.collecting);
418         assert (!firingListeners);
419
420         firingListeners = true;
421
422         try {
423
424             // Performing may cause further events to be scheduled.
425             while (!scheduledListeners.isEmpty()) {
426
427                 // Clone current events to make new entries possible during
428                 // firing.
429                 THashSet<ListenerEntry> entries = scheduledListeners;
430                 scheduledListeners = new THashSet<ListenerEntry>();
431
432                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
433
434                 for (ListenerEntry listenerEntry : entries) {
435
436                     if (pruneListener(listenerEntry)) {
437                         if (Development.DEVELOPMENT) {
438                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
439                                 new Exception().printStackTrace();
440                                 System.err.println("Pruned " + listenerEntry.procedure);
441                             }
442                         }
443                         continue;
444                     }
445
446                     final CacheEntry entry = listenerEntry.entry;
447                     assert (entry != null);
448
449                     Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
450
451                     if (newValue != ListenerEntry.NOT_CHANGED) {
452                         if (Development.DEVELOPMENT) {
453                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
454                                 new Exception().printStackTrace();
455                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
456                             }
457                         }
458                         schedule.add(listenerEntry);
459                         listenerEntry.setLastKnown(entry.getResult());
460                     }
461
462                 }
463
464                 for(ListenerEntry listenerEntry : schedule) {
465                     final CacheEntry entry = listenerEntry.entry;
466                     if (Development.DEVELOPMENT) {
467                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
468                             System.err.println("Firing " + listenerEntry.procedure);
469                         }
470                     }
471                     try {
472                         if (Development.DEVELOPMENT) {
473                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
474                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
475                             }
476                         }
477                         entry.performFromCache(graph, listenerEntry.procedure);
478                     } catch (Throwable t) {
479                         LOGGER.error("Unexpected exception ", t);
480                     }
481                 }
482
483             }
484
485         } finally {
486             firingListeners = false;
487         }
488
489     }
490
491     void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
492
493         Iterable<CacheEntry> oldParents = entry.getParents(processor);
494         for (CacheEntry parent : oldParents) {
495             if(!parent.isDiscarded())
496                 todo.push(new UpdateEntry(entry, parent, indent + 2));
497         }
498
499     }
500
501     private boolean pruneListener(ListenerEntry entry) {
502         
503         if (entry.base.isDisposed()) {
504             
505             assert (entry != null);
506             ArrayList<ListenerEntry> list = listeners.get(entry.entry);
507             if(list != null) {
508                 boolean success = list.remove(entry);
509                 assert (success);
510                 if (list.isEmpty())
511                     listeners.remove(entry.entry);
512             }
513
514             return true;
515             
516         } else {
517             
518             return false;
519             
520         }
521     }
522
523     private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
524         hasListenerAfterDisposing(entry);
525         if(listeners.get(entry) != null)
526             return listeners.get(entry);
527         else 
528             return Collections.emptyList();
529     }
530     
531 }