91200f1fc3ad037393bc5e3794d6acc4c29a4e7b
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17
18 import org.simantics.databoard.Bindings;
19 import org.simantics.db.DevelopmentKeys;
20 import org.simantics.db.debug.ListenerReport;
21 import org.simantics.db.exception.DatabaseException;
22 import org.simantics.db.impl.graph.ReadGraphImpl;
23 import org.simantics.db.procedure.ListenerBase;
24 import org.simantics.utils.Development;
25 import org.simantics.utils.datastructures.Pair;
26 import org.simantics.utils.datastructures.collections.CollectionUtils;
27 import org.slf4j.LoggerFactory;
28
29 import gnu.trove.map.hash.THashMap;
30 import gnu.trove.set.hash.THashSet;
31
32 public class QueryListening {
33
34     static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
35
36     private final QueryProcessor                                    processor;
37     private final Scheduler                                         scheduler;
38     private final Map<ListenerBase,ListenerEntry>                   addedEntries = new HashMap<>();
39
40     private THashSet<ListenerEntry>                                 scheduledListeners    = new THashSet<ListenerEntry>();
41     private boolean                                                 firingListeners       = false;
42
43     final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
44
45     
46     QueryListening(QueryProcessor processor) {
47         
48         this.processor = processor;
49         scheduler = new Scheduler(processor);
50         scheduler.start();
51         
52     }
53
54     public boolean hasScheduledUpdates() {
55         return !scheduledListeners.isEmpty();
56     }
57     
58     void sync() {
59         try {
60             scheduler.sync();
61         } catch (Throwable t) {
62             LOGGER.error("Error while waiting for query dependency management", t);
63         }
64     }
65
66     void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
67
68         if(inferred) {
69             assert(listener == null);
70             return;
71         }
72
73         if(parent != null) {
74             try {
75                 if(!child.isImmutable(graph))
76                     scheduler.accept(new RegisterParentRunnable(parent, child));
77             } catch (DatabaseException e) {
78                 LOGGER.error("Error while registering query dependencies", e);
79             }
80         }
81
82         if(listener != null)
83             if(!listener.isDisposed())
84                 scheduler.accept(new RegisterListenerRunnable(this, listener, procedure, parent, child));
85
86     }
87
88     void registerFirstKnown(ListenerBase base, Object result) {
89         
90         if(base == null) return;
91
92         scheduler.accept(new RegisterFirstKnownRunnable(addedEntries, base, result));
93
94     }
95
96     void scheduleListener(ListenerEntry entry) {
97         
98         assert (entry != null);
99         
100         if (Development.DEVELOPMENT) {
101             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
102                 System.err.println("Scheduled " + entry.procedure);
103             }
104         }
105         
106         scheduledListeners.add(entry);
107         
108     }
109
110     boolean hasListener(CacheEntry entry) {
111         if(listeners.get(entry) != null) return true;
112         return false;
113     }
114
115     boolean hasListenerAfterDisposing(CacheEntry entry) {
116         if(listeners.get(entry) != null) {
117             ArrayList<ListenerEntry> entries = listeners.get(entry);
118             ArrayList<ListenerEntry> list = null;
119             for (ListenerEntry e : entries) {
120                 if (e.base.isDisposed()) {
121                     if(list == null) list = new ArrayList<ListenerEntry>();
122                     list.add(e);
123                 }
124             }
125             if(list != null) {
126                 for (ListenerEntry e : list) {
127                     entries.remove(e);
128                 }
129             }
130             if (entries.isEmpty()) {
131                 listeners.remove(entry);
132                 return false;
133             }
134             return true;
135         }
136         return false;
137     }
138
139     void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
140
141         if(!workarea.containsKey(entry)) {
142
143             HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
144             for(ListenerEntry e : getListenerEntries(entry))
145                 ls.add(e.base);
146
147             workarea.put(entry, ls);
148
149             for(CacheEntry parent : entry.getParents(processor)) {
150                 processListenerReport(parent, workarea);
151                 ls.addAll(workarea.get(parent));
152             }
153
154         }
155
156     }
157
158     public synchronized ListenerReport getListenerReport() throws IOException {
159
160         class ListenerReportImpl implements ListenerReport {
161
162             Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
163
164             @Override
165             public void print(PrintStream b) {
166                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
167                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
168                     for(ListenerBase l : e.getValue()) {
169                         Integer i = hist.get(l);
170                         hist.put(l, i != null ? i-1 : -1);
171                     }
172                 }
173
174                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
175                     b.print("" + -p.second + " " + p.first + "\n");
176                 }
177
178                 b.flush();
179             }
180
181         }
182
183         ListenerReportImpl result = new ListenerReportImpl();
184
185         Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
186         for(CacheEntryBase entry : all) {
187             hasListenerAfterDisposing(entry);
188         }
189         for(CacheEntryBase entry : all) {
190             processListenerReport(entry, result.workarea);
191         }
192
193         return result;
194
195     }
196
197     public synchronized String reportListeners(File file) throws IOException {
198
199         if (!processor.isAlive())
200             return "Disposed!";
201
202         PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
203         ListenerReport report = getListenerReport();
204         report.print(b);
205
206         return "Done reporting listeners.";
207
208     }
209
210     public void fireListeners(ReadGraphImpl graph) {
211
212         assert (!processor.updating);
213         assert (!processor.cache.collecting);
214         assert (!firingListeners);
215
216         firingListeners = true;
217
218         try {
219
220             // Performing may cause further events to be scheduled.
221             while (!scheduledListeners.isEmpty()) {
222
223                 // Clone current events to make new entries possible during
224                 // firing.
225                 THashSet<ListenerEntry> entries = scheduledListeners;
226                 scheduledListeners = new THashSet<ListenerEntry>();
227
228                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
229
230                 for (ListenerEntry listenerEntry : entries) {
231
232                     if (pruneListener(listenerEntry)) {
233                         if (Development.DEVELOPMENT) {
234                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
235                                 new Exception().printStackTrace();
236                                 System.err.println("Pruned " + listenerEntry.procedure);
237                             }
238                         }
239                         continue;
240                     }
241
242                     final CacheEntry entry = listenerEntry.entry;
243                     assert (entry != null);
244
245                     Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
246
247                     if (newValue != ListenerEntry.NOT_CHANGED) {
248                         if (Development.DEVELOPMENT) {
249                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
250                                 new Exception().printStackTrace();
251                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
252                             }
253                         }
254                         schedule.add(listenerEntry);
255                         listenerEntry.setLastKnown(entry.getResult());
256                     }
257
258                 }
259
260                 for(ListenerEntry listenerEntry : schedule) {
261                     final CacheEntry entry = listenerEntry.entry;
262                     if (Development.DEVELOPMENT) {
263                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
264                             System.err.println("Firing " + listenerEntry.procedure);
265                         }
266                     }
267                     try {
268                         if (Development.DEVELOPMENT) {
269                             if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
270                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
271                             }
272                         }
273                         entry.performFromCache(graph, listenerEntry.procedure);
274                     } catch (Throwable t) {
275                         LOGGER.error("Unexpected exception ", t);
276                     }
277                 }
278
279             }
280
281         } finally {
282             firingListeners = false;
283         }
284
285     }
286
287     void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
288
289         Iterable<CacheEntry> oldParents = entry.getParents(processor);
290         for (CacheEntry parent : oldParents) {
291             if(!parent.isDiscarded())
292                 todo.push(new UpdateEntry(entry, parent, indent + 2));
293         }
294
295     }
296
297     private boolean pruneListener(ListenerEntry entry) {
298         
299         if (entry.base.isDisposed()) {
300             
301             assert (entry != null);
302             ArrayList<ListenerEntry> list = listeners.get(entry.entry);
303             if(list != null) {
304                 boolean success = list.remove(entry);
305                 assert (success);
306                 if (list.isEmpty())
307                     listeners.remove(entry.entry);
308             }
309
310             return true;
311             
312         } else {
313             
314             return false;
315             
316         }
317     }
318
319     private List<ListenerEntry> getListenerEntries(CacheEntry entry) {
320         hasListenerAfterDisposing(entry);
321         if(listeners.get(entry) != null)
322             return listeners.get(entry);
323         else 
324             return Collections.emptyList();
325     }
326
327     private static class RegisterParentRunnable implements Runnable {
328
329         private final CacheEntry parent;
330         private final CacheEntry child;
331
332         public RegisterParentRunnable(CacheEntry parent, CacheEntry child) {
333             this.parent = parent;
334             this.child = child;
335         }
336
337         @Override
338         public void run() {
339             child.addParent(parent);
340             if (Development.DEVELOPMENT) {
341                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
342                     System.out.println(child + " -> " + parent);
343                 }
344             }
345         }
346
347     }
348
349     private static class RegisterListenerRunnable implements Runnable {
350
351         private final QueryListening queryListening;
352         private final ListenerBase base;
353         private final Object procedure;
354         private final CacheEntry parent;
355         private final CacheEntry entry;
356
357         public RegisterListenerRunnable(QueryListening queryListening, ListenerBase base, Object procedure, CacheEntry parent, CacheEntry entry) {
358             this.queryListening = queryListening;
359             this.base = base;
360             this.procedure = procedure;
361             this.parent = parent;
362             this.entry = entry;
363         }
364
365         @Override
366         public void run() {
367
368             assert (entry != null);
369             assert (procedure != null);
370
371             ArrayList<ListenerEntry> list = queryListening.listeners.get(entry);
372             if (list == null) {
373                 list = new ArrayList<>(1);
374                 queryListening.listeners.put(entry, list);
375             }
376
377             ListenerEntry result = new ListenerEntry(entry, base, procedure);
378             // Equals is here based on base
379             int currentIndex = list.indexOf(result);
380             // There was already a listener
381             if(currentIndex > -1) {
382                 ListenerEntry current = list.get(currentIndex);
383                 if(!current.base.isDisposed())
384                     return;
385                 list.set(currentIndex, result);
386             } else {
387                 list.add(result);
388             }
389
390             if (Development.DEVELOPMENT) {
391                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
392                     new Exception().printStackTrace();
393                     System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
394                 }
395             }
396
397             queryListening.addedEntries.put(base, result);
398
399         }
400
401
402     }
403
404     private static class RegisterFirstKnownRunnable implements Runnable {
405
406         private final Map<ListenerBase,ListenerEntry> addedEntries;
407         private final ListenerBase base;
408         private final Object result;
409
410         public RegisterFirstKnownRunnable(Map<ListenerBase,ListenerEntry> addedEntries, ListenerBase base, Object result) {
411             this.addedEntries = addedEntries;
412             this.base = base;
413             this.result = result;
414         }
415
416         @Override
417         public void run() {
418             ListenerEntry entry = addedEntries.get(base);
419             if(entry != null) entry.setLastKnown(result);
420         }
421
422     }
423
424 }