DB request scheduling scheme fails with district diagrams
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import java.io.BufferedOutputStream;
15 import java.io.File;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.exception.DatabaseException;
48 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
49 import org.simantics.db.exception.NoInverseException;
50 import org.simantics.db.exception.ResourceNotFoundException;
51 import org.simantics.db.impl.ResourceImpl;
52 import org.simantics.db.impl.graph.BarrierTracing;
53 import org.simantics.db.impl.graph.ReadGraphImpl;
54 import org.simantics.db.impl.graph.ReadGraphSupport;
55 import org.simantics.db.impl.procedure.IntProcedureAdapter;
56 import org.simantics.db.impl.procedure.InternalProcedure;
57 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
58 import org.simantics.db.impl.support.ResourceSupport;
59 import org.simantics.db.procedure.AsyncMultiListener;
60 import org.simantics.db.procedure.AsyncMultiProcedure;
61 import org.simantics.db.procedure.AsyncProcedure;
62 import org.simantics.db.procedure.AsyncSetListener;
63 import org.simantics.db.procedure.ListenerBase;
64 import org.simantics.db.procedure.MultiProcedure;
65 import org.simantics.db.procedure.StatementProcedure;
66 import org.simantics.db.procedure.SyncMultiProcedure;
67 import org.simantics.db.request.AsyncMultiRead;
68 import org.simantics.db.request.ExternalRead;
69 import org.simantics.db.request.MultiRead;
70 import org.simantics.db.request.RequestFlags;
71 import org.simantics.layer0.Layer0;
72 import org.simantics.utils.DataContainer;
73 import org.simantics.utils.Development;
74 import org.simantics.utils.datastructures.Pair;
75 import org.simantics.utils.datastructures.collections.CollectionUtils;
76 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
77
78 import gnu.trove.procedure.TIntProcedure;
79 import gnu.trove.procedure.TLongProcedure;
80 import gnu.trove.procedure.TObjectProcedure;
81 import gnu.trove.set.hash.THashSet;
82 import gnu.trove.set.hash.TIntHashSet;
83
84 @SuppressWarnings({"rawtypes", "unchecked"})
85 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
86
87         public static int                                       indent                = 0;
88
89         
90         // Garbage collection
91         
92         public int                                              boundQueries          = 0;
93
94
95         final private int                                       functionalRelation;
96
97         final private int                                       superrelationOf;
98
99         final private int                                       instanceOf;
100
101         final private int                                       inverseOf;
102
103         final private int                                       asserts;
104
105         final private int                                       hasPredicate;
106
107         final private int                                       hasPredicateInverse;
108
109         final private int                                       hasObject;
110
111         final private int                                       inherits;
112
113         final private int                                       subrelationOf;
114
115         final private int                                       rootLibrary;
116
117         /**
118          * A cache for the root library resource. Initialized in
119          * {@link #getRootLibraryResource()}.
120          */
121         private volatile ResourceImpl                           rootLibraryResource;
122
123         final private int                                       library;
124
125         final private int                                       consistsOf;
126
127         final private int                                       hasName;
128
129         AtomicInteger                                       sleepers = new AtomicInteger(0);
130
131         boolean                                         updating              = false;
132
133
134         final public QueryCache                                 cache;
135         final public QuerySupport                               querySupport;
136         final public Session                                    session;
137         final public ResourceSupport                            resourceSupport;
138         
139         final public Semaphore                                  requests = new Semaphore(1);
140         
141         final public QueryListening                            listening = new QueryListening(this);
142
143         QueryThread[]                                   executors;
144
145         enum ThreadState {
146
147                 INIT, RUN, SLEEP, DISPOSED
148
149         }
150
151         final Scheduling scheduling;
152         
153         public ThreadState[]                                                                    threadStates;
154         
155         final Object querySupportLock;
156         
157         public Long modificationCounter = 0L;
158
159         public void close() {
160         }
161
162
163     /*
164      * We are running errands while waiting for requests to complete.
165      * We can only run work that is part of the current root request to avoid any deadlocks
166      */
167     public boolean performPending(ReadGraphImpl under) {
168         SessionTask task = scheduling.getSubTask(under);
169                 if(task != null) {
170                         task.run(thread.get());
171                         return true;
172                 }
173                 return false;
174         }
175     
176     final public void scheduleNow(SessionTask request) {
177         SessionTask toExecute = scheduleOrReturnForExecution(request);
178         if(toExecute != null)
179             toExecute.run(thread.get());
180     }
181
182     final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
183         
184         return scheduling.scheduleOrReturnForExecution(request);
185
186     }
187
188
189         final int THREADS;
190         final public int  THREAD_MASK;
191
192         final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
193
194         public static abstract class SessionTask {
195
196             final protected ReadGraphImpl rootGraph;
197                 private int counter = 0;
198                 protected int position = 1;
199                 private Exception trace;
200
201                 public SessionTask() {
202                     this(null);
203                 }
204                 
205         public SessionTask(ReadGraphImpl rootGraph) {
206             this.rootGraph = rootGraph;
207         }
208         
209         public boolean isSubtask(ReadGraphImpl graph) {
210             return graph.isParent(rootGraph);
211         }
212
213         public abstract void run0(int thread);
214
215                 public final void run(int thread) {
216                     if(counter++ > 0) {
217                         if(BarrierTracing.BOOKKEEPING) {
218                             trace.printStackTrace();
219                             new Exception().printStackTrace();
220                         }
221                         throw new IllegalStateException("Multiple invocations of SessionTask!");
222                     }
223                     if(BarrierTracing.BOOKKEEPING) {
224                         trace = new Exception();
225                     }
226                     run0(thread);
227                 }
228                 
229                 public boolean maybeReady() {
230                         return true;
231                 }
232
233                 @Override
234                 public String toString() {
235                         if(rootGraph == null)
236                                 return "SessionTask[no graph]";
237                         else
238                                 return "SessionTask[" + rootGraph.parent + "]";
239                 }
240
241         }
242
243         public static abstract class SessionRead extends SessionTask {
244
245                 final public Semaphore notify;
246                 final public DataContainer<Throwable> throwable; 
247
248                 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
249                         super(null);
250                         this.throwable = throwable;
251                         this.notify = notify;
252                 }
253
254         }
255
256         public boolean resume(ReadGraphImpl graph) {
257                 return executors[0].runSynchronized();
258         }
259
260         public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
261                         throws DatabaseException {
262
263                 THREADS = threads;
264                 THREAD_MASK = threads - 1;
265
266                 scheduling = new Scheduling(requests);
267                 
268                 querySupport = core;
269                 cache = new QueryCache(core, threads);
270                 session = querySupport.getSession();
271                 resourceSupport = querySupport.getSupport();
272                 querySupportLock = core.getLock();
273
274                 executors = new QueryThread[THREADS];
275                 threadStates = new ThreadState[THREADS];
276
277                 for (int i = 0; i < THREADS; i++) {
278                         threadStates[i] = ThreadState.INIT;
279                 }
280
281                 for (int i = 0; i < THREADS; i++) {
282
283                         final int index = i;
284
285                         executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
286
287                         threadSet.add(executors[i]);
288
289                 }
290
291                 // Now start threads
292                 for (int i = 0; i < THREADS; i++) {
293                         executors[i].start();
294                 }
295
296                 // Make sure that query threads are up and running
297                 while(sleepers.get() != THREADS) {
298                         try {
299                                 Thread.sleep(5);
300                         } catch (InterruptedException e) {
301                                 e.printStackTrace();
302                         }
303                 }
304
305                 rootLibrary = core.getBuiltin("http:/");
306                 boolean builtinsInstalled = rootLibrary != 0;
307
308                 if (builtinsInstalled) {
309                         functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
310                         assert (functionalRelation != 0);
311                 } else
312                         functionalRelation = 0;
313
314                 if (builtinsInstalled) {
315                         instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
316                         assert (instanceOf != 0);
317                 } else
318                         instanceOf = 0;
319
320                 if (builtinsInstalled) {
321                         inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
322                         assert (inverseOf != 0);
323                 } else
324                         inverseOf = 0;
325
326
327                 if (builtinsInstalled) {
328                         inherits = core.getBuiltin(Layer0.URIs.Inherits);
329                         assert (inherits != 0);
330                 } else
331                         inherits = 0;
332
333                 if (builtinsInstalled) {
334                         asserts = core.getBuiltin(Layer0.URIs.Asserts);
335                         assert (asserts != 0);
336                 } else
337                         asserts = 0;
338
339                 if (builtinsInstalled) {
340                         hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
341                         assert (hasPredicate != 0);
342                 } else
343                         hasPredicate = 0;
344
345                 if (builtinsInstalled) {
346                         hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
347                         assert (hasPredicateInverse != 0);
348                 } else
349                         hasPredicateInverse = 0;
350
351                 if (builtinsInstalled) {
352                         hasObject = core.getBuiltin(Layer0.URIs.HasObject);
353                         assert (hasObject != 0);
354                 } else
355                         hasObject = 0;
356
357                 if (builtinsInstalled) {
358                         subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
359                         assert (subrelationOf != 0);
360                 } else
361                         subrelationOf = 0;
362
363                 if (builtinsInstalled) {
364                         superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
365                         assert (superrelationOf != 0);
366                 } else
367                         superrelationOf = 0;
368
369                 if (builtinsInstalled) {
370                         library = core.getBuiltin(Layer0.URIs.Library);
371                         assert (library != 0);
372                 } else
373                         library = 0;
374
375                 if (builtinsInstalled) {
376                         consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
377                         assert (consistsOf != 0);
378                 } else
379                         consistsOf = 0;
380
381                 if (builtinsInstalled) {
382                         hasName = core.getBuiltin(Layer0.URIs.HasName);
383                         assert (hasName != 0);
384                 } else
385                         hasName = 0;
386
387         }
388
389         final public void releaseWrite(ReadGraphImpl graph) {
390                 propagateChangesInQueryCache(graph);
391                 modificationCounter++;
392         }
393
394         final public int getId(final Resource r) {
395                 return querySupport.getId(r);
396         }
397
398         public QuerySupport getCore() {
399                 return querySupport;
400         }
401
402         public int getFunctionalRelation() {
403                 return functionalRelation;
404         }
405
406         public int getInherits() {
407                 return inherits;
408         }
409
410         public int getInstanceOf() {
411                 return instanceOf;
412         }
413
414         public int getInverseOf() {
415                 return inverseOf;
416         }
417
418         public int getSubrelationOf() {
419                 return subrelationOf;
420         }
421
422         public int getSuperrelationOf() {
423                 return superrelationOf;
424         }
425
426         public int getAsserts() {
427                 return asserts;
428         }
429
430         public int getHasPredicate() {
431                 return hasPredicate;
432         }
433
434         public int getHasPredicateInverse() {
435                 return hasPredicateInverse;
436         }
437
438         public int getHasObject() {
439                 return hasObject;
440         }
441
442         public int getRootLibrary() {
443                 return rootLibrary;
444         }
445
446         public Resource getRootLibraryResource() {
447                 if (rootLibraryResource == null) {
448                         // Synchronization is not needed here, it doesn't matter if multiple
449                         // threads simultaneously set rootLibraryResource once.
450                         int root = getRootLibrary();
451                         if (root == 0)
452                                 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
453                         this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
454                 }
455                 return rootLibraryResource;
456         }
457
458         public int getLibrary() {
459                 return library;
460         }
461
462         public int getConsistsOf() {
463                 return consistsOf;
464         }
465
466         public int getHasName() {
467                 return hasName;
468         }
469
470         public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
471
472                 try {
473                         
474                         QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
475
476                                 @Override
477                                 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
478
479                                         if (result != null && result != 0) {
480                                                 procedure.execute(graph, result);
481                                                 return;
482                                         }
483
484                                         // Fall back to using the fixed builtins.
485 //                                      result = querySupport.getBuiltin(id);
486 //                                      if (result != 0) {
487 //                                              procedure.execute(graph, result);
488 //                                              return;
489 //                                      } 
490
491 //                                      try {
492 //                                              result = querySupport.getRandomAccessReference(id);
493 //                                      } catch (ResourceNotFoundException e) {
494 //                                              procedure.exception(graph, e);
495 //                                              return;
496 //                                      }
497
498                                         if (result != 0) {
499                                                 procedure.execute(graph, result);
500                                         } else {
501                                                 procedure.exception(graph, new ResourceNotFoundException(id));
502                                         }
503
504                                 }
505
506                                 @Override
507                                 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
508                                         procedure.exception(graph, t);
509                                 }
510
511                         });
512                 } catch (DatabaseException e) {
513                     
514                     try {
515                         
516                 procedure.exception(graph, e);
517                 
518             } catch (DatabaseException e1) {
519                 
520                 Logger.defaultLogError(e1);
521                 
522             }
523                     
524                 }
525
526         }
527
528         public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
529
530                 Integer result = querySupport.getBuiltin(id);
531                 if (result != 0) {
532                         procedure.execute(graph, result);
533                 } else {
534                         procedure.exception(graph, new ResourceNotFoundException(id));
535                 }
536
537         }
538
539         final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
540
541                 try {
542                         QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
543                 } catch (DatabaseException e) {
544                         throw new IllegalStateException(e);
545                 }
546
547         }
548
549         public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
550
551                 
552                 try {
553                         QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
554                 } catch (DatabaseException e) {
555                         throw new IllegalStateException(e);
556                 }
557
558         }
559
560         final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
561                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
562         }
563
564 //    @Override
565 //      public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
566 //      
567 //      return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
568 //
569 //      }
570
571         public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
572
573                 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
574
575         }
576
577         public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
578
579                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
580
581         }
582
583         boolean isBound(ExternalReadEntry<?> entry) {
584                 if(entry.hasParents()) return true;
585                 else if(listening.hasListener(entry)) return true;
586                 else return false;
587         }
588
589         static class Dummy implements InternalProcedure<Object>, IntProcedure {
590
591                 @Override
592                 public void execute(ReadGraphImpl graph, int i) {
593                 }
594
595                 @Override
596                 public void finished(ReadGraphImpl graph) {
597                 }
598
599                 @Override
600                 public void execute(ReadGraphImpl graph, Object result) {
601                 }
602
603                 @Override
604                 public void exception(ReadGraphImpl graph, Throwable throwable) {
605                 }
606                 
607         }
608         
609         private static final Dummy dummy = new Dummy();
610
611         /*
612     public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
613
614         if (DebugPolicy.PERFORM)
615             System.out.println("PE[ " + (query.hashCode() &  THREAD_MASK) + "] " + query);
616
617         assert (!dirty);
618         assert (!collecting);
619
620         assert(query.assertNotDiscarded());
621
622         registerDependencies(graph, query, parent, listener, procedure, false);
623
624         // FRESH, REFUTED, EXCEPTED go here 
625         if (!query.isReady()) {
626
627             size++;
628             misses++;
629
630             query.computeForEach(graph, this, (Procedure)dummy, true);
631             return query.get(graph, this, null);
632
633         } else {
634
635             hits++;
636
637             return query.get(graph, this, procedure);
638
639         }
640
641     }
642         */
643         
644
645         interface QueryCollectorSupport {
646                 public CacheCollectionResult allCaches();
647                 public Collection<CacheEntry> getRootList();
648                 public int getCurrentSize();
649                 public int calculateCurrentSize();
650                 public CacheEntryBase iterate(int level);
651                 public void remove();
652                 public void setLevel(CacheEntryBase entry, int level);
653                 public boolean start(boolean flush);
654         }
655
656         interface QueryCollector {
657
658                 public void collect(int youngTarget, int allowedTimeInMs);
659
660         }
661
662         class QueryCollectorSupportImpl implements QueryCollectorSupport {
663
664                 private static final boolean DEBUG = false;
665                 private static final double ITERATION_RATIO = 0.2;
666                 
667                 private CacheCollectionResult iteration = new CacheCollectionResult();
668                 private boolean fresh = true;
669                 private boolean needDataInStart = true;
670                 
671                 QueryCollectorSupportImpl() {
672                         iteration.restart();
673                 }
674
675                 public CacheCollectionResult allCaches() {
676                         CacheCollectionResult result = new CacheCollectionResult();
677                         QueryProcessor.this.allCaches(result);
678                         result.restart();
679                         return result;
680                 }
681                 
682                 public boolean start(boolean flush) {
683                         // We need new data from query maps
684                         fresh = true;
685                         if(needDataInStart || flush) {
686                                 // Last run ended after processing all queries => refresh data
687                                 restart(flush ? 0.0 : ITERATION_RATIO);
688                         } else {
689                                 // continue with previous big data
690                         }
691                         // Notify caller about iteration situation
692                         return iteration.isAtStart();
693                 }
694
695                 private void restart(double targetRatio) {
696                         
697                         needDataInStart = true;
698
699                         long start = System.nanoTime();
700                         if(fresh) {
701                                 
702                                 // We need new data from query maps
703                                 
704                                 int iterationSize = iteration.size()+1;
705                                 int diff = calculateCurrentSize()-iterationSize;
706                                 
707                                 double ratio = (double)diff / (double)iterationSize;
708                                 boolean dirty = Math.abs(ratio) >= targetRatio;
709                                 
710                                 if(dirty) {
711                                         iteration = allCaches();
712                                         if(DEBUG) {
713                                                 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
714                                                 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
715                                                         System.err.print(" " + iteration.levels[i].size());
716                                                 System.err.println("");
717                                         }
718                                 } else {
719                                         iteration.restart();
720                                 }
721                                 
722                                 fresh = false;
723                                 needDataInStart = false;
724                         } else {
725                                 // We are returning here within the same GC round - reuse the cache table
726                                 iteration.restart();
727                         }
728                         
729                         return;
730                         
731                 }
732                 
733                 @Override
734                 public CacheEntryBase iterate(int level) {
735                         
736                         CacheEntryBase entry = iteration.next(level);
737                         if(entry == null) {
738                                 restart(ITERATION_RATIO);
739                                 return null;
740                         }
741                         
742                         while(entry != null && entry.isDiscarded()) {
743                                 entry = iteration.next(level);
744                         }
745                         
746                         return entry;
747                         
748                 }
749                 
750                 @Override
751                 public void remove() {
752                         iteration.remove();
753                 }
754                 
755                 @Override
756                 public void setLevel(CacheEntryBase entry, int level) {
757                         iteration.setLevel(entry, level);
758                 }
759
760                 public Collection<CacheEntry> getRootList() {
761                         return cache.getRootList();
762                 }
763
764                 @Override
765                 public int calculateCurrentSize() {
766                         return cache.calculateCurrentSize();
767                 }
768
769                 @Override
770                 public int getCurrentSize() {
771                         return cache.size;
772                 }
773
774         }
775         //    final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
776
777         private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
778         private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
779
780     public int querySize() {
781         return cache.size;
782     }
783
784         public void gc(int youngTarget, int allowedTimeInMs) {
785
786                 collector.collect(youngTarget, allowedTimeInMs);
787
788         }
789
790
791         void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
792
793                 if(entry.isDiscarded()) return;
794                 if(workarea.containsKey(entry)) return;
795                 
796                 Iterable<CacheEntry> parents = entry.getParents(this);
797                 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
798                 for(CacheEntry e : parents) {
799                         if(e.isDiscarded()) continue;
800                         ps.add(e);
801                         processParentReport(e, workarea);
802                 }
803                 workarea.put(entry, ps);
804
805         }
806
807         public synchronized String reportQueryActivity(File file) throws IOException {
808                 
809                 System.err.println("reportQueries " + file.getAbsolutePath());
810
811                 if (!isAlive())
812                         return "Disposed!";
813
814                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
815
816                 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
817                 Collections.reverse(entries);
818                 
819                 for(Pair<String,Integer> entry : entries) {
820                         b.println(entry.first + ": " + entry.second);
821                 }
822
823                 b.close();
824                 
825                 Development.histogram.clear();
826
827                 return "OK";
828
829         }
830         
831         public synchronized String reportQueries(File file) throws IOException {
832
833                 System.err.println("reportQueries " + file.getAbsolutePath());
834
835                 if (!isAlive())
836                         return "Disposed!";
837
838                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
839
840                 long start = System.nanoTime();
841
842 //              ArrayList<CacheEntry> all = ;
843                 
844                 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
845                 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
846                 for(CacheEntryBase entry : caches) {
847                         processParentReport(entry, workarea);
848                 }
849                 
850                 //        for(CacheEntry e : all) System.err.println("entry: " + e);
851
852                 long duration = System.nanoTime() - start;
853                 System.err.println("Query root set in " + 1e-9*duration + "s.");
854
855                 start = System.nanoTime();
856
857                 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); 
858
859                 int listeners = 0;
860
861                 for(CacheEntry entry : workarea.keySet()) {
862                         boolean listener = listening.hasListenerAfterDisposing(entry);
863                         boolean hasParents = entry.getParents(this).iterator().hasNext();
864                         if(listener) {
865                                 // Bound
866                                 flagMap.put(entry, 0);
867                         } else if (!hasParents) {
868                                 // Unbound
869                                 flagMap.put(entry, 1);
870                         } else {
871                                 // Unknown
872                                 flagMap.put(entry, 2);
873                         }
874                         //              // Write leaf bit
875                         //              entry.flags |= 4;
876                 }
877
878                 boolean done = true;
879                 int loops = 0;
880
881                 do {
882
883                         done = true;
884
885                         long start2 = System.nanoTime();
886
887                         int boundCounter = 0;
888                         int unboundCounter = 0;
889                         int unknownCounter = 0;
890
891                         for(CacheEntry<?> entry : workarea.keySet()) {
892
893                                 //System.err.println("process " + entry);
894
895                                 int flags = flagMap.get(entry);
896                                 int bindStatus = flags & 3;
897
898                                 if(bindStatus == 0) boundCounter++;
899                                 else if(bindStatus == 1) unboundCounter++;
900                                 else if(bindStatus == 2) unknownCounter++;
901
902                                 if(bindStatus < 2) continue;
903
904                                 int newStatus = 1;
905                                 for(CacheEntry parent : entry.getParents(this)) {
906
907                                         if(parent.isDiscarded()) flagMap.put(parent, 1);
908
909                                         int flags2 = flagMap.get(parent);
910                                         int bindStatus2 = flags2 & 3;
911                                         // Parent is bound => child is bound
912                                         if(bindStatus2 == 0) {
913                                                 newStatus = 0;
914                                                 break;
915                                         }
916                                         // Parent is unknown => child is unknown
917                                         else if (bindStatus2 == 2) {
918                                                 newStatus = 2;
919                                                 done = false;
920                                                 break;
921                                         }
922                                 }
923
924                                 flagMap.put(entry, newStatus);
925
926                         }
927
928                         duration = System.nanoTime() - start2;
929                         System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
930                         b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
931
932                 } while(!done && loops++ < 20);
933
934                 if(loops >= 20) {
935
936                         for(CacheEntry entry : workarea.keySet()) {
937
938                                 int bindStatus = flagMap.get(entry);
939                                 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
940
941                         }
942
943                 }
944
945                 duration = System.nanoTime() - start;
946                 System.err.println("Query analysis in " + 1e-9*duration + "s.");
947
948                 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
949
950                 for(CacheEntry entry : workarea.keySet()) {
951                         Class<?> clazz = entry.getClass();
952                         if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass(); 
953                         else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass(); 
954                         else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass(); 
955                         else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass(); 
956                         else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass(); 
957                         Integer c = counts.get(clazz);
958                         if(c == null) counts.put(clazz, -1);
959                         else counts.put(clazz, c-1);
960                 }
961
962                 b.print("// Simantics DB client query report file\n");
963                 b.print("// This file contains the following information\n");
964                 b.print("// -The amount of cached query instances per query class\n");
965                 b.print("// -The sizes of retained child sets\n");
966                 b.print("// -List of parents for each query (search for 'P <query name>')\n");
967                 b.print("//  -Followed by status, where\n");
968                 b.print("//   -0=bound\n");
969                 b.print("//   -1=free\n");
970                 b.print("//   -2=unknown\n");
971                 b.print("//   -L=has listener\n");
972                 b.print("// -List of children for each query (search for 'C <query name>')\n");
973
974                 b.print("----------------------------------------\n");
975
976                 b.print("// Queries by class\n");
977                 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
978                         b.print(-p.second + " " + p.first.getName() + "\n");
979                 }
980
981                 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
982                 for(CacheEntry e : workarea.keySet())
983                         hist.put(e, -1);
984                 
985                 boolean changed = true;
986                 int iter = 0;
987                 while(changed && iter++<50) {
988                         
989                         changed = false;
990                         
991                         Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
992                         for(CacheEntry e : workarea.keySet())
993                                 newHist.put(e, -1);
994
995                         for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
996                                 Integer c = hist.get(e.getKey());
997                                 for(CacheEntry p : e.getValue()) {
998                                         Integer i = newHist.get(p);
999                                         newHist.put(p, i+c);
1000                                 }
1001                         }
1002                         for(CacheEntry e : workarea.keySet()) {
1003                                 Integer value = newHist.get(e);
1004                                 Integer old = hist.get(e);
1005                                 if(!value.equals(old)) {
1006                                         hist.put(e, value);
1007 //                                      System.err.println("hist " + e + ": " + old + " => " + value);
1008                                         changed = true;
1009                                 }
1010                         }
1011                         
1012                         System.err.println("Retained set iteration " + iter);
1013
1014                 }
1015
1016                 b.print("// Queries by retained set\n");
1017                 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1018                         b.print("" + -p.second + " " + p.first + "\n");
1019                 }
1020
1021                 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1022
1023                 b.print("// Entry parent listing\n");
1024                 for(CacheEntry entry : workarea.keySet()) {
1025                         int status = flagMap.get(entry);
1026                         boolean hasListener = listening.hasListenerAfterDisposing(entry);
1027                         b.print("Q " + entry.toString());
1028                         if(hasListener) {
1029                                 b.print(" (L" + status + ")");
1030                                 listeners++;
1031                         } else {
1032                                 b.print(" (" + status + ")");
1033                         }
1034                         b.print("\n");
1035                         for(CacheEntry parent : workarea.get(entry)) {
1036                                 Collection<CacheEntry> inv = inverse.get(parent);
1037                                 if(inv == null) {
1038                                         inv = new ArrayList<CacheEntry>();
1039                                         inverse.put(parent, inv);
1040                                 }
1041                                 inv.add(entry);
1042                                 b.print("  " + parent.toString());
1043                                 b.print("\n");
1044                         }
1045                 }
1046
1047                 b.print("// Entry child listing\n");
1048                 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1049                         b.print("C " + entry.getKey().toString());
1050                         b.print("\n");
1051                         for(CacheEntry child : entry.getValue()) {
1052                                 Integer h = hist.get(child);
1053                                 if(h != null) {
1054                                         b.print("  " + h);
1055                                 } else {
1056                                         b.print("  <no children>");
1057                                 }
1058                                 b.print("  " + child.toString());
1059                                 b.print("\n");
1060                         }
1061                 }
1062
1063                 b.print("#queries: " + workarea.keySet().size() + "\n");
1064                 b.print("#listeners: " + listeners + "\n");
1065
1066                 b.close();
1067
1068                 return "Dumped " + workarea.keySet().size() + " queries.";
1069
1070         }
1071
1072         boolean removeQuery(CacheEntry entry) {
1073
1074                 // This entry has been removed before. No need to do anything here.
1075                 if(entry.isDiscarded()) return false;
1076
1077                 assert (!entry.isDiscarded());
1078
1079                 Query query = entry.getQuery();
1080
1081                 query.removeEntry(this);
1082
1083                 cache.updates++;
1084                 cache.size--;
1085
1086                 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1087                         boundQueries--;
1088                 
1089                 entry.discard();
1090
1091                 return true;
1092
1093         }
1094
1095         /**
1096          * 
1097          * @return true if this entry is being listened
1098          */
1099         private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1100
1101                 assert (e != null);
1102
1103                 CacheEntry entry = e.entry;
1104
1105                 /*
1106                  * If the dependency graph forms a DAG, some entries are inserted in the
1107                  * todo list many times. They only need to be processed once though.
1108                  */
1109                 if (entry.isDiscarded()) {
1110                         if (Development.DEVELOPMENT) {
1111                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1112                                         System.err.print("D");
1113                                         for (int i = 0; i < e.indent; i++)
1114                                                 System.err.print(" ");
1115                                         System.err.println(entry.getQuery());
1116                                 }
1117                         }
1118 //                      System.err.println(" => DISCARDED");
1119                         return false;
1120                 }
1121
1122 //              if (entry.isRefuted()) {
1123 //                      if (Development.DEVELOPMENT) {
1124 //                              if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1125 //                                      System.err.print("R");
1126 //                                      for (int i = 0; i < e.indent; i++)
1127 //                                              System.err.print(" ");
1128 //                                      System.err.println(entry.getQuery());
1129 //                              }
1130 //                      }
1131 //                      return false;
1132 //              }
1133
1134                 if (entry.isExcepted()) {
1135                         if (Development.DEVELOPMENT) {
1136                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1137                                         System.err.print("E");
1138                                 }
1139                         }
1140                 }
1141
1142                 if (entry.isPending()) {
1143                         if (Development.DEVELOPMENT) {
1144                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1145                                         System.err.print("P");
1146                                 }
1147                         }
1148                 }
1149
1150                 cache.updates++;
1151
1152                 if (Development.DEVELOPMENT) {
1153                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1154                                 System.err.print("U ");
1155                                 for (int i = 0; i < e.indent; i++)
1156                                         System.err.print(" ");
1157                                 System.err.print(entry.getQuery());
1158                         }
1159                 }
1160
1161                 Query query = entry.getQuery();
1162                 int type = query.type();
1163
1164                 boolean hasListener = listening.hasListener(entry); 
1165
1166                 if (Development.DEVELOPMENT) {
1167                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1168                                 if(listening.hasListener(entry)) {
1169                                         System.err.println(" (L)");
1170                                 } else {
1171                                         System.err.println("");
1172                                 }
1173                         }
1174                 }
1175
1176                 if(entry.isPending() || entry.isExcepted()) {
1177
1178                         // If updated
1179                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1180
1181                                 immediates.put(entry, entry);
1182
1183                         } else {
1184
1185                                 if(hasListener) {
1186                                         entry.refute();
1187                                 } else {
1188                                         removeQuery(entry);
1189                                 }
1190
1191                         }
1192
1193                 } else {
1194
1195                         // If updated
1196                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1197
1198                                 immediates.put(entry, entry);
1199
1200                         } else {
1201
1202                                 if(hasListener) {
1203                                         entry.refute();
1204                                 } else {
1205                                         removeQuery(entry);
1206                                 }
1207
1208                         }
1209
1210                 }
1211
1212 //              System.err.println(" => FOO " + type);
1213
1214                 if (hasListener) {
1215                         ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1216                         if(entries != null) {
1217                                 for (ListenerEntry le : entries) {
1218                                         listening.scheduleListener(le);
1219                                 }
1220                         }
1221                 }
1222
1223                 // If invalid, update parents
1224                 if (type == RequestFlags.INVALIDATE) {
1225                         listening.updateParents(e.indent, entry, todo);
1226                 }
1227
1228                 return hasListener;
1229
1230         }
1231
1232         /**
1233          * @param av1 an array (guaranteed)
1234          * @param av2 any object
1235          * @return <code>true</code> if the two arrays are equal
1236          */
1237         private final boolean arrayEquals(Object av1, Object av2) {
1238                 if (av2 == null)
1239                         return false;
1240                 Class<?> c1 = av1.getClass().getComponentType();
1241                 Class<?> c2 = av2.getClass().getComponentType();
1242                 if (c2 == null || !c1.equals(c2))
1243                         return false;
1244                 boolean p1 = c1.isPrimitive();
1245                 boolean p2 = c2.isPrimitive();
1246                 if (p1 != p2)
1247                         return false;
1248                 if (!p1)
1249                         return Arrays.equals((Object[]) av1, (Object[]) av2);
1250                 if (boolean.class.equals(c1))
1251                         return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1252                 else if (byte.class.equals(c1))
1253                         return Arrays.equals((byte[]) av1, (byte[]) av2);
1254                 else if (int.class.equals(c1))
1255                         return Arrays.equals((int[]) av1, (int[]) av2);
1256                 else if (long.class.equals(c1))
1257                         return Arrays.equals((long[]) av1, (long[]) av2);
1258                 else if (float.class.equals(c1))
1259                         return Arrays.equals((float[]) av1, (float[]) av2);
1260                 else if (double.class.equals(c1))
1261                         return Arrays.equals((double[]) av1, (double[]) av2);
1262                 throw new RuntimeException("??? Contact application querySupport.");
1263         }
1264
1265
1266
1267         final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1268
1269                 try {
1270
1271                         Query query = entry.getQuery();
1272
1273                         if (Development.DEVELOPMENT) {
1274                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1275                                         System.err.println("R " + query);
1276                                 }
1277                         }
1278
1279                         entry.prepareRecompute(querySupport);
1280                         
1281                         ReadGraphImpl parentGraph = graph.forRecompute(entry);
1282
1283                         query.recompute(parentGraph);
1284
1285                         if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1286
1287                         Object newValue = entry.getResult();
1288
1289                         if (ListenerEntry.NO_VALUE == oldValue) {
1290                                 if (Development.DEVELOPMENT) {
1291                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1292                                                 System.out.println("C " + query);
1293                                                 System.out.println("- " + oldValue);
1294                                                 System.out.println("- " + newValue);
1295                                         }
1296                                 }
1297                                 return newValue;
1298                         }
1299
1300                         boolean changed = false;
1301
1302                         if (newValue != null) {
1303                                 if (newValue.getClass().isArray()) {
1304                                         changed = !arrayEquals(newValue, oldValue);
1305                                 } else {
1306                                         changed = !newValue.equals(oldValue);
1307                                 }
1308                         } else
1309                                 changed = (oldValue != null);
1310
1311                         if (Development.DEVELOPMENT) {
1312                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1313                                         System.err.println("C " + query);
1314                                         System.err.println("- " + oldValue);
1315                                         System.err.println("- " + newValue);
1316                                 }
1317                         }
1318
1319                         return changed ? newValue : ListenerEntry.NOT_CHANGED;
1320
1321                 } catch (Throwable t) {
1322
1323                         Logger.defaultLogError(t);
1324                         entry.except(t);
1325                         return ListenerEntry.NO_VALUE;
1326
1327                 }
1328
1329         }
1330
1331
1332         /**
1333          * 
1334          * @return true if this entry still has listeners
1335          */
1336         public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1337
1338                 assert (!cache.collecting);
1339                 assert (!updating);
1340                 updating = true;
1341
1342                 boolean hadListeners = false;
1343                 boolean listenersUnknown = false;
1344
1345                 try {
1346
1347                         assert(entry != null);
1348                         LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1349                         IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1350                         todo.add(new UpdateEntry(null, entry, 0));
1351
1352                         while(true) {
1353
1354                                 // Walk the tree and collect immediate updates
1355                                 while (!todo.isEmpty()) {
1356                                         UpdateEntry e = todo.pop();
1357                                         hadListeners |= updateQuery(e, todo, immediates);
1358                                 }
1359
1360                                 if(immediates.isEmpty()) break;
1361
1362                                 // Evaluate all immediate updates and collect parents to update
1363                                 for(CacheEntry immediate : immediates.values()) {
1364
1365                                         if(immediate.isDiscarded()) {
1366                                                 continue;
1367                                         }
1368
1369                                         if(immediate.isExcepted()) {
1370
1371                                                 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1372                                                 if (newValue != ListenerEntry.NOT_CHANGED)
1373                                                         listening.updateParents(0, immediate, todo);
1374
1375                                         } else {
1376
1377                                                 Object oldValue = immediate.getResult();
1378                                                 Object newValue = compareTo(graph, immediate, oldValue);
1379
1380                                                 if (newValue != ListenerEntry.NOT_CHANGED) {
1381                                                         listening.updateParents(0, immediate, todo);
1382                                                 } else {
1383                                                         // If not changed, keep the old value
1384                                                         immediate.setResult(oldValue);
1385                                                         immediate.setReady();
1386                                                         listenersUnknown = true;
1387                                                 }
1388
1389                                         }
1390
1391                                 }
1392                                 immediates.clear();
1393
1394                         }
1395
1396                 } catch (Throwable t) {
1397                         Logger.defaultLogError(t);
1398                 }
1399
1400                 assert (updating);
1401                 updating = false;
1402
1403                 return hadListeners | listenersUnknown;
1404
1405         }
1406
1407         private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1408         private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1409         private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1410         // Maybe use a mutex from util.concurrent?
1411         private Object primitiveUpdateLock = new Object();
1412         private THashSet scheduledPrimitiveUpdates = new THashSet();
1413
1414         private ArrayList<CacheEntry> refutations = new ArrayList<>();
1415         
1416         private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1417                 e.refute();
1418                 refutations.add(e);
1419         }
1420
1421         private void updateRefutations(ReadGraphImpl graph) {
1422                 
1423                 for(CacheEntry e : refutations)
1424                         update(graph, e);
1425                 
1426                 refutations.clear();
1427                 
1428         }
1429         
1430         public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1431                 
1432                 // Make sure that listening has performed its work
1433                 listening.sync();
1434
1435                 cache.dirty = false;
1436                 lastInvalidate = 0;
1437
1438                 if (Development.DEVELOPMENT) {
1439                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1440                                 System.err.println("== Query update ==");
1441                         }
1442                 }
1443
1444                 // Special case - one statement
1445                 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1446
1447                         long arg0 = scheduledObjectUpdates.getFirst();
1448
1449                         final int subject = (int)(arg0 >>> 32);
1450                         final int predicate = (int)(arg0 & 0xffffffff);
1451
1452                         for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1453                         for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1454                         for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1455
1456                         if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1457                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1458                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1459                                 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1460                                 if(types != null) markForUpdate(graph, types);
1461                         }
1462
1463                         if(predicate == subrelationOf) {
1464                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1465                                 if(superRelations != null) markForUpdate(graph, superRelations);
1466                         }
1467
1468                         DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1469                         if(dp != null) markForUpdate(graph, dp);
1470                         OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1471                         if(os != null) markForUpdate(graph, os);
1472
1473                         updateRefutations(graph);
1474                         
1475                         scheduledObjectUpdates.clear();
1476
1477                         if (Development.DEVELOPMENT) {
1478                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1479                                         System.err.println("== Query update ends ==");
1480                                 }
1481                         }
1482
1483                         return;
1484
1485                 }
1486
1487                 // Special case - one value
1488                 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1489
1490                         int arg0 = scheduledValueUpdates.getFirst();
1491
1492                         ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1493                         if(valueQuery != null) markForUpdate(graph, valueQuery);
1494
1495                         updateRefutations(graph);
1496
1497                         scheduledValueUpdates.clear();
1498
1499                         if (Development.DEVELOPMENT) {
1500                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1501                                         System.err.println("== Query update ends ==");
1502                                 }
1503                         }
1504                         
1505                         return;
1506
1507                 }
1508
1509                 final TIntHashSet predicates = new TIntHashSet();
1510                 final TIntHashSet orderedSets = new TIntHashSet();
1511
1512                 THashSet primitiveUpdates;
1513                 synchronized (primitiveUpdateLock) {
1514                         primitiveUpdates = scheduledPrimitiveUpdates;
1515                         scheduledPrimitiveUpdates = new THashSet();
1516                 }
1517
1518                 scheduledValueUpdates.forEach(new TIntProcedure() {
1519
1520                         @Override
1521                         public boolean execute(int arg0) {
1522                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1523                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1524                                 return true;
1525                         }
1526
1527                 });
1528
1529                 scheduledInvalidates.forEach(new TIntProcedure() {
1530
1531                         @Override
1532                         public boolean execute(int resource) {
1533                                 
1534                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1535                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1536                                 
1537                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1538                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1539                                 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1540                                 if(types != null) markForUpdate(graph, types);
1541
1542                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1543                                 if(superRelations != null) markForUpdate(graph, superRelations);
1544
1545                                 predicates.add(resource);
1546                                 
1547                                 return true;
1548                         }
1549
1550                 });
1551
1552                 scheduledObjectUpdates.forEach(new TLongProcedure() {
1553
1554                         @Override
1555                         public boolean execute(long arg0) {
1556
1557                                 final int subject = (int)(arg0 >>> 32);
1558                                 final int predicate = (int)(arg0 & 0xffffffff);
1559
1560                                 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1561                                         PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1562                                         if(principalTypes != null) markForUpdate(graph, principalTypes);
1563                                         Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1564                                         if(types != null) markForUpdate(graph, types);
1565                                 }
1566
1567                                 if(predicate == subrelationOf) {
1568                                         SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1569                                         if(superRelations != null) markForUpdate(graph, superRelations);
1570                                 }
1571
1572                                 predicates.add(subject);
1573                                 orderedSets.add(predicate);
1574
1575                                 return true;
1576
1577                         }
1578
1579                 });
1580
1581                 predicates.forEach(new TIntProcedure() {
1582
1583                         @Override
1584                         public boolean execute(final int subject) {
1585
1586                                 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1587                                 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1588                                 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1589
1590                                 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1591                                 if(entry != null) markForUpdate(graph, entry);
1592
1593                                 return true;
1594
1595                         }
1596
1597                 });
1598
1599                 orderedSets.forEach(new TIntProcedure() {
1600
1601                         @Override
1602                         public boolean execute(int orderedSet) {
1603
1604                                 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1605                                 if(entry != null) markForUpdate(graph, entry);
1606
1607                                 return true;
1608
1609                         }
1610
1611                 });
1612
1613                 updateRefutations(graph);
1614
1615                 primitiveUpdates.forEach(new TObjectProcedure() {
1616
1617                         @Override
1618                         public boolean execute(Object arg0) {
1619
1620                                 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1621                                 if (query != null) {
1622                                         boolean listening = update(graph, query);
1623                                         if (!listening && !query.hasParents()) {
1624                                                 cache.externalReadEntryMap.remove(arg0);
1625                                                 query.discard();
1626                                         }
1627                                 }
1628                                 return true;
1629                         }
1630
1631                 });
1632                 
1633                 scheduledValueUpdates.clear();
1634                 scheduledObjectUpdates.clear();
1635                 scheduledInvalidates.clear();
1636                 
1637                 if (Development.DEVELOPMENT) {
1638                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1639                                 System.err.println("== Query update ends ==");
1640                         }
1641                 }
1642
1643         }
1644
1645         public void updateValue(final int resource) {
1646                 scheduledValueUpdates.add(resource);
1647                 cache.dirty = true;
1648         }
1649
1650         public void updateStatements(final int resource, final int predicate) {
1651                 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
1652                 cache.dirty = true;
1653         }
1654         
1655         private int lastInvalidate = 0;
1656         
1657         public void invalidateResource(final int resource) {
1658                 if(lastInvalidate == resource) return;
1659                 scheduledValueUpdates.add(resource);
1660                 lastInvalidate = resource;
1661                 cache.dirty = true;
1662         }
1663
1664         public void updatePrimitive(final ExternalRead primitive) {
1665
1666                 // External reads may be updated from arbitrary threads.
1667                 // Synchronize to prevent race-conditions.
1668                 synchronized (primitiveUpdateLock) {
1669                         scheduledPrimitiveUpdates.add(primitive);
1670                 }
1671                 querySupport.dirtyPrimitives();
1672
1673         }
1674
1675         @Override
1676         public synchronized String toString() {
1677                 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
1678         }
1679
1680         @Override
1681         protected void doDispose() {
1682
1683                 requests.release(Integer.MAX_VALUE / 2);
1684                 
1685                 for(int index = 0; index < THREADS; index++) { 
1686                         executors[index].dispose();
1687                 }
1688
1689                 // First just wait
1690                 for(int i=0;i<100;i++) {
1691
1692                         boolean alive = false;
1693                         for(int index = 0; index < THREADS; index++) { 
1694                                 alive |= executors[index].isAlive();
1695                         }
1696                         if(!alive) return;
1697                         try {
1698                                 Thread.sleep(5);
1699                         } catch (InterruptedException e) {
1700                                 Logger.defaultLogError(e);
1701                         }
1702
1703                 }
1704
1705                 // Then start interrupting
1706                 for(int i=0;i<100;i++) {
1707
1708                         boolean alive = false;
1709                         for(int index = 0; index < THREADS; index++) { 
1710                                 alive |= executors[index].isAlive();
1711                         }
1712                         if(!alive) return;
1713                         for(int index = 0; index < THREADS; index++) {
1714                                 executors[index].interrupt();
1715                         }
1716                 }
1717
1718                 //              // Then just destroy
1719                 //              for(int index = 0; index < THREADS; index++) {
1720                 //                      executors[index].destroy();
1721                 //              }
1722
1723                 for(int index = 0; index < THREADS; index++) {
1724                         try {
1725                                 executors[index].join(5000);
1726                         } catch (InterruptedException e) {
1727                                 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1728                         }
1729                         executors[index] = null;
1730                 }
1731
1732         }
1733
1734         public int getHits() {
1735                 return cache.hits;
1736         }
1737
1738         public int getMisses() {
1739                 return cache.misses;
1740         }
1741
1742         public int getSize() {
1743                 return cache.size;
1744         }
1745
1746         public Set<Long> getReferencedClusters() {
1747                 HashSet<Long> result = new HashSet<Long>();
1748                 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1749                         Objects query = (Objects) entry.getQuery();
1750                         result.add(querySupport.getClusterId(query.r1()));
1751                 }
1752                 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1753                         DirectPredicates query = (DirectPredicates) entry.getQuery();
1754                         result.add(querySupport.getClusterId(query.id));
1755                 }
1756                 for (CacheEntry entry : cache.valueQueryMap.values()) {
1757                         ValueQuery query = (ValueQuery) entry.getQuery();
1758                         result.add(querySupport.getClusterId(query.id));
1759                 }
1760                 return result;
1761         }
1762
1763         public void assertDone() {
1764         }
1765
1766         CacheCollectionResult allCaches(CacheCollectionResult result) {
1767                 
1768                 return cache.allCaches(result);
1769
1770         }
1771
1772         public void printDiagnostics() {
1773         }
1774
1775         public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1776                 querySupport.requestCluster(graph, clusterId, runnable);
1777         }
1778
1779         public int clean() {
1780                 collector.collect(0, Integer.MAX_VALUE);
1781                 return cache.size;
1782         }
1783
1784         public void clean(final Collection<ExternalRead<?>> requests) {
1785                 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1786                         Iterator<ExternalRead<?>> iterator = requests.iterator();
1787                         @Override
1788                         public CacheCollectionResult allCaches() {
1789                                 throw new UnsupportedOperationException();
1790                         }
1791                         @Override
1792                         public CacheEntryBase iterate(int level) {
1793                                 if(iterator.hasNext()) {
1794                                         ExternalRead<?> request = iterator.next();
1795                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1796                                         if (entry != null) return entry;
1797                                         else return iterate(level);
1798                                 } else {
1799                                         iterator = requests.iterator();
1800                                         return null;
1801                                 }
1802                         }
1803                         @Override
1804                         public void remove() {
1805                                 throw new UnsupportedOperationException();
1806                         }
1807                         @Override
1808                         public void setLevel(CacheEntryBase entry, int level) {
1809                                 throw new UnsupportedOperationException();
1810                         }
1811                         @Override
1812                         public Collection<CacheEntry> getRootList() {
1813                                 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1814                                 for (ExternalRead<?> request : requests) {
1815                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1816                                         if (entry != null)
1817                                                 result.add(entry);
1818                                 }
1819                                 return result;
1820                         }
1821                         @Override
1822                         public int getCurrentSize() {
1823                                 return cache.size;
1824                         }
1825                         @Override
1826                         public int calculateCurrentSize() {
1827                                 // This tells the collector to attempt collecting everything.
1828                                 return Integer.MAX_VALUE;
1829                         }
1830                         @Override
1831                         public boolean start(boolean flush) {
1832                                 return true;
1833                         }
1834                 };
1835                 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
1836         }
1837
1838         public void scanPending() {
1839                 
1840                 cache.scanPending();
1841
1842         }
1843
1844         public ReadGraphImpl graphForVirtualRequest() {
1845                 return ReadGraphImpl.createAsync(this);
1846         }
1847
1848         
1849         private HashMap<Resource, Class<?>> builtinValues;
1850         
1851         public Class<?> getBuiltinValue(Resource r) {
1852                 if(builtinValues == null) initBuiltinValues();
1853                 return builtinValues.get(r);
1854         }
1855
1856         Exception callerException = null;
1857
1858     public interface AsyncBarrier {
1859         public void inc(); 
1860         public void dec();
1861         public void waitBarrier(Object request, ReadGraphImpl impl);
1862         public boolean isBlocking();
1863     }
1864
1865 //      final public QueryProcessor processor;
1866 //      final public QuerySupport support;
1867
1868         //    boolean disposed = false;
1869
1870         private void initBuiltinValues() {
1871
1872                 Layer0 b = getSession().peekService(Layer0.class);
1873                 if(b == null) return;
1874
1875                 builtinValues = new HashMap<Resource, Class<?>>();
1876
1877                 builtinValues.put(b.String, String.class);
1878                 builtinValues.put(b.Double, Double.class);
1879                 builtinValues.put(b.Float, Float.class);
1880                 builtinValues.put(b.Long, Long.class);
1881                 builtinValues.put(b.Integer, Integer.class);
1882                 builtinValues.put(b.Byte, Byte.class);
1883                 builtinValues.put(b.Boolean, Boolean.class);
1884
1885                 builtinValues.put(b.StringArray, String[].class);
1886                 builtinValues.put(b.DoubleArray, double[].class);
1887                 builtinValues.put(b.FloatArray, float[].class);
1888                 builtinValues.put(b.LongArray, long[].class);
1889                 builtinValues.put(b.IntegerArray, int[].class);
1890                 builtinValues.put(b.ByteArray, byte[].class);
1891                 builtinValues.put(b.BooleanArray, boolean[].class);
1892
1893         }
1894
1895 //      public ReadGraphSupportImpl(final QueryProcessor provider2) {
1896 //
1897 //              if (null == provider2) {
1898 //                      this.processor = null;
1899 //                      support = null;
1900 //                      return;
1901 //              }
1902 //              this.processor = provider2;
1903 //              support = provider2.getCore();
1904 //              initBuiltinValues();
1905 //
1906 //      }
1907
1908 //      final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
1909 //              return new ReadGraphSupportImpl(impl.processor);
1910 //      }
1911
1912         @Override
1913         final public Session getSession() {
1914                 return session;
1915         }
1916         
1917         final public ResourceSupport getResourceSupport() {
1918                 return resourceSupport;
1919         }
1920
1921         @Override
1922         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
1923
1924         try {
1925
1926                 for(Resource predicate : getPredicates(impl, subject))
1927                     procedure.execute(impl, predicate);
1928
1929                 procedure.finished(impl);
1930
1931             } catch (Throwable e) {
1932                 procedure.exception(impl, e);
1933             }
1934
1935         }
1936
1937         @Override
1938         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
1939                 
1940                 throw new UnsupportedOperationException();
1941
1942 //              assert(subject != null);
1943 //              assert(procedure != null);
1944 //
1945 //              final ListenerBase listener = getListenerBase(procedure);
1946 //
1947 //              try {
1948 //                      QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
1949 //
1950 //                              @Override
1951 //                              public void execute(ReadGraphImpl graph, int i) {
1952 //                                      try {
1953 //                                              procedure.execute(querySupport.getResource(i));
1954 //                                      } catch (Throwable t2) {
1955 //                                              Logger.defaultLogError(t2);
1956 //                                      }
1957 //                              }
1958 //
1959 //                              @Override
1960 //                              public void finished(ReadGraphImpl graph) {
1961 //                                      try {
1962 //                                              procedure.finished();
1963 //                                      } catch (Throwable t2) {
1964 //                                              Logger.defaultLogError(t2);
1965 //                                      }
1966 ////                            impl.state.barrier.dec();
1967 //                              }
1968 //
1969 //                              @Override
1970 //                              public void exception(ReadGraphImpl graph, Throwable t) {
1971 //                                      try {
1972 //                                              procedure.exception(t);
1973 //                                      } catch (Throwable t2) {
1974 //                                              Logger.defaultLogError(t2);
1975 //                                      }
1976 ////                            impl.state.barrier.dec();
1977 //                              }
1978 //
1979 //                      });
1980 //              } catch (DatabaseException e) {
1981 //                      Logger.defaultLogError(e);
1982 //              }
1983
1984         }
1985         
1986         @Override
1987         final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
1988                 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); 
1989         }
1990
1991         @Override
1992         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
1993                         final Resource predicate, final MultiProcedure<Statement> procedure) {
1994
1995                 assert(subject != null);
1996                 assert(predicate != null);
1997                 assert(procedure != null);
1998
1999                 final ListenerBase listener = getListenerBase(procedure);
2000
2001 //              impl.state.barrier.inc();
2002
2003                 try {
2004                         Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2005
2006                                 @Override
2007                                 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2008                                         try {
2009                                                 procedure.execute(querySupport.getStatement(s, p, o));
2010                                         } catch (Throwable t2) {
2011                                                 Logger.defaultLogError(t2);
2012                                         }
2013                                 }
2014
2015                                 @Override
2016                                 public void finished(ReadGraphImpl graph) {
2017                                         try {
2018                                                 procedure.finished();
2019                                         } catch (Throwable t2) {
2020                                                 Logger.defaultLogError(t2);
2021                                         }
2022 //                              impl.state.barrier.dec();
2023                                 }
2024
2025                                 @Override
2026                                 public void exception(ReadGraphImpl graph, Throwable t) {
2027                                         try {
2028                                                 procedure.exception(t);
2029                                         } catch (Throwable t2) {
2030                                                 Logger.defaultLogError(t2);
2031                                         }
2032 //                              impl.state.barrier.dec();
2033                                 }
2034
2035                         });
2036                 } catch (DatabaseException e) {
2037                         Logger.defaultLogError(e);
2038                 }
2039
2040         }
2041
2042         @Override
2043         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2044                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2045
2046                 assert(subject != null);
2047                 assert(predicate != null);
2048                 assert(procedure != null);
2049
2050                 final ListenerBase listener = getListenerBase(procedure);
2051
2052                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2053
2054                         boolean first = true;
2055
2056                         @Override
2057                         public void execute(ReadGraphImpl graph, int s, int p, int o) {
2058                                 try {
2059                                         if(first) {
2060                                                 procedure.execute(graph, querySupport.getStatement(s, p, o));
2061                                         } else {
2062                                                 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2063                                         }
2064                                 } catch (Throwable t2) {
2065                                         Logger.defaultLogError(t2);
2066                                 }
2067                         }
2068
2069                         @Override
2070                         public void finished(ReadGraphImpl graph) {
2071
2072                                 try {
2073                                         if(first) {
2074                                                 first = false;
2075                                                 procedure.finished(graph);
2076 //                                              impl.state.barrier.dec(this);
2077                                         } else {
2078                                                 procedure.finished(impl.newRestart(graph));
2079                                         }
2080                                 } catch (Throwable t2) {
2081                                         Logger.defaultLogError(t2);
2082                                 }
2083
2084                         }
2085
2086                         @Override
2087                         public void exception(ReadGraphImpl graph, Throwable t) {
2088
2089                                 try {
2090                                         if(first) {
2091                                                 first = false;
2092                                                 procedure.exception(graph, t);
2093 //                                              impl.state.barrier.dec(this);
2094                                         } else {
2095                                                 procedure.exception(impl.newRestart(graph), t);
2096                                         }
2097                                 } catch (Throwable t2) {
2098                                         Logger.defaultLogError(t2);
2099                                 }
2100
2101                         }
2102
2103                 };
2104
2105                 int sId = querySupport.getId(subject);
2106                 int pId = querySupport.getId(predicate);
2107
2108 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2109 //              else impl.state.barrier.inc(null, null);
2110
2111                 try {
2112                         Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2113                 } catch (DatabaseException e) {
2114                         Logger.defaultLogError(e);
2115                 }
2116
2117         }
2118
2119         @Override
2120         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2121                         final Resource predicate, final StatementProcedure procedure) {
2122
2123                 assert(subject != null);
2124                 assert(predicate != null);
2125                 assert(procedure != null);
2126
2127                 final ListenerBase listener = getListenerBase(procedure);
2128
2129                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2130
2131                         boolean first = true;
2132
2133                         @Override
2134                         public void execute(ReadGraphImpl graph, int s, int p, int o) {
2135                                 try {
2136                                         if(first) {
2137                                                 procedure.execute(graph, s, p, o);
2138                                         } else {
2139                                                 procedure.execute(impl.newRestart(graph), s, p, o);
2140                                         }
2141                                 } catch (Throwable t2) {
2142                                         Logger.defaultLogError(t2);
2143                                 }
2144                         }
2145
2146                         @Override
2147                         public void finished(ReadGraphImpl graph) {
2148
2149                                 try {
2150                                         if(first) {
2151                                                 first = false;
2152                                                 procedure.finished(graph);
2153 //                                              impl.state.barrier.dec(this);
2154                                         } else {
2155                                                 procedure.finished(impl.newRestart(graph));
2156                                         }
2157                                 } catch (Throwable t2) {
2158                                         Logger.defaultLogError(t2);
2159                                 }
2160
2161                         }
2162
2163                         @Override
2164                         public void exception(ReadGraphImpl graph, Throwable t) {
2165
2166                                 try {
2167                                         if(first) {
2168                                                 first = false;
2169                                                 procedure.exception(graph, t);
2170 //                                              impl.state.barrier.dec(this);
2171                                         } else {
2172                                                 procedure.exception(impl.newRestart(graph), t);
2173                                         }
2174                                 } catch (Throwable t2) {
2175                                         Logger.defaultLogError(t2);
2176                                 }
2177
2178                         }
2179
2180                 };
2181
2182                 int sId = querySupport.getId(subject);
2183                 int pId = querySupport.getId(predicate);
2184
2185 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2186 //              else impl.state.barrier.inc(null, null);
2187
2188                 try {
2189                         Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2190                 } catch (DatabaseException e) {
2191                         Logger.defaultLogError(e);
2192                 }
2193
2194         }
2195         
2196         @Override
2197         final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2198
2199                 assert(subject != null);
2200                 assert(predicate != null);
2201                 assert(procedure != null);
2202
2203                 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2204
2205                         private Set<Statement> current = null;
2206                         private Set<Statement> run = new HashSet<Statement>();
2207
2208                         @Override
2209                         public void execute(AsyncReadGraph graph, Statement result) {
2210
2211                                 boolean found = false;
2212
2213                                 if(current != null) {
2214
2215                                         found = current.remove(result);
2216
2217                                 }
2218
2219                                 if(!found) procedure.add(graph, result);
2220
2221                                 run.add(result);
2222
2223                         }
2224
2225                         @Override
2226                         public void finished(AsyncReadGraph graph) {
2227
2228                                 if(current != null) { 
2229                                         for(Statement r : current) procedure.remove(graph, r);
2230                                 }
2231
2232                                 current = run;
2233
2234                                 run = new HashSet<Statement>();
2235
2236                         }
2237
2238                         @Override
2239                         public void exception(AsyncReadGraph graph, Throwable t) {
2240                                 procedure.exception(graph, t);
2241                         }
2242
2243                         @Override
2244                         public boolean isDisposed() {
2245                                 return procedure.isDisposed();
2246                         }
2247
2248                 });
2249
2250         }
2251
2252         @Override
2253         final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2254                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2255
2256                 assert(subject != null);
2257                 assert(predicate != null);
2258                 assert(procedure != null);
2259
2260                 final ListenerBase listener = getListenerBase(procedure);
2261
2262 //              impl.state.barrier.inc();
2263
2264                 try {
2265                         QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2266
2267                                 @Override
2268                                 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2269                                         try {
2270                                                 procedure.execute(graph, querySupport.getStatement(s, p, o));
2271                                         } catch (Throwable t2) {
2272                                                 Logger.defaultLogError(t2);
2273                                         }
2274                                 }
2275
2276                                 @Override
2277                                 public void finished(ReadGraphImpl graph) {
2278                                         try {
2279                                                 procedure.finished(graph);
2280                                         } catch (Throwable t2) {
2281                                                 Logger.defaultLogError(t2);
2282                                         }
2283 //                              impl.state.barrier.dec();
2284                                 }
2285
2286                                 @Override
2287                                 public void exception(ReadGraphImpl graph, Throwable t) {
2288                                         try {
2289                                                 procedure.exception(graph, t);
2290                                         } catch (Throwable t2) {
2291                                                 Logger.defaultLogError(t2);
2292                                         }
2293 //                              impl.state.barrier.dec();
2294                                 }
2295
2296                         });
2297                 } catch (DatabaseException e) {
2298                         Logger.defaultLogError(e);
2299                 }
2300
2301         }
2302
2303         private static ListenerBase getListenerBase(Object procedure) {
2304                 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2305                 else return null;
2306         }
2307
2308         @Override
2309         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2310
2311                 assert(subject != null);
2312                 assert(predicate != null);
2313                 assert(procedure != null);
2314
2315                 final ListenerBase listener = getListenerBase(procedure);
2316
2317 //              impl.state.barrier.inc();
2318
2319                 try {
2320                         QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2321
2322                                 @Override
2323                                 public void execute(ReadGraphImpl graph, int i) {
2324                                         try {
2325                                                 procedure.execute(querySupport.getResource(i));
2326                                         } catch (Throwable t2) {
2327                                                 Logger.defaultLogError(t2);
2328                                         }
2329                                 }
2330
2331                                 @Override
2332                                 public void finished(ReadGraphImpl graph) {
2333                                         try {
2334                                                 procedure.finished();
2335                                         } catch (Throwable t2) {
2336                                                 Logger.defaultLogError(t2);
2337                                         }
2338 //                              impl.state.barrier.dec();
2339                                 }
2340
2341                                 @Override
2342                                 public void exception(ReadGraphImpl graph, Throwable t) {
2343                                         System.out.println("forEachObject exception " + t);
2344                                         try {
2345                                                 procedure.exception(t);
2346                                         } catch (Throwable t2) {
2347                                                 Logger.defaultLogError(t2);
2348                                         }
2349 //                              impl.state.barrier.dec();
2350                                 }
2351
2352                         });
2353                 } catch (DatabaseException e) {
2354                         Logger.defaultLogError(e);
2355                 }
2356
2357         }
2358
2359         @Override
2360         final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2361
2362                 assert(subject != null);
2363                 assert(procedure != null);
2364
2365                 final ListenerBase listener = getListenerBase(procedure);
2366
2367                 int sId = querySupport.getId(subject);
2368
2369                 try {
2370                         QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2371
2372                                 @Override
2373                                 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2374                                         procedure.execute(graph, result);
2375                                 }
2376
2377                                 @Override
2378                                 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2379                                         procedure.exception(graph, throwable);
2380                                 }
2381                                 
2382                         });
2383                 } catch (DatabaseException e) {
2384                         Logger.defaultLogError(e);
2385                 }
2386
2387         }
2388
2389         final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2390
2391 //              assert(subject != null);
2392 //              assert(procedure != null);
2393 //
2394 //              final ListenerBase listener = getListenerBase(procedure);
2395 //
2396 //              org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2397                 
2398                 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2399
2400         }
2401
2402 //      @Override
2403 //      final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2404 //
2405 //              assert(subject != null);
2406 //              assert(procedure != null);
2407 //
2408 //              final ListenerBase listener = getListenerBase(procedure);
2409 //
2410 //              org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2411 //
2412 //      }
2413         
2414         private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2415
2416         @Override
2417         final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2418
2419                 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2420
2421                         private Resource single = null;
2422
2423                         @Override
2424                         public synchronized void execute(AsyncReadGraph graph, Resource result) {
2425                                 if(single == null) {
2426                                         single = result;
2427                                 } else {
2428                                         single = INVALID_RESOURCE;
2429                                 }
2430                         }
2431
2432                         @Override
2433                         public synchronized void finished(AsyncReadGraph graph) {
2434                                 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2435                                 else procedure.execute(graph, single);
2436                         }
2437
2438                         @Override
2439                         public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2440                                 procedure.exception(graph, throwable);
2441                         }
2442
2443                 });
2444
2445         }
2446
2447         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2448                 
2449                 final int sId = querySupport.getId(subject);
2450                 final int pId = querySupport.getId(predicate);
2451
2452                 try {
2453                         QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2454                 } catch (DatabaseException e) {
2455                         Logger.defaultLogError(e);
2456                 }
2457                 
2458         }
2459         
2460         static class Runner2Procedure implements IntProcedure {
2461             
2462             public int single = 0;
2463             public Throwable t = null;
2464
2465             public void clear() {
2466                 single = 0;
2467                 t = null;
2468             }
2469       &nb