eaaa9b3730976265c0673dc4547bbe35b3d4904c
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import java.io.BufferedOutputStream;
15 import java.io.File;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.exception.DatabaseException;
48 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
49 import org.simantics.db.exception.NoInverseException;
50 import org.simantics.db.exception.ResourceNotFoundException;
51 import org.simantics.db.impl.ResourceImpl;
52 import org.simantics.db.impl.graph.BarrierTracing;
53 import org.simantics.db.impl.graph.ReadGraphImpl;
54 import org.simantics.db.impl.graph.ReadGraphSupport;
55 import org.simantics.db.impl.procedure.IntProcedureAdapter;
56 import org.simantics.db.impl.procedure.InternalProcedure;
57 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
58 import org.simantics.db.impl.support.ResourceSupport;
59 import org.simantics.db.procedure.AsyncMultiListener;
60 import org.simantics.db.procedure.AsyncMultiProcedure;
61 import org.simantics.db.procedure.AsyncProcedure;
62 import org.simantics.db.procedure.AsyncSetListener;
63 import org.simantics.db.procedure.ListenerBase;
64 import org.simantics.db.procedure.MultiProcedure;
65 import org.simantics.db.procedure.StatementProcedure;
66 import org.simantics.db.procedure.SyncMultiProcedure;
67 import org.simantics.db.request.AsyncMultiRead;
68 import org.simantics.db.request.ExternalRead;
69 import org.simantics.db.request.MultiRead;
70 import org.simantics.db.request.RequestFlags;
71 import org.simantics.layer0.Layer0;
72 import org.simantics.utils.DataContainer;
73 import org.simantics.utils.Development;
74 import org.simantics.utils.datastructures.Pair;
75 import org.simantics.utils.datastructures.collections.CollectionUtils;
76 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
77
78 import gnu.trove.procedure.TIntProcedure;
79 import gnu.trove.procedure.TLongProcedure;
80 import gnu.trove.procedure.TObjectProcedure;
81 import gnu.trove.set.hash.THashSet;
82 import gnu.trove.set.hash.TIntHashSet;
83
84 @SuppressWarnings({"rawtypes", "unchecked"})
85 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
86
87         public static int                                       indent                = 0;
88
89         
90         // Garbage collection
91         
92         public int                                              boundQueries          = 0;
93
94
95         final private int                                       functionalRelation;
96
97         final private int                                       superrelationOf;
98
99         final private int                                       instanceOf;
100
101         final private int                                       inverseOf;
102
103         final private int                                       asserts;
104
105         final private int                                       hasPredicate;
106
107         final private int                                       hasPredicateInverse;
108
109         final private int                                       hasObject;
110
111         final private int                                       inherits;
112
113         final private int                                       subrelationOf;
114
115         final private int                                       rootLibrary;
116
117         /**
118          * A cache for the root library resource. Initialized in
119          * {@link #getRootLibraryResource()}.
120          */
121         private volatile ResourceImpl                           rootLibraryResource;
122
123         final private int                                       library;
124
125         final private int                                       consistsOf;
126
127         final private int                                       hasName;
128
129         AtomicInteger                                       sleepers = new AtomicInteger(0);
130
131         boolean                                         updating              = false;
132
133
134         final public QueryCache                                 cache;
135         final public QuerySupport                               querySupport;
136         final public Session                                    session;
137         final public ResourceSupport                            resourceSupport;
138         
139         final public Semaphore                                  requests = new Semaphore(1);
140         
141         final public QueryListening                            listening = new QueryListening(this);
142
143         QueryThread[]                                   executors;
144         
145         public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
146     
147     public LinkedList<SessionTask>                           topLevelTasks = new LinkedList<SessionTask>();
148
149         enum ThreadState {
150
151                 INIT, RUN, SLEEP, DISPOSED
152
153         }
154
155         public ThreadState[]                                                                    threadStates;
156         
157         final Object querySupportLock;
158         
159         public Long modificationCounter = 0L;
160
161         public void close() {
162         }
163
164     public SessionTask getSubTask(ReadGraphImpl parent) {
165         synchronized(querySupportLock) {
166             int index = 0;
167             while(index < freeScheduling.size()) {
168                 SessionTask task = freeScheduling.get(index);
169                 if(task.isSubtask(parent) && task.maybeReady()) {
170                     return freeScheduling.remove(index);
171                 }
172                 index++;
173             }
174         }
175         return null;
176     }
177
178     /*
179      * We are running errands while waiting for requests to complete.
180      * We can only run work that is part of the current root request to avoid any deadlocks
181      */
182     public boolean performPending(ReadGraphImpl under) {
183         SessionTask task = getSubTask(under);
184                 if(task != null) {
185                         task.run(thread.get());
186                         return true;
187                 }
188                 return false;
189         }
190     
191     final public void scheduleNow(SessionTask request) {
192         SessionTask toExecute = scheduleOrReturnForExecution(request);
193         if(toExecute != null)
194             toExecute.run(thread.get());
195     }
196
197     final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
198
199         assert(request != null);
200
201         synchronized(querySupportLock) {
202
203             LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
204             
205             if(BarrierTracing.BOOKKEEPING) {
206                 Exception current = new Exception();
207                 Exception previous = BarrierTracing.tasks.put(request, current);
208                 if(previous != null) {
209                     previous.printStackTrace();
210                     current.printStackTrace();
211                 }
212             }
213
214             queue.addFirst(request);
215             requests.release();
216
217         }
218
219         return null;
220
221     }
222
223
224         final int THREADS;
225         final public int  THREAD_MASK;
226
227         final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
228
229         public static abstract class SessionTask {
230
231             final protected ReadGraphImpl rootGraph;
232                 private int counter = 0;
233                 protected int position = 1;
234                 private Exception trace;
235
236                 public SessionTask() {
237                     this(null);
238                 }
239                 
240         public SessionTask(ReadGraphImpl rootGraph) {
241             this.rootGraph = rootGraph;
242         }
243         
244         public boolean isSubtask(ReadGraphImpl graph) {
245             return graph.isParent(rootGraph);
246         }
247
248         public abstract void run0(int thread);
249
250                 public final void run(int thread) {
251                     if(counter++ > 0) {
252                         if(BarrierTracing.BOOKKEEPING) {
253                             trace.printStackTrace();
254                             new Exception().printStackTrace();
255                         }
256                         throw new IllegalStateException("Multiple invocations of SessionTask!");
257                     }
258                     if(BarrierTracing.BOOKKEEPING) {
259                         trace = new Exception();
260                     }
261                     run0(thread);
262                 }
263                 
264                 public boolean maybeReady() {
265                         return true;
266                 }
267
268                 @Override
269                 public String toString() {
270                         if(rootGraph == null)
271                                 return "SessionTask[no graph]";
272                         else
273                                 return "SessionTask[" + rootGraph.parent + "]";
274                 }
275
276         }
277
278         public static abstract class SessionRead extends SessionTask {
279
280                 final public Semaphore notify;
281                 final public DataContainer<Throwable> throwable; 
282
283                 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
284                         super(null);
285                         this.throwable = throwable;
286                         this.notify = notify;
287                 }
288
289         }
290
291         public boolean resume(ReadGraphImpl graph) {
292                 return executors[0].runSynchronized();
293         }
294
295         public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
296                         throws DatabaseException {
297
298                 THREADS = threads;
299                 THREAD_MASK = threads - 1;
300
301                 querySupport = core;
302                 cache = new QueryCache(core, threads);
303                 session = querySupport.getSession();
304                 resourceSupport = querySupport.getSupport();
305                 querySupportLock = core.getLock();
306
307                 executors = new QueryThread[THREADS];
308                 threadStates = new ThreadState[THREADS];
309
310                 for (int i = 0; i < THREADS; i++) {
311                         threadStates[i] = ThreadState.INIT;
312                 }
313
314                 for (int i = 0; i < THREADS; i++) {
315
316                         final int index = i;
317
318                         executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
319
320                         threadSet.add(executors[i]);
321
322                 }
323
324                 // Now start threads
325                 for (int i = 0; i < THREADS; i++) {
326                         executors[i].start();
327                 }
328
329                 // Make sure that query threads are up and running
330                 while(sleepers.get() != THREADS) {
331                         try {
332                                 Thread.sleep(5);
333                         } catch (InterruptedException e) {
334                                 e.printStackTrace();
335                         }
336                 }
337
338                 rootLibrary = core.getBuiltin("http:/");
339                 boolean builtinsInstalled = rootLibrary != 0;
340
341                 if (builtinsInstalled) {
342                         functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
343                         assert (functionalRelation != 0);
344                 } else
345                         functionalRelation = 0;
346
347                 if (builtinsInstalled) {
348                         instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
349                         assert (instanceOf != 0);
350                 } else
351                         instanceOf = 0;
352
353                 if (builtinsInstalled) {
354                         inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
355                         assert (inverseOf != 0);
356                 } else
357                         inverseOf = 0;
358
359
360                 if (builtinsInstalled) {
361                         inherits = core.getBuiltin(Layer0.URIs.Inherits);
362                         assert (inherits != 0);
363                 } else
364                         inherits = 0;
365
366                 if (builtinsInstalled) {
367                         asserts = core.getBuiltin(Layer0.URIs.Asserts);
368                         assert (asserts != 0);
369                 } else
370                         asserts = 0;
371
372                 if (builtinsInstalled) {
373                         hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
374                         assert (hasPredicate != 0);
375                 } else
376                         hasPredicate = 0;
377
378                 if (builtinsInstalled) {
379                         hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
380                         assert (hasPredicateInverse != 0);
381                 } else
382                         hasPredicateInverse = 0;
383
384                 if (builtinsInstalled) {
385                         hasObject = core.getBuiltin(Layer0.URIs.HasObject);
386                         assert (hasObject != 0);
387                 } else
388                         hasObject = 0;
389
390                 if (builtinsInstalled) {
391                         subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
392                         assert (subrelationOf != 0);
393                 } else
394                         subrelationOf = 0;
395
396                 if (builtinsInstalled) {
397                         superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
398                         assert (superrelationOf != 0);
399                 } else
400                         superrelationOf = 0;
401
402                 if (builtinsInstalled) {
403                         library = core.getBuiltin(Layer0.URIs.Library);
404                         assert (library != 0);
405                 } else
406                         library = 0;
407
408                 if (builtinsInstalled) {
409                         consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
410                         assert (consistsOf != 0);
411                 } else
412                         consistsOf = 0;
413
414                 if (builtinsInstalled) {
415                         hasName = core.getBuiltin(Layer0.URIs.HasName);
416                         assert (hasName != 0);
417                 } else
418                         hasName = 0;
419
420         }
421
422         final public void releaseWrite(ReadGraphImpl graph) {
423                 propagateChangesInQueryCache(graph);
424                 modificationCounter++;
425         }
426
427         final public int getId(final Resource r) {
428                 return querySupport.getId(r);
429         }
430
431         public QuerySupport getCore() {
432                 return querySupport;
433         }
434
435         public int getFunctionalRelation() {
436                 return functionalRelation;
437         }
438
439         public int getInherits() {
440                 return inherits;
441         }
442
443         public int getInstanceOf() {
444                 return instanceOf;
445         }
446
447         public int getInverseOf() {
448                 return inverseOf;
449         }
450
451         public int getSubrelationOf() {
452                 return subrelationOf;
453         }
454
455         public int getSuperrelationOf() {
456                 return superrelationOf;
457         }
458
459         public int getAsserts() {
460                 return asserts;
461         }
462
463         public int getHasPredicate() {
464                 return hasPredicate;
465         }
466
467         public int getHasPredicateInverse() {
468                 return hasPredicateInverse;
469         }
470
471         public int getHasObject() {
472                 return hasObject;
473         }
474
475         public int getRootLibrary() {
476                 return rootLibrary;
477         }
478
479         public Resource getRootLibraryResource() {
480                 if (rootLibraryResource == null) {
481                         // Synchronization is not needed here, it doesn't matter if multiple
482                         // threads simultaneously set rootLibraryResource once.
483                         int root = getRootLibrary();
484                         if (root == 0)
485                                 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
486                         this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
487                 }
488                 return rootLibraryResource;
489         }
490
491         public int getLibrary() {
492                 return library;
493         }
494
495         public int getConsistsOf() {
496                 return consistsOf;
497         }
498
499         public int getHasName() {
500                 return hasName;
501         }
502
503         public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
504
505                 try {
506                         
507                         QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
508
509                                 @Override
510                                 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
511
512                                         if (result != null && result != 0) {
513                                                 procedure.execute(graph, result);
514                                                 return;
515                                         }
516
517                                         // Fall back to using the fixed builtins.
518 //                                      result = querySupport.getBuiltin(id);
519 //                                      if (result != 0) {
520 //                                              procedure.execute(graph, result);
521 //                                              return;
522 //                                      } 
523
524 //                                      try {
525 //                                              result = querySupport.getRandomAccessReference(id);
526 //                                      } catch (ResourceNotFoundException e) {
527 //                                              procedure.exception(graph, e);
528 //                                              return;
529 //                                      }
530
531                                         if (result != 0) {
532                                                 procedure.execute(graph, result);
533                                         } else {
534                                                 procedure.exception(graph, new ResourceNotFoundException(id));
535                                         }
536
537                                 }
538
539                                 @Override
540                                 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
541                                         procedure.exception(graph, t);
542                                 }
543
544                         });
545                 } catch (DatabaseException e) {
546                     
547                     try {
548                         
549                 procedure.exception(graph, e);
550                 
551             } catch (DatabaseException e1) {
552                 
553                 Logger.defaultLogError(e1);
554                 
555             }
556                     
557                 }
558
559         }
560
561         public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
562
563                 Integer result = querySupport.getBuiltin(id);
564                 if (result != 0) {
565                         procedure.execute(graph, result);
566                 } else {
567                         procedure.exception(graph, new ResourceNotFoundException(id));
568                 }
569
570         }
571
572         final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
573
574                 try {
575                         QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
576                 } catch (DatabaseException e) {
577                         throw new IllegalStateException(e);
578                 }
579
580         }
581
582         public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
583
584                 
585                 try {
586                         QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
587                 } catch (DatabaseException e) {
588                         throw new IllegalStateException(e);
589                 }
590
591         }
592
593         final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
594                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
595         }
596
597 //    @Override
598 //      public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
599 //      
600 //      return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
601 //
602 //      }
603
604         public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
605
606                 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
607
608         }
609
610         public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
611
612                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
613
614         }
615
616         boolean isBound(ExternalReadEntry<?> entry) {
617                 if(entry.hasParents()) return true;
618                 else if(listening.hasListener(entry)) return true;
619                 else return false;
620         }
621
622         static class Dummy implements InternalProcedure<Object>, IntProcedure {
623
624                 @Override
625                 public void execute(ReadGraphImpl graph, int i) {
626                 }
627
628                 @Override
629                 public void finished(ReadGraphImpl graph) {
630                 }
631
632                 @Override
633                 public void execute(ReadGraphImpl graph, Object result) {
634                 }
635
636                 @Override
637                 public void exception(ReadGraphImpl graph, Throwable throwable) {
638                 }
639                 
640         }
641         
642         private static final Dummy dummy = new Dummy();
643
644         /*
645     public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
646
647         if (DebugPolicy.PERFORM)
648             System.out.println("PE[ " + (query.hashCode() &  THREAD_MASK) + "] " + query);
649
650         assert (!dirty);
651         assert (!collecting);
652
653         assert(query.assertNotDiscarded());
654
655         registerDependencies(graph, query, parent, listener, procedure, false);
656
657         // FRESH, REFUTED, EXCEPTED go here 
658         if (!query.isReady()) {
659
660             size++;
661             misses++;
662
663             query.computeForEach(graph, this, (Procedure)dummy, true);
664             return query.get(graph, this, null);
665
666         } else {
667
668             hits++;
669
670             return query.get(graph, this, procedure);
671
672         }
673
674     }
675         */
676         
677
678         interface QueryCollectorSupport {
679                 public CacheCollectionResult allCaches();
680                 public Collection<CacheEntry> getRootList();
681                 public int getCurrentSize();
682                 public int calculateCurrentSize();
683                 public CacheEntryBase iterate(int level);
684                 public void remove();
685                 public void setLevel(CacheEntryBase entry, int level);
686                 public boolean start(boolean flush);
687         }
688
689         interface QueryCollector {
690
691                 public void collect(int youngTarget, int allowedTimeInMs);
692
693         }
694
695         class QueryCollectorSupportImpl implements QueryCollectorSupport {
696
697                 private static final boolean DEBUG = false;
698                 private static final double ITERATION_RATIO = 0.2;
699                 
700                 private CacheCollectionResult iteration = new CacheCollectionResult();
701                 private boolean fresh = true;
702                 private boolean needDataInStart = true;
703                 
704                 QueryCollectorSupportImpl() {
705                         iteration.restart();
706                 }
707
708                 public CacheCollectionResult allCaches() {
709                         CacheCollectionResult result = new CacheCollectionResult();
710                         QueryProcessor.this.allCaches(result);
711                         result.restart();
712                         return result;
713                 }
714                 
715                 public boolean start(boolean flush) {
716                         // We need new data from query maps
717                         fresh = true;
718                         if(needDataInStart || flush) {
719                                 // Last run ended after processing all queries => refresh data
720                                 restart(flush ? 0.0 : ITERATION_RATIO);
721                         } else {
722                                 // continue with previous big data
723                         }
724                         // Notify caller about iteration situation
725                         return iteration.isAtStart();
726                 }
727
728                 private void restart(double targetRatio) {
729                         
730                         needDataInStart = true;
731
732                         long start = System.nanoTime();
733                         if(fresh) {
734                                 
735                                 // We need new data from query maps
736                                 
737                                 int iterationSize = iteration.size()+1;
738                                 int diff = calculateCurrentSize()-iterationSize;
739                                 
740                                 double ratio = (double)diff / (double)iterationSize;
741                                 boolean dirty = Math.abs(ratio) >= targetRatio;
742                                 
743                                 if(dirty) {
744                                         iteration = allCaches();
745                                         if(DEBUG) {
746                                                 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
747                                                 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
748                                                         System.err.print(" " + iteration.levels[i].size());
749                                                 System.err.println("");
750                                         }
751                                 } else {
752                                         iteration.restart();
753                                 }
754                                 
755                                 fresh = false;
756                                 needDataInStart = false;
757                         } else {
758                                 // We are returning here within the same GC round - reuse the cache table
759                                 iteration.restart();
760                         }
761                         
762                         return;
763                         
764                 }
765                 
766                 @Override
767                 public CacheEntryBase iterate(int level) {
768                         
769                         CacheEntryBase entry = iteration.next(level);
770                         if(entry == null) {
771                                 restart(ITERATION_RATIO);
772                                 return null;
773                         }
774                         
775                         while(entry != null && entry.isDiscarded()) {
776                                 entry = iteration.next(level);
777                         }
778                         
779                         return entry;
780                         
781                 }
782                 
783                 @Override
784                 public void remove() {
785                         iteration.remove();
786                 }
787                 
788                 @Override
789                 public void setLevel(CacheEntryBase entry, int level) {
790                         iteration.setLevel(entry, level);
791                 }
792
793                 public Collection<CacheEntry> getRootList() {
794                         return cache.getRootList();
795                 }
796
797                 @Override
798                 public int calculateCurrentSize() {
799                         return cache.calculateCurrentSize();
800                 }
801
802                 @Override
803                 public int getCurrentSize() {
804                         return cache.size;
805                 }
806
807         }
808         //    final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
809
810         private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
811         private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
812
813     public int querySize() {
814         return cache.size;
815     }
816
817         public void gc(int youngTarget, int allowedTimeInMs) {
818
819                 collector.collect(youngTarget, allowedTimeInMs);
820
821         }
822
823
824         void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
825
826                 if(entry.isDiscarded()) return;
827                 if(workarea.containsKey(entry)) return;
828                 
829                 Iterable<CacheEntry> parents = entry.getParents(this);
830                 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
831                 for(CacheEntry e : parents) {
832                         if(e.isDiscarded()) continue;
833                         ps.add(e);
834                         processParentReport(e, workarea);
835                 }
836                 workarea.put(entry, ps);
837
838         }
839
840         public synchronized String reportQueryActivity(File file) throws IOException {
841                 
842                 System.err.println("reportQueries " + file.getAbsolutePath());
843
844                 if (!isAlive())
845                         return "Disposed!";
846
847                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
848
849                 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
850                 Collections.reverse(entries);
851                 
852                 for(Pair<String,Integer> entry : entries) {
853                         b.println(entry.first + ": " + entry.second);
854                 }
855
856                 b.close();
857                 
858                 Development.histogram.clear();
859
860                 return "OK";
861
862         }
863         
864         public synchronized String reportQueries(File file) throws IOException {
865
866                 System.err.println("reportQueries " + file.getAbsolutePath());
867
868                 if (!isAlive())
869                         return "Disposed!";
870
871                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
872
873                 long start = System.nanoTime();
874
875 //              ArrayList<CacheEntry> all = ;
876                 
877                 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
878                 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
879                 for(CacheEntryBase entry : caches) {
880                         processParentReport(entry, workarea);
881                 }
882                 
883                 //        for(CacheEntry e : all) System.err.println("entry: " + e);
884
885                 long duration = System.nanoTime() - start;
886                 System.err.println("Query root set in " + 1e-9*duration + "s.");
887
888                 start = System.nanoTime();
889
890                 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); 
891
892                 int listeners = 0;
893
894                 for(CacheEntry entry : workarea.keySet()) {
895                         boolean listener = listening.hasListenerAfterDisposing(entry);
896                         boolean hasParents = entry.getParents(this).iterator().hasNext();
897                         if(listener) {
898                                 // Bound
899                                 flagMap.put(entry, 0);
900                         } else if (!hasParents) {
901                                 // Unbound
902                                 flagMap.put(entry, 1);
903                         } else {
904                                 // Unknown
905                                 flagMap.put(entry, 2);
906                         }
907                         //              // Write leaf bit
908                         //              entry.flags |= 4;
909                 }
910
911                 boolean done = true;
912                 int loops = 0;
913
914                 do {
915
916                         done = true;
917
918                         long start2 = System.nanoTime();
919
920                         int boundCounter = 0;
921                         int unboundCounter = 0;
922                         int unknownCounter = 0;
923
924                         for(CacheEntry<?> entry : workarea.keySet()) {
925
926                                 //System.err.println("process " + entry);
927
928                                 int flags = flagMap.get(entry);
929                                 int bindStatus = flags & 3;
930
931                                 if(bindStatus == 0) boundCounter++;
932                                 else if(bindStatus == 1) unboundCounter++;
933                                 else if(bindStatus == 2) unknownCounter++;
934
935                                 if(bindStatus < 2) continue;
936
937                                 int newStatus = 1;
938                                 for(CacheEntry parent : entry.getParents(this)) {
939
940                                         if(parent.isDiscarded()) flagMap.put(parent, 1);
941
942                                         int flags2 = flagMap.get(parent);
943                                         int bindStatus2 = flags2 & 3;
944                                         // Parent is bound => child is bound
945                                         if(bindStatus2 == 0) {
946                                                 newStatus = 0;
947                                                 break;
948                                         }
949                                         // Parent is unknown => child is unknown
950                                         else if (bindStatus2 == 2) {
951                                                 newStatus = 2;
952                                                 done = false;
953                                                 break;
954                                         }
955                                 }
956
957                                 flagMap.put(entry, newStatus);
958
959                         }
960
961                         duration = System.nanoTime() - start2;
962                         System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
963                         b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
964
965                 } while(!done && loops++ < 20);
966
967                 if(loops >= 20) {
968
969                         for(CacheEntry entry : workarea.keySet()) {
970
971                                 int bindStatus = flagMap.get(entry);
972                                 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
973
974                         }
975
976                 }
977
978                 duration = System.nanoTime() - start;
979                 System.err.println("Query analysis in " + 1e-9*duration + "s.");
980
981                 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
982
983                 for(CacheEntry entry : workarea.keySet()) {
984                         Class<?> clazz = entry.getClass();
985                         if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass(); 
986                         else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass(); 
987                         else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass(); 
988                         else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass(); 
989                         else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass(); 
990                         Integer c = counts.get(clazz);
991                         if(c == null) counts.put(clazz, -1);
992                         else counts.put(clazz, c-1);
993                 }
994
995                 b.print("// Simantics DB client query report file\n");
996                 b.print("// This file contains the following information\n");
997                 b.print("// -The amount of cached query instances per query class\n");
998                 b.print("// -The sizes of retained child sets\n");
999                 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1000                 b.print("//  -Followed by status, where\n");
1001                 b.print("//   -0=bound\n");
1002                 b.print("//   -1=free\n");
1003                 b.print("//   -2=unknown\n");
1004                 b.print("//   -L=has listener\n");
1005                 b.print("// -List of children for each query (search for 'C <query name>')\n");
1006
1007                 b.print("----------------------------------------\n");
1008
1009                 b.print("// Queries by class\n");
1010                 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1011                         b.print(-p.second + " " + p.first.getName() + "\n");
1012                 }
1013
1014                 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1015                 for(CacheEntry e : workarea.keySet())
1016                         hist.put(e, -1);
1017                 
1018                 boolean changed = true;
1019                 int iter = 0;
1020                 while(changed && iter++<50) {
1021                         
1022                         changed = false;
1023                         
1024                         Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1025                         for(CacheEntry e : workarea.keySet())
1026                                 newHist.put(e, -1);
1027
1028                         for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1029                                 Integer c = hist.get(e.getKey());
1030                                 for(CacheEntry p : e.getValue()) {
1031                                         Integer i = newHist.get(p);
1032                                         newHist.put(p, i+c);
1033                                 }
1034                         }
1035                         for(CacheEntry e : workarea.keySet()) {
1036                                 Integer value = newHist.get(e);
1037                                 Integer old = hist.get(e);
1038                                 if(!value.equals(old)) {
1039                                         hist.put(e, value);
1040 //                                      System.err.println("hist " + e + ": " + old + " => " + value);
1041                                         changed = true;
1042                                 }
1043                         }
1044                         
1045                         System.err.println("Retained set iteration " + iter);
1046
1047                 }
1048
1049                 b.print("// Queries by retained set\n");
1050                 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1051                         b.print("" + -p.second + " " + p.first + "\n");
1052                 }
1053
1054                 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1055
1056                 b.print("// Entry parent listing\n");
1057                 for(CacheEntry entry : workarea.keySet()) {
1058                         int status = flagMap.get(entry);
1059                         boolean hasListener = listening.hasListenerAfterDisposing(entry);
1060                         b.print("Q " + entry.toString());
1061                         if(hasListener) {
1062                                 b.print(" (L" + status + ")");
1063                                 listeners++;
1064                         } else {
1065                                 b.print(" (" + status + ")");
1066                         }
1067                         b.print("\n");
1068                         for(CacheEntry parent : workarea.get(entry)) {
1069                                 Collection<CacheEntry> inv = inverse.get(parent);
1070                                 if(inv == null) {
1071                                         inv = new ArrayList<CacheEntry>();
1072                                         inverse.put(parent, inv);
1073                                 }
1074                                 inv.add(entry);
1075                                 b.print("  " + parent.toString());
1076                                 b.print("\n");
1077                         }
1078                 }
1079
1080                 b.print("// Entry child listing\n");
1081                 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1082                         b.print("C " + entry.getKey().toString());
1083                         b.print("\n");
1084                         for(CacheEntry child : entry.getValue()) {
1085                                 Integer h = hist.get(child);
1086                                 if(h != null) {
1087                                         b.print("  " + h);
1088                                 } else {
1089                                         b.print("  <no children>");
1090                                 }
1091                                 b.print("  " + child.toString());
1092                                 b.print("\n");
1093                         }
1094                 }
1095
1096                 b.print("#queries: " + workarea.keySet().size() + "\n");
1097                 b.print("#listeners: " + listeners + "\n");
1098
1099                 b.close();
1100
1101                 return "Dumped " + workarea.keySet().size() + " queries.";
1102
1103         }
1104
1105         boolean removeQuery(CacheEntry entry) {
1106
1107                 // This entry has been removed before. No need to do anything here.
1108                 if(entry.isDiscarded()) return false;
1109
1110                 assert (!entry.isDiscarded());
1111
1112                 Query query = entry.getQuery();
1113
1114                 query.removeEntry(this);
1115
1116                 cache.updates++;
1117                 cache.size--;
1118
1119                 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1120                         boundQueries--;
1121                 
1122                 entry.discard();
1123
1124                 return true;
1125
1126         }
1127
1128         /**
1129          * 
1130          * @return true if this entry is being listened
1131          */
1132         private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1133
1134                 assert (e != null);
1135
1136                 CacheEntry entry = e.entry;
1137
1138                 /*
1139                  * If the dependency graph forms a DAG, some entries are inserted in the
1140                  * todo list many times. They only need to be processed once though.
1141                  */
1142                 if (entry.isDiscarded()) {
1143                         if (Development.DEVELOPMENT) {
1144                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1145                                         System.err.print("D");
1146                                         for (int i = 0; i < e.indent; i++)
1147                                                 System.err.print(" ");
1148                                         System.err.println(entry.getQuery());
1149                                 }
1150                         }
1151 //                      System.err.println(" => DISCARDED");
1152                         return false;
1153                 }
1154
1155 //              if (entry.isRefuted()) {
1156 //                      if (Development.DEVELOPMENT) {
1157 //                              if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1158 //                                      System.err.print("R");
1159 //                                      for (int i = 0; i < e.indent; i++)
1160 //                                              System.err.print(" ");
1161 //                                      System.err.println(entry.getQuery());
1162 //                              }
1163 //                      }
1164 //                      return false;
1165 //              }
1166
1167                 if (entry.isExcepted()) {
1168                         if (Development.DEVELOPMENT) {
1169                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1170                                         System.err.print("E");
1171                                 }
1172                         }
1173                 }
1174
1175                 if (entry.isPending()) {
1176                         if (Development.DEVELOPMENT) {
1177                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1178                                         System.err.print("P");
1179                                 }
1180                         }
1181                 }
1182
1183                 cache.updates++;
1184
1185                 if (Development.DEVELOPMENT) {
1186                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1187                                 System.err.print("U ");
1188                                 for (int i = 0; i < e.indent; i++)
1189                                         System.err.print(" ");
1190                                 System.err.print(entry.getQuery());
1191                         }
1192                 }
1193
1194                 Query query = entry.getQuery();
1195                 int type = query.type();
1196
1197                 boolean hasListener = listening.hasListener(entry); 
1198
1199                 if (Development.DEVELOPMENT) {
1200                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1201                                 if(listening.hasListener(entry)) {
1202                                         System.err.println(" (L)");
1203                                 } else {
1204                                         System.err.println("");
1205                                 }
1206                         }
1207                 }
1208
1209                 if(entry.isPending() || entry.isExcepted()) {
1210
1211                         // If updated
1212                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1213
1214                                 immediates.put(entry, entry);
1215
1216                         } else {
1217
1218                                 if(hasListener) {
1219                                         entry.refute();
1220                                 } else {
1221                                         removeQuery(entry);
1222                                 }
1223
1224                         }
1225
1226                 } else {
1227
1228                         // If updated
1229                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1230
1231                                 immediates.put(entry, entry);
1232
1233                         } else {
1234
1235                                 if(hasListener) {
1236                                         entry.refute();
1237                                 } else {
1238                                         removeQuery(entry);
1239                                 }
1240
1241                         }
1242
1243                 }
1244
1245 //              System.err.println(" => FOO " + type);
1246
1247                 if (hasListener) {
1248                         ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1249                         if(entries != null) {
1250                                 for (ListenerEntry le : entries) {
1251                                         listening.scheduleListener(le);
1252                                 }
1253                         }
1254                 }
1255
1256                 // If invalid, update parents
1257                 if (type == RequestFlags.INVALIDATE) {
1258                         listening.updateParents(e.indent, entry, todo);
1259                 }
1260
1261                 return hasListener;
1262
1263         }
1264
1265         /**
1266          * @param av1 an array (guaranteed)
1267          * @param av2 any object
1268          * @return <code>true</code> if the two arrays are equal
1269          */
1270         private final boolean arrayEquals(Object av1, Object av2) {
1271                 if (av2 == null)
1272                         return false;
1273                 Class<?> c1 = av1.getClass().getComponentType();
1274                 Class<?> c2 = av2.getClass().getComponentType();
1275                 if (c2 == null || !c1.equals(c2))
1276                         return false;
1277                 boolean p1 = c1.isPrimitive();
1278                 boolean p2 = c2.isPrimitive();
1279                 if (p1 != p2)
1280                         return false;
1281                 if (!p1)
1282                         return Arrays.equals((Object[]) av1, (Object[]) av2);
1283                 if (boolean.class.equals(c1))
1284                         return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1285                 else if (byte.class.equals(c1))
1286                         return Arrays.equals((byte[]) av1, (byte[]) av2);
1287                 else if (int.class.equals(c1))
1288                         return Arrays.equals((int[]) av1, (int[]) av2);
1289                 else if (long.class.equals(c1))
1290                         return Arrays.equals((long[]) av1, (long[]) av2);
1291                 else if (float.class.equals(c1))
1292                         return Arrays.equals((float[]) av1, (float[]) av2);
1293                 else if (double.class.equals(c1))
1294                         return Arrays.equals((double[]) av1, (double[]) av2);
1295                 throw new RuntimeException("??? Contact application querySupport.");
1296         }
1297
1298
1299
1300         final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1301
1302                 try {
1303
1304                         Query query = entry.getQuery();
1305
1306                         if (Development.DEVELOPMENT) {
1307                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1308                                         System.err.println("R " + query);
1309                                 }
1310                         }
1311
1312                         entry.prepareRecompute(querySupport);
1313                         
1314                         ReadGraphImpl parentGraph = graph.forRecompute(entry);
1315
1316                         query.recompute(parentGraph);
1317
1318                         if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1319
1320                         Object newValue = entry.getResult();
1321
1322                         if (ListenerEntry.NO_VALUE == oldValue) {
1323                                 if (Development.DEVELOPMENT) {
1324                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1325                                                 System.out.println("C " + query);
1326                                                 System.out.println("- " + oldValue);
1327                                                 System.out.println("- " + newValue);
1328                                         }
1329                                 }
1330                                 return newValue;
1331                         }
1332
1333                         boolean changed = false;
1334
1335                         if (newValue != null) {
1336                                 if (newValue.getClass().isArray()) {
1337                                         changed = !arrayEquals(newValue, oldValue);
1338                                 } else {
1339                                         changed = !newValue.equals(oldValue);
1340                                 }
1341                         } else
1342                                 changed = (oldValue != null);
1343
1344                         if (Development.DEVELOPMENT) {
1345                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1346                                         System.err.println("C " + query);
1347                                         System.err.println("- " + oldValue);
1348                                         System.err.println("- " + newValue);
1349                                 }
1350                         }
1351
1352                         return changed ? newValue : ListenerEntry.NOT_CHANGED;
1353
1354                 } catch (Throwable t) {
1355
1356                         Logger.defaultLogError(t);
1357                         entry.except(t);
1358                         return ListenerEntry.NO_VALUE;
1359
1360                 }
1361
1362         }
1363
1364
1365         /**
1366          * 
1367          * @return true if this entry still has listeners
1368          */
1369         public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1370
1371                 assert (!cache.collecting);
1372                 assert (!updating);
1373                 updating = true;
1374
1375                 boolean hadListeners = false;
1376                 boolean listenersUnknown = false;
1377
1378                 try {
1379
1380                         assert(entry != null);
1381                         LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1382                         IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1383                         todo.add(new UpdateEntry(null, entry, 0));
1384
1385                         while(true) {
1386
1387                                 // Walk the tree and collect immediate updates
1388                                 while (!todo.isEmpty()) {
1389                                         UpdateEntry e = todo.pop();
1390                                         hadListeners |= updateQuery(e, todo, immediates);
1391                                 }
1392
1393                                 if(immediates.isEmpty()) break;
1394
1395                                 // Evaluate all immediate updates and collect parents to update
1396                                 for(CacheEntry immediate : immediates.values()) {
1397
1398                                         if(immediate.isDiscarded()) {
1399                                                 continue;
1400                                         }
1401
1402                                         if(immediate.isExcepted()) {
1403
1404                                                 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1405                                                 if (newValue != ListenerEntry.NOT_CHANGED)
1406                                                         listening.updateParents(0, immediate, todo);
1407
1408                                         } else {
1409
1410                                                 Object oldValue = immediate.getResult();
1411                                                 Object newValue = compareTo(graph, immediate, oldValue);
1412
1413                                                 if (newValue != ListenerEntry.NOT_CHANGED) {
1414                                                         listening.updateParents(0, immediate, todo);
1415                                                 } else {
1416                                                         // If not changed, keep the old value
1417                                                         immediate.setResult(oldValue);
1418                                                         immediate.setReady();
1419                                                         listenersUnknown = true;
1420                                                 }
1421
1422                                         }
1423
1424                                 }
1425                                 immediates.clear();
1426
1427                         }
1428
1429                 } catch (Throwable t) {
1430                         Logger.defaultLogError(t);
1431                 }
1432
1433                 assert (updating);
1434                 updating = false;
1435
1436                 return hadListeners | listenersUnknown;
1437
1438         }
1439
1440         private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1441         private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1442         private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1443         // Maybe use a mutex from util.concurrent?
1444         private Object primitiveUpdateLock = new Object();
1445         private THashSet scheduledPrimitiveUpdates = new THashSet();
1446
1447         private ArrayList<CacheEntry> refutations = new ArrayList<>();
1448         
1449         private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1450                 e.refute();
1451                 refutations.add(e);
1452         }
1453
1454         private void updateRefutations(ReadGraphImpl graph) {
1455                 
1456                 for(CacheEntry e : refutations)
1457                         update(graph, e);
1458                 
1459                 refutations.clear();
1460                 
1461         }
1462         
1463         public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1464                 
1465                 // Make sure that listening has performed its work
1466                 listening.sync();
1467
1468                 cache.dirty = false;
1469                 lastInvalidate = 0;
1470
1471                 if (Development.DEVELOPMENT) {
1472                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1473                                 System.err.println("== Query update ==");
1474                         }
1475                 }
1476
1477                 // Special case - one statement
1478                 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1479
1480                         long arg0 = scheduledObjectUpdates.getFirst();
1481
1482                         final int subject = (int)(arg0 >>> 32);
1483                         final int predicate = (int)(arg0 & 0xffffffff);
1484
1485                         for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1486                         for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1487                         for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1488
1489                         if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1490                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1491                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1492                                 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1493                                 if(types != null) markForUpdate(graph, types);
1494                         }
1495
1496                         if(predicate == subrelationOf) {
1497                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1498                                 if(superRelations != null) markForUpdate(graph, superRelations);
1499                         }
1500
1501                         DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1502                         if(dp != null) markForUpdate(graph, dp);
1503                         OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1504                         if(os != null) markForUpdate(graph, os);
1505
1506                         updateRefutations(graph);
1507                         
1508                         scheduledObjectUpdates.clear();
1509
1510                         if (Development.DEVELOPMENT) {
1511                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1512                                         System.err.println("== Query update ends ==");
1513                                 }
1514                         }
1515
1516                         return;
1517
1518                 }
1519
1520                 // Special case - one value
1521                 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1522
1523                         int arg0 = scheduledValueUpdates.getFirst();
1524
1525                         ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1526                         if(valueQuery != null) markForUpdate(graph, valueQuery);
1527
1528                         updateRefutations(graph);
1529
1530                         scheduledValueUpdates.clear();
1531
1532                         if (Development.DEVELOPMENT) {
1533                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1534                                         System.err.println("== Query update ends ==");
1535                                 }
1536                         }
1537                         
1538                         return;
1539
1540                 }
1541
1542                 final TIntHashSet predicates = new TIntHashSet();
1543                 final TIntHashSet orderedSets = new TIntHashSet();
1544
1545                 THashSet primitiveUpdates;
1546                 synchronized (primitiveUpdateLock) {
1547                         primitiveUpdates = scheduledPrimitiveUpdates;
1548                         scheduledPrimitiveUpdates = new THashSet();
1549                 }
1550
1551                 scheduledValueUpdates.forEach(new TIntProcedure() {
1552
1553                         @Override
1554                         public boolean execute(int arg0) {
1555                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1556                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1557                                 return true;
1558                         }
1559
1560                 });
1561
1562                 scheduledInvalidates.forEach(new TIntProcedure() {
1563
1564                         @Override
1565                         public boolean execute(int resource) {
1566                                 
1567                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1568                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1569                                 
1570                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1571                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1572                                 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1573                                 if(types != null) markForUpdate(graph, types);
1574
1575                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1576                                 if(superRelations != null) markForUpdate(graph, superRelations);
1577
1578                                 predicates.add(resource);
1579                                 
1580                                 return true;
1581                         }
1582
1583                 });
1584
1585                 scheduledObjectUpdates.forEach(new TLongProcedure() {
1586
1587                         @Override
1588                         public boolean execute(long arg0) {
1589
1590                                 final int subject = (int)(arg0 >>> 32);
1591                                 final int predicate = (int)(arg0 & 0xffffffff);
1592
1593                                 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1594                                         PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1595                                         if(principalTypes != null) markForUpdate(graph, principalTypes);
1596                                         Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1597                                         if(types != null) markForUpdate(graph, types);
1598                                 }
1599
1600                                 if(predicate == subrelationOf) {
1601                                         SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1602                                         if(superRelations != null) markForUpdate(graph, superRelations);
1603                                 }
1604
1605                                 predicates.add(subject);
1606                                 orderedSets.add(predicate);
1607
1608                                 return true;
1609
1610                         }
1611
1612                 });
1613
1614                 predicates.forEach(new TIntProcedure() {
1615
1616                         @Override
1617                         public boolean execute(final int subject) {
1618
1619                                 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1620                                 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1621                                 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1622
1623                                 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1624                                 if(entry != null) markForUpdate(graph, entry);
1625
1626                                 return true;
1627
1628                         }
1629
1630                 });
1631
1632                 orderedSets.forEach(new TIntProcedure() {
1633
1634                         @Override
1635                         public boolean execute(int orderedSet) {
1636
1637                                 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1638                                 if(entry != null) markForUpdate(graph, entry);
1639
1640                                 return true;
1641
1642                         }
1643
1644                 });
1645
1646                 updateRefutations(graph);
1647
1648                 primitiveUpdates.forEach(new TObjectProcedure() {
1649
1650                         @Override
1651                         public boolean execute(Object arg0) {
1652
1653                                 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1654                                 if (query != null) {
1655                                         boolean listening = update(graph, query);
1656                                         if (!listening && !query.hasParents()) {
1657                                                 cache.externalReadEntryMap.remove(arg0);
1658                                                 query.discard();
1659                                         }
1660                                 }
1661                                 return true;
1662                         }
1663
1664                 });
1665                 
1666                 scheduledValueUpdates.clear();
1667                 scheduledObjectUpdates.clear();
1668                 scheduledInvalidates.clear();
1669                 
1670                 if (Development.DEVELOPMENT) {
1671                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1672                                 System.err.println("== Query update ends ==");
1673                         }
1674                 }
1675
1676         }
1677
1678         public void updateValue(final int resource) {
1679                 scheduledValueUpdates.add(resource);
1680                 cache.dirty = true;
1681         }
1682
1683         public void updateStatements(final int resource, final int predicate) {
1684                 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
1685                 cache.dirty = true;
1686         }
1687         
1688         private int lastInvalidate = 0;
1689         
1690         public void invalidateResource(final int resource) {
1691                 if(lastInvalidate == resource) return;
1692                 scheduledValueUpdates.add(resource);
1693                 lastInvalidate = resource;
1694                 cache.dirty = true;
1695         }
1696
1697         public void updatePrimitive(final ExternalRead primitive) {
1698
1699                 // External reads may be updated from arbitrary threads.
1700                 // Synchronize to prevent race-conditions.
1701                 synchronized (primitiveUpdateLock) {
1702                         scheduledPrimitiveUpdates.add(primitive);
1703                 }
1704                 querySupport.dirtyPrimitives();
1705
1706         }
1707
1708         @Override
1709         public synchronized String toString() {
1710                 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
1711         }
1712
1713         @Override
1714         protected void doDispose() {
1715
1716                 requests.release(Integer.MAX_VALUE / 2);
1717                 
1718                 for(int index = 0; index < THREADS; index++) { 
1719                         executors[index].dispose();
1720                 }
1721
1722                 // First just wait
1723                 for(int i=0;i<100;i++) {
1724
1725                         boolean alive = false;
1726                         for(int index = 0; index < THREADS; index++) { 
1727                                 alive |= executors[index].isAlive();
1728                         }
1729                         if(!alive) return;
1730                         try {
1731                                 Thread.sleep(5);
1732                         } catch (InterruptedException e) {
1733                                 Logger.defaultLogError(e);
1734                         }
1735
1736                 }
1737
1738                 // Then start interrupting
1739                 for(int i=0;i<100;i++) {
1740
1741                         boolean alive = false;
1742                         for(int index = 0; index < THREADS; index++) { 
1743                                 alive |= executors[index].isAlive();
1744                         }
1745                         if(!alive) return;
1746                         for(int index = 0; index < THREADS; index++) {
1747                                 executors[index].interrupt();
1748                         }
1749                 }
1750
1751                 //              // Then just destroy
1752                 //              for(int index = 0; index < THREADS; index++) {
1753                 //                      executors[index].destroy();
1754                 //              }
1755
1756                 for(int index = 0; index < THREADS; index++) {
1757                         try {
1758                                 executors[index].join(5000);
1759                         } catch (InterruptedException e) {
1760                                 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1761                         }
1762                         executors[index] = null;
1763                 }
1764
1765         }
1766
1767         public int getHits() {
1768                 return cache.hits;
1769         }
1770
1771         public int getMisses() {
1772                 return cache.misses;
1773         }
1774
1775         public int getSize() {
1776                 return cache.size;
1777         }
1778
1779         public Set<Long> getReferencedClusters() {
1780                 HashSet<Long> result = new HashSet<Long>();
1781                 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1782                         Objects query = (Objects) entry.getQuery();
1783                         result.add(querySupport.getClusterId(query.r1()));
1784                 }
1785                 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1786                         DirectPredicates query = (DirectPredicates) entry.getQuery();
1787                         result.add(querySupport.getClusterId(query.id));
1788                 }
1789                 for (CacheEntry entry : cache.valueQueryMap.values()) {
1790                         ValueQuery query = (ValueQuery) entry.getQuery();
1791                         result.add(querySupport.getClusterId(query.id));
1792                 }
1793                 return result;
1794         }
1795
1796         public void assertDone() {
1797         }
1798
1799         CacheCollectionResult allCaches(CacheCollectionResult result) {
1800                 
1801                 return cache.allCaches(result);
1802
1803         }
1804
1805         public void printDiagnostics() {
1806         }
1807
1808         public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1809                 querySupport.requestCluster(graph, clusterId, runnable);
1810         }
1811
1812         public int clean() {
1813                 collector.collect(0, Integer.MAX_VALUE);
1814                 return cache.size;
1815         }
1816
1817         public void clean(final Collection<ExternalRead<?>> requests) {
1818                 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1819                         Iterator<ExternalRead<?>> iterator = requests.iterator();
1820                         @Override
1821                         public CacheCollectionResult allCaches() {
1822                                 throw new UnsupportedOperationException();
1823                         }
1824                         @Override
1825                         public CacheEntryBase iterate(int level) {
1826                                 if(iterator.hasNext()) {
1827                                         ExternalRead<?> request = iterator.next();
1828                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1829                                         if (entry != null) return entry;
1830                                         else return iterate(level);
1831                                 } else {
1832                                         iterator = requests.iterator();
1833                                         return null;
1834                                 }
1835                         }
1836                         @Override
1837                         public void remove() {
1838                                 throw new UnsupportedOperationException();
1839                         }
1840                         @Override
1841                         public void setLevel(CacheEntryBase entry, int level) {
1842                                 throw new UnsupportedOperationException();
1843                         }
1844                         @Override
1845                         public Collection<CacheEntry> getRootList() {
1846                                 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1847                                 for (ExternalRead<?> request : requests) {
1848                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1849                                         if (entry != null)
1850                                                 result.add(entry);
1851                                 }
1852                                 return result;
1853                         }
1854                         @Override
1855                         public int getCurrentSize() {
1856                                 return cache.size;
1857                         }
1858                         @Override
1859                         public int calculateCurrentSize() {
1860                                 // This tells the collector to attempt collecting everything.
1861                                 return Integer.MAX_VALUE;
1862                         }
1863                         @Override
1864                         public boolean start(boolean flush) {
1865                                 return true;
1866                         }
1867                 };
1868                 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
1869         }
1870
1871         public void scanPending() {
1872                 
1873                 cache.scanPending();
1874
1875         }
1876
1877         public ReadGraphImpl graphForVirtualRequest() {
1878                 return ReadGraphImpl.createAsync(this);
1879         }
1880
1881         
1882         private HashMap<Resource, Class<?>> builtinValues;
1883         
1884         public Class<?> getBuiltinValue(Resource r) {
1885                 if(builtinValues == null) initBuiltinValues();
1886                 return builtinValues.get(r);
1887         }
1888
1889         Exception callerException = null;
1890
1891         public interface AsyncBarrier {
1892                 public void inc(); 
1893                 public void dec();
1894                 //        public void inc(String debug); 
1895                 //        public void dec(String debug);
1896         }
1897
1898 //      final public QueryProcessor processor;
1899 //      final public QuerySupport support;
1900
1901         //    boolean disposed = false;
1902
1903         private void initBuiltinValues() {
1904
1905                 Layer0 b = getSession().peekService(Layer0.class);
1906                 if(b == null) return;
1907
1908                 builtinValues = new HashMap<Resource, Class<?>>();
1909
1910                 builtinValues.put(b.String, String.class);
1911                 builtinValues.put(b.Double, Double.class);
1912                 builtinValues.put(b.Float, Float.class);
1913                 builtinValues.put(b.Long, Long.class);
1914                 builtinValues.put(b.Integer, Integer.class);
1915                 builtinValues.put(b.Byte, Byte.class);
1916                 builtinValues.put(b.Boolean, Boolean.class);
1917
1918                 builtinValues.put(b.StringArray, String[].class);
1919                 builtinValues.put(b.DoubleArray, double[].class);
1920                 builtinValues.put(b.FloatArray, float[].class);
1921                 builtinValues.put(b.LongArray, long[].class);
1922                 builtinValues.put(b.IntegerArray, int[].class);
1923                 builtinValues.put(b.ByteArray, byte[].class);
1924                 builtinValues.put(b.BooleanArray, boolean[].class);
1925
1926         }
1927
1928 //      public ReadGraphSupportImpl(final QueryProcessor provider2) {
1929 //
1930 //              if (null == provider2) {
1931 //                      this.processor = null;
1932 //                      support = null;
1933 //                      return;
1934 //              }
1935 //              this.processor = provider2;
1936 //              support = provider2.getCore();
1937 //              initBuiltinValues();
1938 //
1939 //      }
1940
1941 //      final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
1942 //              return new ReadGraphSupportImpl(impl.processor);
1943 //      }
1944
1945         @Override
1946         final public Session getSession() {
1947                 return session;
1948         }
1949         
1950         final public ResourceSupport getResourceSupport() {
1951                 return resourceSupport;
1952         }
1953
1954         @Override
1955         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
1956
1957         try {
1958
1959                 for(Resource predicate : getPredicates(impl, subject))
1960                     procedure.execute(impl, predicate);
1961
1962                 procedure.finished(impl);
1963
1964             } catch (Throwable e) {
1965                 procedure.exception(impl, e);
1966             }
1967
1968         }
1969
1970         @Override
1971         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
1972                 
1973                 throw new UnsupportedOperationException();
1974
1975 //              assert(subject != null);
1976 //              assert(procedure != null);
1977 //
1978 //              final ListenerBase listener = getListenerBase(procedure);
1979 //
1980 //              try {
1981 //                      QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
1982 //
1983 //                              @Override
1984 //                              public void execute(ReadGraphImpl graph, int i) {
1985 //                                      try {
1986 //                                              procedure.execute(querySupport.getResource(i));
1987 //                                      } catch (Throwable t2) {
1988 //                                              Logger.defaultLogError(t2);
1989 //                                      }
1990 //                              }
1991 //
1992 //                              @Override
1993 //                              public void finished(ReadGraphImpl graph) {
1994 //                                      try {
1995 //                                              procedure.finished();
1996 //                                      } catch (Throwable t2) {
1997 //                                              Logger.defaultLogError(t2);
1998 //                                      }
1999 ////                            impl.state.barrier.dec();
2000 //                              }
2001 //
2002 //                              @Override
2003 //                              public void exception(ReadGraphImpl graph, Throwable t) {
2004 //                                      try {
2005 //                                              procedure.exception(t);
2006 //                                      } catch (Throwable t2) {
2007 //                                              Logger.defaultLogError(t2);
2008 //                                      }
2009 ////                            impl.state.barrier.dec();
2010 //                              }
2011 //
2012 //                      });
2013 //              } catch (DatabaseException e) {
2014 //                      Logger.defaultLogError(e);
2015 //              }
2016
2017         }
2018         
2019         @Override
2020         final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2021                 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); 
2022         }
2023
2024         @Override
2025         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2026                         final Resource predicate, final MultiProcedure<Statement> procedure) {
2027
2028                 assert(subject != null);
2029                 assert(predicate != null);
2030                 assert(procedure != null);
2031
2032                 final ListenerBase listener = getListenerBase(procedure);
2033
2034 //              impl.state.barrier.inc();
2035
2036                 try {
2037                         Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2038
2039                                 @Override
2040                                 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2041                                         try {
2042                                                 procedure.execute(querySupport.getStatement(s, p, o));
2043                                         } catch (Throwable t2) {
2044                                                 Logger.defaultLogError(t2);
2045                                         }
2046                                 }
2047
2048                                 @Override
2049                                 public void finished(ReadGraphImpl graph) {
2050                                         try {
2051                                                 procedure.finished();
2052                                         } catch (Throwable t2) {
2053                                                 Logger.defaultLogError(t2);
2054                                         }
2055 //                              impl.state.barrier.dec();
2056                                 }
2057
2058                                 @Override
2059                                 public void exception(ReadGraphImpl graph, Throwable t) {
2060                                         try {
2061                                                 procedure.exception(t);
2062                                         } catch (Throwable t2) {
2063                                                 Logger.defaultLogError(t2);
2064                                         }
2065 //                              impl.state.barrier.dec();
2066                                 }
2067
2068                         });
2069                 } catch (DatabaseException e) {
2070                         Logger.defaultLogError(e);
2071                 }
2072
2073         }
2074
2075         @Override
2076         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2077                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2078
2079                 assert(subject != null);
2080                 assert(predicate != null);
2081                 assert(procedure != null);
2082
2083                 final ListenerBase listener = getListenerBase(procedure);
2084
2085                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2086
2087                         boolean first = true;
2088
2089                         @Override
2090                         public void execute(ReadGraphImpl graph, int s, int p, int o) {
2091                                 try {
2092                                         if(first) {
2093                                                 procedure.execute(graph, querySupport.getStatement(s, p, o));
2094                                         } else {
2095                                                 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2096                                         }
2097                                 } catch (Throwable t2) {
2098                                         Logger.defaultLogError(t2);
2099                                 }
2100                         }
2101
2102                         @Override
2103                         public void finished(ReadGraphImpl graph) {
2104
2105                                 try {
2106                                         if(first) {
2107                                                 first = false;
2108                                                 procedure.finished(graph);
2109 //                                              impl.state.barrier.dec(this);
2110                                         } else {
2111                                                 procedure.finished(impl.newRestart(graph));
2112                                         }
2113                                 } catch (Throwable t2) {
2114                                         Logger.defaultLogError(t2);
2115                                 }
2116
2117                         }
2118
2119                         @Override
2120                         public void exception(ReadGraphImpl graph, Throwable t) {
2121
2122                                 try {
2123                                         if(first) {
2124                                                 first = false;
2125                                                 procedure.exception(graph, t);
2126 //                                              impl.state.barrier.dec(this);
2127                                         } else {
2128                                                 procedure.exception(impl.newRestart(graph), t);
2129                                         }
2130                                 } catch (Throwable t2) {
2131                                         Logger.defaultLogError(t2);
2132                                 }
2133
2134                         }
2135
2136                 };
2137
2138                 int sId = querySupport.getId(subject);
2139                 int pId = querySupport.getId(predicate);
2140
2141 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2142 //              else impl.state.barrier.inc(null, null);
2143
2144                 try {
2145                         Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2146                 } catch (DatabaseException e) {
2147                         Logger.defaultLogError(e);
2148                 }
2149
2150         }
2151
2152         @Override
2153         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2154                         final Resource predicate, final StatementProcedure procedure) {
2155
2156                 assert(subject != null);
2157                 assert(predicate != null);
2158                 assert(procedure != null);
2159
2160                 final ListenerBase listener = getListenerBase(procedure);
2161
2162                 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2163
2164                         boolean first = true;
2165
2166                         @Override
2167                         public void execute(ReadGraphImpl graph, int s, int p, int o) {
2168                                 try {
2169                                         if(first) {
2170                                                 procedure.execute(graph, s, p, o);
2171                                         } else {
2172                                                 procedure.execute(impl.newRestart(graph), s, p, o);
2173                                         }
2174                                 } catch (Throwable t2) {
2175                                         Logger.defaultLogError(t2);
2176                                 }
2177                         }
2178
2179                         @Override
2180                         public void finished(ReadGraphImpl graph) {
2181
2182                                 try {
2183                                         if(first) {
2184                                                 first = false;
2185                                                 procedure.finished(graph);
2186 //                                              impl.state.barrier.dec(this);
2187                                         } else {
2188                                                 procedure.finished(impl.newRestart(graph));
2189                                         }
2190                                 } catch (Throwable t2) {
2191                                         Logger.defaultLogError(t2);
2192                                 }
2193
2194                         }
2195
2196                         @Override
2197                         public void exception(ReadGraphImpl graph, Throwable t) {
2198
2199                                 try {
2200                                         if(first) {
2201                                                 first = false;
2202                                                 procedure.exception(graph, t);
2203 //                                              impl.state.barrier.dec(this);
2204                                         } else {
2205                                                 procedure.exception(impl.newRestart(graph), t);
2206                                         }
2207                                 } catch (Throwable t2) {
2208                                         Logger.defaultLogError(t2);
2209                                 }
2210
2211                         }
2212
2213                 };
2214
2215                 int sId = querySupport.getId(subject);
2216                 int pId = querySupport.getId(predicate);
2217
2218 //              if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2219 //              else impl.state.barrier.inc(null, null);
2220
2221                 try {
2222                         Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2223                 } catch (DatabaseException e) {
2224                         Logger.defaultLogError(e);
2225                 }
2226
2227         }
2228         
2229         @Override
2230         final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2231
2232                 assert(subject != null);
2233                 assert(predicate != null);
2234                 assert(procedure != null);
2235
2236                 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2237
2238                         private Set<Statement> current = null;
2239                         private Set<Statement> run = new HashSet<Statement>();
2240
2241                         @Override
2242                         public void execute(AsyncReadGraph graph, Statement result) {
2243
2244                                 boolean found = false;
2245
2246                                 if(current != null) {
2247
2248                                         found = current.remove(result);
2249
2250                                 }
2251
2252                                 if(!found) procedure.add(graph, result);
2253
2254                                 run.add(result);
2255
2256                         }
2257
2258                         @Override
2259                         public void finished(AsyncReadGraph graph) {
2260
2261                                 if(current != null) { 
2262                                         for(Statement r : current) procedure.remove(graph, r);
2263                                 }
2264
2265                                 current = run;
2266
2267                                 run = new HashSet<Statement>();
2268
2269                         }
2270
2271                         @Override
2272                         public void exception(AsyncReadGraph graph, Throwable t) {
2273                                 procedure.exception(graph, t);
2274                         }
2275
2276                         @Override
2277                         public boolean isDisposed() {
2278                                 return procedure.isDisposed();
2279                         }
2280
2281                 });
2282
2283         }
2284
2285         @Override
2286         final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2287                         final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2288
2289                 assert(subject != null);
2290                 assert(predicate != null);
2291                 assert(procedure != null);
2292
2293                 final ListenerBase listener = getListenerBase(procedure);
2294
2295 //              impl.state.barrier.inc();
2296
2297                 try {
2298                         QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2299
2300                                 @Override
2301                                 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2302                                         try {
2303                                                 procedure.execute(graph, querySupport.getStatement(s, p, o));
2304                                         } catch (Throwable t2) {
2305                                                 Logger.defaultLogError(t2);
2306                                         }
2307                                 }
2308
2309                                 @Override
2310                                 public void finished(ReadGraphImpl graph) {
2311                                         try {
2312                                                 procedure.finished(graph);
2313                                         } catch (Throwable t2) {
2314                                                 Logger.defaultLogError(t2);
2315                                         }
2316 //                              impl.state.barrier.dec();
2317                                 }
2318
2319                                 @Override
2320                                 public void exception(ReadGraphImpl graph, Throwable t) {
2321                                         try {
2322                                                 procedure.exception(graph, t);
2323                                         } catch (Throwable t2) {
2324                                                 Logger.defaultLogError(t2);
2325                                         }
2326 //                              impl.state.barrier.dec();
2327                                 }
2328
2329                         });
2330                 } catch (DatabaseException e) {
2331                         Logger.defaultLogError(e);
2332                 }
2333
2334         }
2335
2336         private static ListenerBase getListenerBase(Object procedure) {
2337                 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2338                 else return null;
2339         }
2340
2341         @Override
2342         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2343
2344                 assert(subject != null);
2345                 assert(predicate != null);
2346                 assert(procedure != null);
2347
2348                 final ListenerBase listener = getListenerBase(procedure);
2349
2350 //              impl.state.barrier.inc();
2351
2352                 try {
2353                         QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2354
2355                                 @Override
2356                                 public void execute(ReadGraphImpl graph, int i) {
2357                                         try {
2358                                                 procedure.execute(querySupport.getResource(i));
2359                                         } catch (Throwable t2) {
2360                                                 Logger.defaultLogError(t2);
2361                                         }
2362                                 }
2363
2364                                 @Override
2365                                 public void finished(ReadGraphImpl graph) {
2366                                         try {
2367                                                 procedure.finished();
2368                                         } catch (Throwable t2) {
2369                                                 Logger.defaultLogError(t2);
2370                                         }
2371 //                              impl.state.barrier.dec();
2372                                 }
2373
2374                                 @Override
2375                                 public void exception(ReadGraphImpl graph, Throwable t) {
2376                                         System.out.println("forEachObject exception " + t);
2377                                         try {
2378                                                 procedure.exception(t);
2379                                         } catch (Throwable t2) {
2380                                                 Logger.defaultLogError(t2);
2381                                         }
2382 //                              impl.state.barrier.dec();
2383                                 }
2384
2385                         });
2386                 } catch (DatabaseException e) {
2387                         Logger.defaultLogError(e);
2388                 }
2389
2390         }
2391
2392         @Override
2393         final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2394
2395                 assert(subject != null);
2396                 assert(procedure != null);
2397
2398                 final ListenerBase listener = getListenerBase(procedure);
2399
2400                 int sId = querySupport.getId(subject);
2401
2402                 try {
2403                         QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2404
2405                                 @Override
2406                                 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2407                                         procedure.execute(graph, result);
2408                                 }
2409
2410                                 @Override
2411                                 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2412                                         procedure.exception(graph, throwable);
2413                                 }
2414                                 
2415                         });
2416                 } catch (DatabaseException e) {
2417                         Logger.defaultLogError(e);
2418                 }
2419
2420         }
2421
2422         final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2423
2424 //              assert(subject != null);
2425 //              assert(procedure != null);
2426 //
2427 //              final ListenerBase listener = getListenerBase(procedure);
2428 //
2429 //              org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2430                 
2431                 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2432
2433         }
2434
2435 //      @Override
2436 //      final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2437 //
2438 //              assert(subject != null);
2439 //              assert(procedure != null);
2440 //
2441 //              final ListenerBase listener = getListenerBase(procedure);
2442 //
2443 //              org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2444 //
2445 //      }
2446         
2447         private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2448
2449         @Override
2450         final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2451
2452                 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2453
2454                         private Resource single = null;
2455
2456                         @Override
2457                         public synchronized void execute(AsyncReadGraph graph, Resource result) {
2458                                 if(single == null) {
2459                                         single = result;
2460                                 } else {
2461                                         single = INVALID_RESOURCE;
2462                                 }
2463                         }
2464
2465                         @Override
2466                         public synchronized void finished(AsyncReadGraph graph) {
2467                                 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2468                                 else procedure.execute(graph, single);
2469                         }
2470
2471                         @Override
2472                         public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2473                                 procedure.exception(graph, throwable);
2474                         }
2475
2476                 });
2477
2478         }
2479
2480         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2481                 
2482                 final int sId = querySupport.getId(subject);
2483                 final int pId = querySupport.getId(predicate);
2484
2485                 try {
2486                         QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2487                 } catch (DatabaseException e) {
2488                         Logger.defaultLogError(e);
2489                 }
2490                 
2491         }
2492         
2493         static class Runner2Procedure implements IntProcedure {
2494             
2495             public int single = 0;
2496             public Throwable t = null;
2497
2498             public void clear() {
2499                 single = 0;
2500                 t = null;
2501             }
2502             
2503         @Override
2504         public void execute(ReadGraphImpl graph, int i) {
2505             if(single == 0) single = i;
2506             else single = -1;
2507         }
2508
2509         @Override
2510         public void finished(ReadGraphImpl graph) {
2511             if(single == -1) single = 0;
2512         }
2513
2514         @Override
2515         public void exception(ReadGraphImpl graph, Throwable throwable) {
2516             single = 0;
2517             this.t = throwable;
2518         }
2519         
2520         public int get() throws DatabaseException {
2521             if(t != null) {
2522                 if(t instanceof DatabaseException) throw (DatabaseException)t;
2523                 else throw new DatabaseException(t);
2524             }
2525             return single;
2526         }
2527             
2528         }
2529         
2530         final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2531                 
2532                 final int sId = querySupport.getId(subject);
2533                 final int pId = querySupport.getId(predicate);
2534
2535                 Runner2Procedure proc = new Runner2Procedure();
2536                 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2537                 return proc.get();
2538             
2539         }
2540
2541         final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2542
2543                 assert(subject != null);
2544                 assert(predicate != null);
2545
2546                 final ListenerBase listener = getListenerBase(procedure);
2547
2548                 if(impl.parent != null || listener != null) {
2549
2550                         IntProcedure ip = new IntProcedure() {
2551
2552                                 AtomicBoolean first = new AtomicBoolean(true);
2553
2554                                 @Override
2555                                 public void execute(ReadGraphImpl graph, int i) {
2556                                         try {
2557                                                 if(first.get()) {
2558                                                         procedure.execute(impl, querySupport.getResource(i));
2559                                                 } else {
2560                                                         procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2561                                                 }
2562                                         } catch (Throwable t2) {
2563                                                 Logger.defaultLogError(t2);
2564                                         }
2565
2566                                 }
2567
2568                                 @Override
2569                                 public void finished(ReadGraphImpl graph) {
2570                                         try {
2571                                                 if(first.compareAndSet(true, false)) {
2572                                                         procedure.finished(impl);
2573 //                                                      impl.state.barrier.dec(this);
2574                                                 } else {
2575                                                         procedure.finished(impl.newRestart(graph));
2576                                                 }
2577                                         } catch (Throwable t2) {
2578                                                 Logger.defaultLogError(t2);
2579                                         }
2580                                 }
2581
2582                                 @Override
2583                                 public void exception(ReadGraphImpl graph, Throwable t) {
2584                                         try {
2585                                                 procedure.exception(graph, t);
2586                                         } catch (Throwable t2) {
2587                                                 Logger.defaultLogError(t2);
2588                                         }
2589 //                                      impl.state.barrier.dec(this);
2590                                 }
2591
2592                                 @Override
2593                                 public String toString() {
2594                                         return "forEachObject with " + procedure;
2595                                 }
2596
2597                         };
2598
2599 //                      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2600 //                      else impl.state.barrier.inc(null, null);
2601
2602                         forEachObject(impl, subject, predicate, listener, ip);
2603
2604                 } else {
2605
2606                         IntProcedure ip = new IntProcedure() {
2607
2608                                 @Override
2609                                 public void execute(ReadGraphImpl graph, int i) {
2610                                         procedure.execute(graph, querySupport.getResource(i));
2611                                 }
2612
2613                                 @Override
2614                                 public void finished(ReadGraphImpl graph) {
2615                                         procedure.finished(graph);
2616                                 }
2617
2618                                 @Override
2619                                 public void exception(ReadGraphImpl graph, Throwable t) {
2620                                         procedure.exception(graph, t);
2621                                 }
2622
2623                                 @Override
2624                                 public String toString() {
2625                                         return "forEachObject with " + procedure;
2626                                 }
2627
2628                         };
2629
2630                         forEachObject(impl, subject, predicate, listener, ip);
2631
2632                 }
2633
2634         }
2635
2636         @Override
2637         final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2638
2639                 assert(subject != null);
2640                 assert(predicate != null);
2641                 assert(procedure != null);
2642
2643                 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2644
2645                         private Set<Resource> current = null;
2646                         private Set<Resource> run = new HashSet<Resource>();
2647
2648                         @Override
2649                         public void execute(AsyncReadGraph graph, Resource result) {
2650
2651                                 boolean found = false;
2652
2653                                 if(current != null) {
2654
2655                                         found = current.remove(result);
2656
2657                                 }
2658
2659                                 if(!found) procedure.add(graph, result);
2660
2661                                 run.add(result);
2662
2663                         }
2664
2665                         @Override
2666                         public void finished(AsyncReadGraph graph) {
2667
2668                                 if(current != null) { 
2669                                         for(Resource r : current) procedure.remove(graph, r);
2670                                 }
2671
2672                                 current = run;
2673
2674                                 run = new HashSet<Resource>();
2675
2676                         }
2677
2678                         @Override
2679                         public boolean isDisposed() {
2680                                 return procedure.isDisposed();
2681                         }
2682
2683                         @Override
2684                         public void exception(AsyncReadGraph graph, Throwable t) {
2685                                 procedure.exception(graph, t);
2686                         }
2687
2688                         @Override
2689                         public String toString() {
2690                                 return "forObjectSet " + procedure;
2691                         }
2692
2693                 });
2694
2695         }
2696
2697         @Override
2698         final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2699
2700                 assert(subject != null);
2701                 assert(procedure != null);
2702
2703                 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
2704
2705                         private Set<Resource> current = null;
2706                         private Set<Resource> run = new HashSet<Resource>();
2707
2708                         @Override
2709                         public void execute(AsyncReadGraph graph, Resource result) {
2710
2711                                 boolean found = false;
2712
2713                                 if(current != null) {
2714
2715                                         found = current.remove(result);
2716
2717                                 }
2718
2719                                 if(!found) procedure.add(graph, result);
2720
2721                                 run.add(result);
2722
2723                         }
2724
2725                         @Override
2726                         public void finished(AsyncReadGraph graph) {
2727
2728                                 if(current != null) { 
2729                                         for(Resource r : current) procedure.remove(graph, r);
2730                                 }
2731
2732                                 current = run;
2733
2734                                 run = new HashSet<Resource>();
2735
2736                         }
2737
2738                         @Override
2739                         public boolean isDisposed() {
2740                                 return procedure.isDisposed();
2741                         }
2742
2743                         @Override
2744                         public void exception(AsyncReadGraph graph, Throwable t) {
2745                                 procedure.exception(graph, t);
2746                         }
2747
2748                         @Override
2749                         public String toString() {
2750                                 return "forPredicateSet " + procedure;
2751                         }
2752
2753                 });
2754
2755         }
2756
2757         @Override
2758         final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2759
2760                 assert(subject != null);