]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
Simupedia tuning step 2
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.ThreadFactory;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.function.Consumer;
21
22 import org.simantics.databoard.Bindings;
23 import org.simantics.db.DevelopmentKeys;
24 import org.simantics.db.debug.ListenerReport;
25 import org.simantics.db.exception.DatabaseException;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.graph.WriteGraphImpl;
28 import org.simantics.db.procedure.ListenerBase;
29 import org.simantics.utils.Development;
30 import org.simantics.utils.datastructures.Pair;
31 import org.simantics.utils.datastructures.collections.CollectionUtils;
32 import org.slf4j.LoggerFactory;
33
34 import gnu.trove.map.hash.THashMap;
35 import gnu.trove.set.hash.THashSet;
36
37 public class QueryListening {
38
39     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
40
41     final private QueryProcessor                            processor;
42     private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
43     private boolean                                         firingListeners       = false;
44     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
45     private EventBusImpl                                    eventBus;
46     private Consumer<Runnable>                              consumer;
47     private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
48
49     static class TL extends ThreadLocal<ArrayList<Runnable>> {
50
51         Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
52         ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
53
54         @Override
55         protected synchronized ArrayList<Runnable> initialValue() {
56             ArrayList<Runnable> result = new ArrayList<>();
57             allQueues.put(Thread.currentThread(), result);
58             return result;
59         }
60
61         synchronized void sendToExecution() {
62             ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
63             dispatchedQueues.add(rs);
64             notify();
65         }
66
67         synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
68             ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
69             dispatchedQueues = new ArrayList<>();
70             return result;
71         }
72
73     }
74
75     static class EventBusImpl {
76
77         private static final int BUFFER_SIZE = 100;
78
79         TL queues = new TL();
80         Semaphore flush = null;
81
82         public boolean isTerminated() {
83             return false;
84         }
85
86         public Consumer<Runnable> newConsumer() {
87             return new Consumer<Runnable>() {
88
89                 @Override
90                 public void accept(Runnable arg0) {
91                     ArrayList<Runnable> l = queues.get();
92                     l.add(arg0);
93                     if(l.size() == BUFFER_SIZE) {
94                         queues.remove();
95                         queues.sendToExecution();
96                     }
97                 }
98             };
99         }
100
101         public void shutdown() {
102         }
103
104         public void start(ThreadFactory arg1) {
105             arg1.newThread(new Runnable() {
106
107                 @Override
108                 public void run() {
109                     synchronized(queues) {
110                         while(!isTerminated()) {
111                             try {
112                                 ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
113                                 for(ArrayList<Runnable> queue : qs) {
114                                     for(Runnable r : queue)
115                                         r.run();
116                                 }
117                                 if(flush != null) {
118                                     for(ArrayList<Runnable> queue : queues.allQueues.values()) {
119                                         for(Runnable r : queue) {
120                                             r.run();   
121                                         }
122                                         queue.clear();
123                                     }
124                                     Semaphore s = flush;
125                                     flush = null;
126                                     s.release();
127                                 }
128                                 queues.wait(1000);
129                             } catch (InterruptedException e) {
130                                 e.printStackTrace();
131                             }
132                         }
133                     }
134                 }
135
136             }).start();
137         }
138
139         void flush() {
140             synchronized(queues) {
141                 flush = new Semaphore(0);
142                 queues.notify();
143             }
144             try {
145                 flush.acquire();
146             } catch (InterruptedException e) {
147                 e.printStackTrace();
148             }
149         }
150
151     }
152
153     QueryListening(QueryProcessor processor) {
154         this.processor = processor;
155         eventBus = new EventBusImpl();
156         consumer = eventBus.newConsumer();
157         eventBus.start(new ThreadFactory()
158         {
159             @Override
160             public Thread newThread(Runnable r) {
161                 System.err.println("new Thread " + r);
162                 return new Thread(r, "QueryDependencyManager");
163             }
164         });
165     }
166
167     public void sync() {
168         try {
169             eventBus.flush();
170         } catch (Throwable t) {
171             LOGGER.error("Error while waiting for query dependency management", t);
172         }
173     }
174
175     public boolean hasScheduledUpdates() {
176         return !scheduledListeners.isEmpty();
177     }
178
179     static class RegisterParentRunnable implements Runnable {
180         
181         final CacheEntry parent;
182         final CacheEntry child;
183         
184         public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
185             this.parent = parent;
186             this.child = child;
187         }
188
189         @Override
190         public void run() {
191             child.addParent(parent);
192             if (Development.DEVELOPMENT) {
193                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
194                     System.out.println(child + " -> " + parent);
195                 }
196             }
197         }
198         
199     }
200     
201     static class RegisterListenerRunnable implements Runnable {
202
203         final QueryListening queryListening;
204         final ListenerBase base;
205         final Object procedure;
206         final CacheEntry parent;
207         final CacheEntry entry;
208
209         public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
210             this.queryListening = queryListening;
211             this.base = base;; 
212             this.procedure = procedure;
213             this.parent = parent;
214             this.entry = entry;
215         }
216
217         @Override
218         public void run() {
219             queryListening.addListener(entry, base, procedure);
220         }
221         
222     }
223
224     void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
225
226         if(inferred) {
227             assert(listener == null);
228             return;
229         }
230
231         if(parent != null) {
232             try {
233                 if(!child.isImmutable(graph))
234                     consumer.accept(new RegisterParentRunnable(parent, child));
235             } catch (DatabaseException e) {
236                 LOGGER.error("Error while registering query dependencies", e);
237             }
238         }
239
240         if(listener != null)
241             if(!listener.isDisposed())
242                 consumer.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
243
244     }
245
246     void registerFirstKnown(ListenerBase base, Object result) {
247         
248         if(base == null) return;
249
250         consumer.accept(() -> {
251             ListenerEntry entry = addedEntries.get(base);
252             if(entry != null) entry.setLastKnown(result);
253         });
254
255     }
256
257     /*
258      * Registers a listener and returns an entry iff the entry was added
259      */
260     private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
261
262         assert (entry != null);
263         assert (procedure != null);
264
265         ArrayList<ListenerEntry> list = listeners.get(entry);
266         if (list == null) {
267             list = new ArrayList<>(1);
268             listeners.put(entry, list);
269         }
270
271         ListenerEntry result = new ListenerEntry(entry, base, procedure);
272         // Equals is here based on base
273         int currentIndex = list.indexOf(result);
274         // There was already a listener
275         if(currentIndex > -1) {
276             ListenerEntry current = list.get(currentIndex);
277             if(!current.base.isDisposed()) return null;
278             list.set(currentIndex, result);
279         } else {
280             list.add(result);
281         }
282
283         if (Development.DEVELOPMENT) {
284             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
285                 new Exception().printStackTrace();
286                 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
287             }
288         }
289
290         addedEntries.put(base, result);
291
292         return result;
293
294     }
295
296     void scheduleListener(ListenerEntry entry) {
297         assert (entry != null);
298         if (Development.DEVELOPMENT) {
299             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
300                 System.err.println("Scheduled " + entry.procedure);
301             }
302         }
303         scheduledListeners.add(entry);
304     }
305
306     private void removeListener(ListenerEntry entry) {
307         assert (entry != null);
308         ArrayList<ListenerEntry> list = listeners.get(entry.entry);
309         if(list == null) return;
310         boolean success = list.remove(entry);
311         assert (success);
312         if (list.isEmpty())
313             listeners.remove(entry.entry);
314     }
315
316     boolean hasListener(CacheEntry entry) {
317         if(listeners.get(entry) != null) return true;
318         return false;
319     }
320
321     boolean hasListenerAfterDisposing(CacheEntry entry) {
322         if(listeners.get(entry) != null) {
323             ArrayList<ListenerEntry> entries = listeners.get(entry);
324             ArrayList<ListenerEntry> list = null;
325             for (ListenerEntry e : entries) {
326                 if (e.base.isDisposed()) {
327                     if(list == null) list = new ArrayList<ListenerEntry>();
328                     list.add(e);
329                 }
330             }
331             if(list != null) {
332                 for (ListenerEntry e : list) {
333                     entries.remove(e);
334                 }
335             }
336             if (entries.isEmpty()) {
337                 listeners.remove(entry);
338                 return false;
339             }
340             return true;
341         }
342         return false;
343     }
344
345     List<ListenerEntry> getListenerEntries(CacheEntry entry) {
346         hasListenerAfterDisposing(entry);
347         if(listeners.get(entry) != null)
348             return listeners.get(entry);
349         else 
350             return Collections.emptyList();
351     }
352
353     void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
354
355         if(!workarea.containsKey(entry)) {
356
357             HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
358             for(ListenerEntry e : getListenerEntries(entry))
359                 ls.add(e.base);
360
361             workarea.put(entry, ls);
362
363             for(CacheEntry parent : entry.getParents(processor)) {
364                 processListenerReport(parent, workarea);
365                 ls.addAll(workarea.get(parent));
366             }
367
368         }
369
370     }
371
372     public synchronized ListenerReport getListenerReport() throws IOException {
373
374         class ListenerReportImpl implements ListenerReport {
375
376             Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
377
378             @Override
379             public void print(PrintStream b) {
380                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
381                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
382                     for(ListenerBase l : e.getValue()) {
383                         Integer i = hist.get(l);
384                         hist.put(l, i != null ? i-1 : -1);
385                     }
386                 }
387
388                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
389                     b.print("" + -p.second + " " + p.first + "\n");
390                 }
391
392                 b.flush();
393             }
394
395         }
396
397         ListenerReportImpl result = new ListenerReportImpl();
398
399         Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
400         for(CacheEntryBase entry : all) {
401             hasListenerAfterDisposing(entry);
402         }
403         for(CacheEntryBase entry : all) {
404             processListenerReport(entry, result.workarea);
405         }
406
407         return result;
408
409     }
410
411     public synchronized String reportListeners(File file) throws IOException {
412
413         if (!processor.isAlive())
414             return "Disposed!";
415
416         PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
417         ListenerReport report = getListenerReport();
418         report.print(b);
419
420         return "Done reporting listeners.";
421
422     }
423
424     public void fireListeners(WriteGraphImpl graph) {
425
426         assert (!processor.updating);
427         assert (!processor.cache.collecting);
428         assert (!firingListeners);
429
430         firingListeners = true;
431
432         try {
433
434             // Performing may cause further events to be scheduled.
435             while (!scheduledListeners.isEmpty()) {
436
437                 // Clone current events to make new entries possible during
438                 // firing.
439                 THashSet<ListenerEntry> entries = scheduledListeners;
440                 scheduledListeners = new THashSet<ListenerEntry>();
441
442                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
443
444                 for (ListenerEntry listenerEntry : entries) {
445
446                     if (pruneListener(listenerEntry)) {
447                         if (Development.DEVELOPMENT) {
448                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
449                                 new Exception().printStackTrace();
450                                 System.err.println("Pruned " + listenerEntry.procedure);
451                             }
452                         }
453                         continue;
454                     }
455
456                     final CacheEntry entry = listenerEntry.entry;
457                     assert (entry != null);
458
459                     Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
460
461                     if (newValue != ListenerEntry.NOT_CHANGED) {
462                         if (Development.DEVELOPMENT) {
463                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
464                                 new Exception().printStackTrace();
465                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
466                             }
467                         }
468                         schedule.add(listenerEntry);
469                         listenerEntry.setLastKnown(entry.getResult());
470                     }
471
472                 }
473
474                 for(ListenerEntry listenerEntry : schedule) {
475                     final CacheEntry entry = listenerEntry.entry;
476                     if (Development.DEVELOPMENT) {
477                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
478                             System.err.println("Firing " + listenerEntry.procedure);
479                         }
480                     }
481                     try {
482                         if (Development.DEVELOPMENT) {
483                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
484                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
485                             }
486                         }
487                         entry.performFromCache(graph, listenerEntry.procedure);
488                     } catch (Throwable t) {
489                         t.printStackTrace();
490                     }
491                 }
492
493             }
494
495         } finally {
496             firingListeners = false;
497         }
498
499     }
500
501     void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
502
503         Iterable<CacheEntry> oldParents = entry.getParents(processor);
504         for (CacheEntry parent : oldParents) {
505             if(!parent.isDiscarded())
506                 todo.push(new UpdateEntry(entry, parent, indent + 2));
507         }
508
509     }
510
511     private boolean pruneListener(ListenerEntry entry) {
512         if (entry.base.isDisposed()) {
513             removeListener(entry);
514             return true;
515         } else {
516             return false;
517         }
518     }
519
520 }