]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryListening.java
9daab950e3964a8681a12fabc5ef0b2cfaf74ceb
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryListening.java
1 package org.simantics.db.impl.query;
2
3 import java.io.BufferedOutputStream;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.IOException;
7 import java.io.PrintStream;
8 import java.util.ArrayList;
9 import java.util.Collection;
10 import java.util.Collections;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.LinkedList;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.Semaphore;
20
21 import org.simantics.databoard.Bindings;
22 import org.simantics.db.DevelopmentKeys;
23 import org.simantics.db.debug.ListenerReport;
24 import org.simantics.db.exception.DatabaseException;
25 import org.simantics.db.impl.graph.ReadGraphImpl;
26 import org.simantics.db.impl.graph.WriteGraphImpl;
27 import org.simantics.db.procedure.ListenerBase;
28 import org.simantics.utils.Development;
29 import org.simantics.utils.datastructures.Pair;
30 import org.simantics.utils.datastructures.collections.CollectionUtils;
31 import org.slf4j.LoggerFactory;
32
33 import gnu.trove.map.hash.THashMap;
34 import gnu.trove.set.hash.THashSet;
35
36 public class QueryListening {
37
38     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryListening.class);
39
40         final private QueryProcessor                            processor;
41         private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
42         private boolean                                         firingListeners       = false;
43         final THashMap<CacheEntry, ArrayList<ListenerEntry>>    listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
44         private BlockingQueue<Runnable>                         tasks = new ArrayBlockingQueue<Runnable>(2048);
45         private Map<ListenerBase,ListenerEntry>                 addedEntries = new HashMap<>();
46         
47         QueryListening(QueryProcessor processor) {
48                 this.processor = processor;
49                 new DependencyManagementThread(processor, tasks).start();
50         }
51         
52         public void sync() {
53                 Semaphore s = new Semaphore(0);
54                 try {
55                         tasks.put(() -> {
56                                 s.release();
57                         });
58                         s.acquire();
59                 } catch (Throwable t) {
60                         LOGGER.error("Error while waiting for query dependency management", t);
61                 }
62         }
63         
64         static class DependencyManagementThread extends Thread {
65                 
66                 final private QueryProcessor processor;
67                 final BlockingQueue<Runnable> tasks;
68                 
69                 DependencyManagementThread(QueryProcessor processor, BlockingQueue<Runnable> tasks) {
70                         setName("Query Dependency Manager");
71                         this.processor = processor;
72                         this.tasks = tasks;
73                 }
74                 
75                 @Override
76                 public void run() {
77                         while(processor.isAlive()) {
78                                 try {
79                                         Runnable r = tasks.take();
80                                         r.run();
81                                 } catch (Throwable t) {
82                                         // Spurious problems?
83                                         LOGGER.error("Error while waiting for query dependency management tasks", t);
84                                 }
85                         }
86                 }
87                 
88         }
89         
90         public boolean hasScheduledUpdates() {
91                 return !scheduledListeners.isEmpty();
92         }
93
94         void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
95                 
96                 try {
97                         tasks.put(() -> {
98
99                                 if (parent != null && !inferred) {
100                                         try {
101                                                 if(!child.isImmutable(graph))
102                                                         child.addParent(parent);
103                                         } catch (DatabaseException e) {
104                                                 LOGGER.error("Error while registering query dependencies", e);
105                                         }
106                                         if (Development.DEVELOPMENT) {
107                                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
108                                                         System.out.println(child + " -> " + parent);
109                                                 }
110                                         }
111                                 }
112
113                                 if (listener != null)
114                                         registerListener(child, listener, procedure);
115
116                         });
117                 } catch (InterruptedException e) {
118                         LOGGER.error("Error while registering dependencies", e);
119                 }
120                 
121         }
122
123         void registerFirstKnown(ListenerBase base, Object result) {
124                 
125                 tasks.offer(() -> {
126
127                         ListenerEntry entry = addedEntries.get(base);
128                         if(entry != null) entry.setLastKnown(result);
129                         
130                 });
131                 
132         }
133
134         public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
135
136                 assert (entry != null);
137
138                 if (base.isDisposed())
139                         return null;
140
141                 return addListener(entry, base, procedure);
142
143         }
144
145         /*
146          * Registers a listener and returns an entry iff the entry was added
147          */
148         private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
149
150                 assert (entry != null);
151                 assert (procedure != null);
152
153                 ArrayList<ListenerEntry> list = listeners.get(entry);
154                 if (list == null) {
155                         list = new ArrayList<>(1);
156                         listeners.put(entry, list);
157                 }
158
159                 ListenerEntry result = new ListenerEntry(entry, base, procedure);
160                 // Equals is here based on base
161                 int currentIndex = list.indexOf(result);
162                 // There was already a listener
163                 if(currentIndex > -1) {
164                         ListenerEntry current = list.get(currentIndex);
165                         if(!current.base.isDisposed()) return null;
166                         list.set(currentIndex, result);
167                 } else {
168                         list.add(result);
169                 }
170
171                 if (Development.DEVELOPMENT) {
172                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
173                                 new Exception().printStackTrace();
174                                 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
175                         }
176                 }
177                 
178                 addedEntries.put(base, result);
179                 
180                 return result;
181
182         }
183
184         void scheduleListener(ListenerEntry entry) {
185                 assert (entry != null);
186                 if (Development.DEVELOPMENT) {
187                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
188                                 System.err.println("Scheduled " + entry.procedure);
189                         }
190                 }
191                 scheduledListeners.add(entry);
192         }
193
194         private void removeListener(ListenerEntry entry) {
195                 assert (entry != null);
196                 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
197                 if(list == null) return;
198                 boolean success = list.remove(entry);
199                 assert (success);
200                 if (list.isEmpty())
201                         listeners.remove(entry.entry);
202         }
203
204         boolean hasListener(CacheEntry entry) {
205                 if(listeners.get(entry) != null) return true;
206                 return false;
207         }
208
209         boolean hasListenerAfterDisposing(CacheEntry entry) {
210                 if(listeners.get(entry) != null) {
211                         ArrayList<ListenerEntry> entries = listeners.get(entry);
212                         ArrayList<ListenerEntry> list = null;
213                         for (ListenerEntry e : entries) {
214                                 if (e.base.isDisposed()) {
215                                         if(list == null) list = new ArrayList<ListenerEntry>();
216                                         list.add(e);
217                                 }
218                         }
219                         if(list != null) {
220                                 for (ListenerEntry e : list) {
221                                         entries.remove(e);
222                                 }
223                         }
224                         if (entries.isEmpty()) {
225                                 listeners.remove(entry);
226                                 return false;
227                         }
228                         return true;
229                 }
230                 return false;
231         }
232
233         List<ListenerEntry> getListenerEntries(CacheEntry entry) {
234                 hasListenerAfterDisposing(entry);
235                 if(listeners.get(entry) != null)
236                         return listeners.get(entry);
237                 else 
238                         return Collections.emptyList();
239         }
240
241         void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
242
243                 if(!workarea.containsKey(entry)) {
244
245                         HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
246                         for(ListenerEntry e : getListenerEntries(entry))
247                                 ls.add(e.base);
248
249                         workarea.put(entry, ls);
250
251                         for(CacheEntry parent : entry.getParents(processor)) {
252                                 processListenerReport(parent, workarea);
253                                 ls.addAll(workarea.get(parent));
254                         }
255
256                 }
257
258         }
259
260         public synchronized ListenerReport getListenerReport() throws IOException {
261
262                 class ListenerReportImpl implements ListenerReport {
263
264                         Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
265
266                         @Override
267                         public void print(PrintStream b) {
268                                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
269                                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
270                                         for(ListenerBase l : e.getValue()) {
271                                                 Integer i = hist.get(l);
272                                                 hist.put(l, i != null ? i-1 : -1);
273                                         }
274                                 }
275
276                                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
277                                         b.print("" + -p.second + " " + p.first + "\n");
278                                 }
279
280                                 b.flush();
281                         }
282
283                 }
284
285                 ListenerReportImpl result = new ListenerReportImpl();
286
287                 Collection<CacheEntryBase> all = processor.allCaches(new CacheCollectionResult()).toCollection();
288                 for(CacheEntryBase entry : all) {
289                         hasListenerAfterDisposing(entry);
290                 }
291                 for(CacheEntryBase entry : all) {
292                         processListenerReport(entry, result.workarea);
293                 }
294
295                 return result;
296
297         }
298
299         public synchronized String reportListeners(File file) throws IOException {
300
301                 if (!processor.isAlive())
302                         return "Disposed!";
303
304                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
305                 ListenerReport report = getListenerReport();
306                 report.print(b);
307
308                 return "Done reporting listeners.";
309
310         }
311
312         public void fireListeners(WriteGraphImpl graph) {
313
314                 assert (!processor.updating);
315                 assert (!processor.cache.collecting);
316                 assert (!firingListeners);
317
318                 firingListeners = true;
319
320                 try {
321
322                         // Performing may cause further events to be scheduled.
323                         while (!scheduledListeners.isEmpty()) {
324
325                                 // Clone current events to make new entries possible during
326                                 // firing.
327                                 THashSet<ListenerEntry> entries = scheduledListeners;
328                                 scheduledListeners = new THashSet<ListenerEntry>();
329
330                                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
331
332                                 for (ListenerEntry listenerEntry : entries) {
333
334                                         if (pruneListener(listenerEntry)) {
335                                                 if (Development.DEVELOPMENT) {
336                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
337                                                                 new Exception().printStackTrace();
338                                                                 System.err.println("Pruned " + listenerEntry.procedure);
339                                                         }
340                                                 }
341                                                 continue;
342                                         }
343
344                                         final CacheEntry entry = listenerEntry.entry;
345                                         assert (entry != null);
346
347                                         Object newValue = processor.compareTo(graph, entry, listenerEntry.getLastKnown());
348
349                                         if (newValue != ListenerEntry.NOT_CHANGED) {
350                                                 if (Development.DEVELOPMENT) {
351                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
352                                                                 new Exception().printStackTrace();
353                                                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
354                                                         }
355                                                 }
356                                                 schedule.add(listenerEntry);
357                                                 listenerEntry.setLastKnown(entry.getResult());
358                                         }
359
360                                 }
361
362                                 for(ListenerEntry listenerEntry : schedule) {
363                                         final CacheEntry entry = listenerEntry.entry;
364                                         if (Development.DEVELOPMENT) {
365                                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
366                                                         System.err.println("Firing " + listenerEntry.procedure);
367                                                 }
368                                         }
369                                         try {
370                                                 if (Development.DEVELOPMENT) {
371                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
372                                                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
373                                                         }
374                                                 }
375                                                 entry.performFromCache(graph, listenerEntry.procedure);
376                                         } catch (Throwable t) {
377                                                 t.printStackTrace();
378                                         }
379                                 }
380
381                         }
382
383                 } finally {
384                         firingListeners = false;
385                 }
386
387         }
388         
389         void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
390
391                 Iterable<CacheEntry> oldParents = entry.getParents(processor);
392                 for (CacheEntry parent : oldParents) {
393                         if(!parent.isDiscarded())
394                                 todo.push(new UpdateEntry(entry, parent, indent + 2));
395                 }
396
397         }
398
399         private boolean pruneListener(ListenerEntry entry) {
400                 if (entry.base.isDisposed()) {
401                         removeListener(entry);
402                         return true;
403                 } else {
404                         return false;
405                 }
406         }
407         
408 }