gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
Separate query dependency management from QueryProcessor
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.Semaphore;
20
21 import org.simantics.databoard.Bindings;
22 import org.simantics.db.DevelopmentKeys;
23 import org.simantics.db.debug.ListenerReport;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.impl.graph.ReadGraphImpl;
26 import org.simantics.db.impl.graph.WriteGraphImpl;
27 import org.simantics.db.procedure.ListenerBase;
28 import org.simantics.utils.Development;
29 import org.simantics.utils.datastructures.Pair;
30 import org.simantics.utils.datastructures.collections.CollectionUtils;
31 import org.slf4j.LoggerFactory;
32
33 import gnu.trove.map.hash.THashMap;
34 import gnu.trove.set.hash.THashSet;
35
36 public class QueryListening {
37
38     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
39
40         final private QueryProcessor                            processor;
41         private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
42         private boolean                                         firingListeners       = false;
43         final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
44         private BlockingQueue<Runnable>                         tasks = new LinkedBlockingQueue<Runnable>();
45         private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
46         
47         QueryListening(QueryProcessor processor) {
48                 this.processor = processor;
49                 new DependencyManagementThread(processor, tasks).start();
50         }
51         
52         public void sync() {
53                 Semaphore s = new Semaphore(0);
54                 tasks.add(() -> {
55                         s.release();
56                 });
57                 try {
58                         s.acquire();
59                 } catch (Throwable t) {
60                         LOGGER.error("Error while waiting for query dependency management", t);
61                 }
62         }
63         
64         static class DependencyManagementThread extends Thread {
65                 
66                 final private QueryProcessor processor;
67                 final BlockingQueue<Runnable> tasks;
68                 
69                 DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
70                         setName("Query Dependency Manager");
71                         this.processor = processor;
72                         this.tasks = tasks;
73                 }
74                 
75                 @Override
76                 public void run() {
77                         while(processor.isAlive()) {
78                                 try {
79                                         Runnable r = tasks.take();
80                                         r.run();
81                                 } catch (Throwable t) {
82                                         // Spurious problems?
83                                         LOGGER.error("Error while waiting for query dependency management tasks", t);
84                                 }
85                         }
86                 }
87                 
88         }
89         
90         public boolean hasScheduledUpdates() {
91                 return !scheduledListeners.isEmpty();
92         }
93
94         void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
95                 
96                 tasks.offer(() -> {
97
98                         if (parent != null && !inferred) {
99                                 try {
100                                         if(!child.isImmutable(graph))
101                                                 child.addParent(parent);
102                                 } catch (DatabaseException e) {
103                                         LOGGER.error("Error while registering query dependencies", e);
104                                 }
105                                 if (Development.DEVELOPMENT) {
106                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
107                                                 System.out.println(child + " -> " + parent);
108                                         }
109                                 }
110                         }
111
112                         if (listener != null)
113                                 registerListener(child, listener, procedure);
114
115                 });
116                 
117         }
118
119         void registerFirstKnown(ListenerBase base, Object result) {
120                 
121                 tasks.offer(() -> {
122
123                         ListenerEntry entry = addedEntries.get(base);
124                         if(entry != null) entry.setLastKnown(result);
125                         
126                 });
127                 
128         }
129
130         public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
131
132                 assert (entry != null);
133
134                 if (base.isDisposed())
135                         return null;
136
137                 return addListener(entry, base, procedure);
138
139         }
140
141         /*
142          * Registers a listener and returns an entry iff the entry was added
143          */
144         private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
145
146                 assert (entry != null);
147                 assert (procedure != null);
148
149                 ArrayList<ListenerEntry> list = listeners.get(entry);
150                 if (list == null) {
151                         list = new ArrayList<>(1);
152                         listeners.put(entry, list);
153                 }
154
155                 ListenerEntry result = new ListenerEntry(entry, base, procedure);
156                 // Equals is here based on base
157                 int currentIndex = list.indexOf(result);
158                 // There was already a listener
159                 if(currentIndex > -1) {
160                         ListenerEntry current = list.get(currentIndex);
161                         if(!current.base.isDisposed()) return null;
162                         list.set(currentIndex, result);
163                 } else {
164                         list.add(result);
165                 }
166
167                 if (Development.DEVELOPMENT) {
168                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
169                                 new Exception().printStackTrace();
170                                 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
171                         }
172                 }
173                 
174                 addedEntries.put(base, result);
175                 
176                 return result;
177
178         }
179
180         void scheduleListener(ListenerEntry entry) {
181                 assert (entry != null);
182                 if (Development.DEVELOPMENT) {
183                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
184                                 System.err.println("Scheduled " + entry.procedure);
185                         }
186                 }
187                 scheduledListeners.add(entry);
188         }
189
190         private void removeListener(ListenerEntry entry) {
191                 assert (entry != null);
192                 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
193                 if(list == null) return;
194                 boolean success = list.remove(entry);
195                 assert (success);
196                 if (list.isEmpty())
197                         listeners.remove(entry.entry);
198         }
199
200         boolean hasListener(CacheEntry entry) {
201                 if(listeners.get(entry) != null) return true;
202                 return false;
203         }
204
205         boolean hasListenerAfterDisposing(CacheEntry entry) {
206                 if(listeners.get(entry) != null) {
207                         ArrayList<ListenerEntry> entries = listeners.get(entry);
208                         ArrayList<ListenerEntry> list = null;
209                         for (ListenerEntry e : entries) {
210                                 if (e.base.isDisposed()) {
211                                         if(list == null) list = new ArrayList<ListenerEntry>();
212                                         list.add(e);
213                                 }
214                         }
215                         if(list != null) {
216                                 for (ListenerEntry e : list) {
217                                         entries.remove(e);
218                                 }
219                         }
220                         if (entries.isEmpty()) {
221                                 listeners.remove(entry);
222                                 return false;
223                         }
224                         return true;
225                 }
226                 return false;
227         }
228
229         List<ListenerEntry> getListenerEntries(CacheEntry entry) {
230                 hasListenerAfterDisposing(entry);
231                 if(listeners.get(entry) != null)
232                         return listeners.get(entry);
233                 else 
234                         return Collections.emptyList();
235         }
236
237         void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
238
239                 if(!workarea.containsKey(entry)) {
240
241                         HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
242                         for(ListenerEntry e : getListenerEntries(entry))
243                                 ls.add(e.base);
244
245                         workarea.put(entry, ls);
246
247                         for(CacheEntry parent : entry.getParents(processor)) {
248                                 processListenerReport(parent, workarea);
249                                 ls.addAll(workarea.get(parent));
250                         }
251
252                 }
253
254         }
255
256         public synchronized ListenerReport getListenerReport() throws IOException {
257
258                 class ListenerReportImpl implements ListenerReport {
259
260                         Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
261
262                         @Override
263                         public void print(PrintStream b) {
264                                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
265                                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
266                                         for(ListenerBase l : e.getValue()) {
267                                                 Integer i = hist.get(l);
268                                                 hist.put(l, i != null ? i-1 : -1);
269                                         }
270                                 }
271
272                                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
273                                         b.print("" + -p.second + " " + p.first + "\n");
274                                 }
275
276                                 b.flush();
277                         }
278
279                 }
280
281                 ListenerReportImpl result = new ListenerReportImpl();
282
283                 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
284                 for(CacheEntryBase entry : all) {
285                         hasListenerAfterDisposing(entry);
286                 }
287                 for(CacheEntryBase entry : all) {
288                         processListenerReport(entry, result.workarea);
289                 }
290
291                 return result;
292
293         }
294
295         public synchronized String reportListeners(File file) throws IOException {
296
297                 if (!processor.isAlive())
298                         return "Disposed!";
299
300                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
301                 ListenerReport report = getListenerReport();
302                 report.print(b);
303
304                 return "Done reporting listeners.";
305
306         }
307
308         public void fireListeners(WriteGraphImpl graph) {
309
310                 assert (!processor.updating);
311                 assert (!processor.cache.collecting);
312                 assert (!firingListeners);
313
314                 firingListeners = true;
315
316                 try {
317
318                         // Performing may cause further events to be scheduled.
319                         while (!scheduledListeners.isEmpty()) {
320
321                                 // Clone current events to make new entries possible during
322                                 // firing.
323                                 THashSet<ListenerEntry> entries = scheduledListeners;
324                                 scheduledListeners = new THashSet<ListenerEntry>();
325
326                                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
327
328                                 for (ListenerEntry listenerEntry : entries) {
329
330                                         if (pruneListener(listenerEntry)) {
331                                                 if (Development.DEVELOPMENT) {
332                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
333                                                                 new Exception().printStackTrace();
334                                                                 System.err.println("Pruned " + listenerEntry.procedure);
335                                                         }
336                                                 }
337                                                 continue;
338                                         }
339
340                                         final CacheEntry entry = listenerEntry.entry;
341                                         assert (entry != null);
342
343                                         Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
344
345                                         if (newValue != ListenerEntry.NOT_CHANGED) {
346                                                 if (Development.DEVELOPMENT) {
347                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
348                                                                 new Exception().printStackTrace();
349                                                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
350                                                         }
351                                                 }
352                                                 schedule.add(listenerEntry);
353                                                 listenerEntry.setLastKnown(entry.getResult());
354                                         }
355
356                                 }
357
358                                 for(ListenerEntry listenerEntry : schedule) {
359                                         final CacheEntry entry = listenerEntry.entry;
360                                         if (Development.DEVELOPMENT) {
361                                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
362                                                         System.err.println("Firing " + listenerEntry.procedure);
363                                                 }
364                                         }
365                                         try {
366                                                 if (Development.DEVELOPMENT) {
367                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
368                                                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
369                                                         }
370                                                 }
371                                                 entry.performFromCache(graph, listenerEntry.procedure);
372                                         } catch (Throwable t) {
373                                                 t.printStackTrace();
374                                         }
375                                 }
376
377                         }
378
379                 } finally {
380                         firingListeners = false;
381                 }
382
383         }
384         
385         void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
386
387                 Iterable<CacheEntry> oldParents = entry.getParents(processor);
388                 for (CacheEntry parent : oldParents) {
389                         if(!parent.isDiscarded())
390                                 todo.push(new UpdateEntry(entry, parent, indent + 2));
391                 }
392
393         }
394
395         private boolean pruneListener(ListenerEntry entry) {
396                 if (entry.base.isDisposed()) {
397                         removeListener(entry);
398                         return true;
399                 } else {
400                         return false;
401                 }
402         }
403         
404 }