]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
b873bfd73ef8f9b97773977b2459d7cdb502e9c6
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.ThreadFactory;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.function.Consumer;
21
22 import org.simantics.databoard.Bindings;
23 import org.simantics.db.DevelopmentKeys;
24 import org.simantics.db.debug.ListenerReport;
25 import org.simantics.db.exception.DatabaseException;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.graph.WriteGraphImpl;
28 import org.simantics.db.procedure.ListenerBase;
29 import org.simantics.utils.Development;
30 import org.simantics.utils.datastructures.Pair;
31 import org.simantics.utils.datastructures.collections.CollectionUtils;
32 import org.slf4j.LoggerFactory;
33
34 import gnu.trove.map.hash.THashMap;
35 import gnu.trove.set.hash.THashSet;
36
37 public class QueryListening {
38
39     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
40
41     final private QueryProcessor                            processor;
42     private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
43     private boolean                                         firingListeners       = false;
44     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
45     private EventBusImpl                                    eventBus;
46     private Consumer<Runnable>                              consumer;
47     private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
48
49     int execCount=0;
50     int queueCount = 0;
51
52     static class TL extends ThreadLocal<ArrayList<Runnable>> {
53
54         Map<Thread,ArrayList<Runnable>> allQueues = new HashMap<>();
55         ArrayList<ArrayList<Runnable>> dispatchedQueues = new ArrayList<>();
56
57         @Override
58         protected synchronized ArrayList<Runnable> initialValue() {
59             ArrayList<Runnable> result = new ArrayList<>();
60             allQueues.put(Thread.currentThread(), result);
61             return result;
62         }
63
64         synchronized void sendToExecution() {
65             ArrayList<Runnable> rs = allQueues.remove(Thread.currentThread());
66             dispatchedQueues.add(rs);
67             notify();
68         }
69
70         synchronized ArrayList<ArrayList<Runnable>> getDispatchedQueues() {
71             ArrayList<ArrayList<Runnable>> result = dispatchedQueues;
72             dispatchedQueues = new ArrayList<>();
73             return result;
74         }
75
76     }
77
78     static class EventBusImpl {
79
80         private static final int BUFFER_SIZE = 100;
81
82         TL queues = new TL();
83         Semaphore flush = null;
84
85         public boolean isTerminated() {
86             return false;
87         }
88
89         public Consumer<Runnable> newConsumer() {
90             return new Consumer<Runnable>() {
91
92                 @Override
93                 public void accept(Runnable arg0) {
94                     ArrayList<Runnable> l = queues.get();
95                     l.add(arg0);
96                     if(l.size() == BUFFER_SIZE) {
97                         queues.remove();
98                         queues.sendToExecution();
99                     }
100                 }
101             };
102         }
103
104         public void shutdown() {
105         }
106
107         public void start(ThreadFactory arg1) {
108             arg1.newThread(new Runnable() {
109
110                 @Override
111                 public void run() {
112                     synchronized(queues) {
113                         while(!isTerminated()) {
114                             try {
115                                 ArrayList<ArrayList<Runnable>> qs = queues.getDispatchedQueues();
116                                 for(ArrayList<Runnable> queue : qs) {
117                                     for(Runnable r : queue)
118                                         r.run();
119                                 }
120                                 if(flush != null) {
121                                     for(ArrayList<Runnable> queue : queues.allQueues.values()) {
122                                         for(Runnable r : queue) {
123                                             r.run();   
124                                         }
125                                         queue.clear();
126                                     }
127                                     Semaphore s = flush;
128                                     flush = null;
129                                     s.release();
130                                 }
131                                 queues.wait(1000);
132                             } catch (InterruptedException e) {
133                                 e.printStackTrace();
134                             }
135                         }
136                     }
137                 }
138
139             }).start();
140         }
141
142         void flush() {
143             synchronized(queues) {
144                 flush = new Semaphore(0);
145                 queues.notify();
146             }
147             try {
148                 flush.acquire();
149             } catch (InterruptedException e) {
150                 e.printStackTrace();
151             }
152         }
153
154     }
155
156     QueryListening(QueryProcessor processor) {
157         this.processor = processor;
158         eventBus = new EventBusImpl();
159         consumer = eventBus.newConsumer();
160         eventBus.start(new ThreadFactory()
161         {
162             @Override
163             public Thread newThread(Runnable r) {
164                 System.err.println("new Thread " + r);
165                 return new Thread(r, "QueryDependencyManager");
166             }
167         });
168     }
169
170     public void sync() {
171         try {
172             eventBus.flush();
173         } catch (Throwable t) {
174             LOGGER.error("Error while waiting for query dependency management", t);
175         }
176     }
177
178     public boolean hasScheduledUpdates() {
179         return !scheduledListeners.isEmpty();
180     }
181
182     public synchronized void dispatch(Runnable r) {
183         queueCount++;
184         consumer.accept(r);
185     }
186
187     void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
188
189         dispatch(() -> {
190
191             if (parent != null && !inferred) {
192                 try {
193                     if(!child.isImmutable(graph))
194                         child.addParent(parent);
195                 } catch (DatabaseException e) {
196                     LOGGER.error("Error while registering query dependencies", e);
197                 }
198                 if (Development.DEVELOPMENT) {
199                     if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
200                         System.out.println(child + " -> " + parent);
201                     }
202                 }
203             }
204
205             if (listener != null)
206                 registerListener(child, listener, procedure);
207
208         });
209
210     }
211
212     void registerFirstKnown(ListenerBase base, Object result) {
213         
214         if(base == null) return;
215
216         dispatch(() -> {
217
218             ListenerEntry entry = addedEntries.get(base);
219             if(entry != null) entry.setLastKnown(result);
220
221         });
222
223     }
224
225     public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
226
227         assert (entry != null);
228
229         if (base.isDisposed())
230             return null;
231
232         return addListener(entry, base, procedure);
233
234     }
235
236     /*
237      * Registers a listener and returns an entry iff the entry was added
238      */
239     private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
240
241         assert (entry != null);
242         assert (procedure != null);
243
244         ArrayList<ListenerEntry> list = listeners.get(entry);
245         if (list == null) {
246             list = new ArrayList<>(1);
247             listeners.put(entry, list);
248         }
249
250         ListenerEntry result = new ListenerEntry(entry, base, procedure);
251         // Equals is here based on base
252         int currentIndex = list.indexOf(result);
253         // There was already a listener
254         if(currentIndex > -1) {
255             ListenerEntry current = list.get(currentIndex);
256             if(!current.base.isDisposed()) return null;
257             list.set(currentIndex, result);
258         } else {
259             list.add(result);
260         }
261
262         if (Development.DEVELOPMENT) {
263             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
264                 new Exception().printStackTrace();
265                 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
266             }
267         }
268
269         addedEntries.put(base, result);
270
271         return result;
272
273     }
274
275     void scheduleListener(ListenerEntry entry) {
276         assert (entry != null);
277         if (Development.DEVELOPMENT) {
278             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
279                 System.err.println("Scheduled " + entry.procedure);
280             }
281         }
282         scheduledListeners.add(entry);
283     }
284
285     private void removeListener(ListenerEntry entry) {
286         assert (entry != null);
287         ArrayList<ListenerEntry> list = listeners.get(entry.entry);
288         if(list == null) return;
289         boolean success = list.remove(entry);
290         assert (success);
291         if (list.isEmpty())
292             listeners.remove(entry.entry);
293     }
294
295     boolean hasListener(CacheEntry entry) {
296         if(listeners.get(entry) != null) return true;
297         return false;
298     }
299
300     boolean hasListenerAfterDisposing(CacheEntry entry) {
301         if(listeners.get(entry) != null) {
302             ArrayList<ListenerEntry> entries = listeners.get(entry);
303             ArrayList<ListenerEntry> list = null;
304             for (ListenerEntry e : entries) {
305                 if (e.base.isDisposed()) {
306                     if(list == null) list = new ArrayList<ListenerEntry>();
307                     list.add(e);
308                 }
309             }
310             if(list != null) {
311                 for (ListenerEntry e : list) {
312                     entries.remove(e);
313                 }
314             }
315             if (entries.isEmpty()) {
316                 listeners.remove(entry);
317                 return false;
318             }
319             return true;
320         }
321         return false;
322     }
323
324     List<ListenerEntry> getListenerEntries(CacheEntry entry) {
325         hasListenerAfterDisposing(entry);
326         if(listeners.get(entry) != null)
327             return listeners.get(entry);
328         else 
329             return Collections.emptyList();
330     }
331
332     void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
333
334         if(!workarea.containsKey(entry)) {
335
336             HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
337             for(ListenerEntry e : getListenerEntries(entry))
338                 ls.add(e.base);
339
340             workarea.put(entry, ls);
341
342             for(CacheEntry parent : entry.getParents(processor)) {
343                 processListenerReport(parent, workarea);
344                 ls.addAll(workarea.get(parent));
345             }
346
347         }
348
349     }
350
351     public synchronized ListenerReport getListenerReport() throws IOException {
352
353         class ListenerReportImpl implements ListenerReport {
354
355             Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
356
357             @Override
358             public void print(PrintStream b) {
359                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
360                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
361                     for(ListenerBase l : e.getValue()) {
362                         Integer i = hist.get(l);
363                         hist.put(l, i != null ? i-1 : -1);
364                     }
365                 }
366
367                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
368                     b.print("" + -p.second + " " + p.first + "\n");
369                 }
370
371                 b.flush();
372             }
373
374         }
375
376         ListenerReportImpl result = new ListenerReportImpl();
377
378         Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
379         for(CacheEntryBase entry : all) {
380             hasListenerAfterDisposing(entry);
381         }
382         for(CacheEntryBase entry : all) {
383             processListenerReport(entry, result.workarea);
384         }
385
386         return result;
387
388     }
389
390     public synchronized String reportListeners(File file) throws IOException {
391
392         if (!processor.isAlive())
393             return "Disposed!";
394
395         PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
396         ListenerReport report = getListenerReport();
397         report.print(b);
398
399         return "Done reporting listeners.";
400
401     }
402
403     public void fireListeners(WriteGraphImpl graph) {
404
405         assert (!processor.updating);
406         assert (!processor.cache.collecting);
407         assert (!firingListeners);
408
409         firingListeners = true;
410
411         try {
412
413             // Performing may cause further events to be scheduled.
414             while (!scheduledListeners.isEmpty()) {
415
416                 // Clone current events to make new entries possible during
417                 // firing.
418                 THashSet<ListenerEntry> entries = scheduledListeners;
419                 scheduledListeners = new THashSet<ListenerEntry>();
420
421                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
422
423                 for (ListenerEntry listenerEntry : entries) {
424
425                     if (pruneListener(listenerEntry)) {
426                         if (Development.DEVELOPMENT) {
427                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
428                                 new Exception().printStackTrace();
429                                 System.err.println("Pruned " + listenerEntry.procedure);
430                             }
431                         }
432                         continue;
433                     }
434
435                     final CacheEntry entry = listenerEntry.entry;
436                     assert (entry != null);
437
438                     Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
439
440                     if (newValue != ListenerEntry.NOT_CHANGED) {
441                         if (Development.DEVELOPMENT) {
442                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
443                                 new Exception().printStackTrace();
444                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
445                             }
446                         }
447                         schedule.add(listenerEntry);
448                         listenerEntry.setLastKnown(entry.getResult());
449                     }
450
451                 }
452
453                 for(ListenerEntry listenerEntry : schedule) {
454                     final CacheEntry entry = listenerEntry.entry;
455                     if (Development.DEVELOPMENT) {
456                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
457                             System.err.println("Firing " + listenerEntry.procedure);
458                         }
459                     }
460                     try {
461                         if (Development.DEVELOPMENT) {
462                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
463                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
464                             }
465                         }
466                         entry.performFromCache(graph, listenerEntry.procedure);
467                     } catch (Throwable t) {
468                         t.printStackTrace();
469                     }
470                 }
471
472             }
473
474         } finally {
475             firingListeners = false;
476         }
477
478     }
479
480     void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
481
482         Iterable<CacheEntry> oldParents = entry.getParents(processor);
483         for (CacheEntry parent : oldParents) {
484             if(!parent.isDiscarded())
485                 todo.push(new UpdateEntry(entry, parent, indent + 2));
486         }
487
488     }
489
490     private boolean pruneListener(ListenerEntry entry) {
491         if (entry.base.isDisposed()) {
492             removeListener(entry);
493             return true;
494         } else {
495             return false;
496         }
497     }
498
499 }