First step to enable reading of cache when not writing
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.query;
13
14 import java.io.BufferedOutputStream;
15 import java.io.File;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.debug.ListenerReport;
48 import org.simantics.db.exception.DatabaseException;
49 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
50 import org.simantics.db.exception.NoInverseException;
51 import org.simantics.db.exception.ResourceNotFoundException;
52 import org.simantics.db.impl.ResourceImpl;
53 import org.simantics.db.impl.graph.BarrierTracing;
54 import org.simantics.db.impl.graph.ReadGraphImpl;
55 import org.simantics.db.impl.graph.ReadGraphSupport;
56 import org.simantics.db.impl.graph.WriteGraphImpl;
57 import org.simantics.db.impl.procedure.IntProcedureAdapter;
58 import org.simantics.db.impl.procedure.InternalProcedure;
59 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
60 import org.simantics.db.impl.support.ResourceSupport;
61 import org.simantics.db.procedure.AsyncMultiListener;
62 import org.simantics.db.procedure.AsyncMultiProcedure;
63 import org.simantics.db.procedure.AsyncProcedure;
64 import org.simantics.db.procedure.AsyncSetListener;
65 import org.simantics.db.procedure.ListenerBase;
66 import org.simantics.db.procedure.MultiProcedure;
67 import org.simantics.db.procedure.StatementProcedure;
68 import org.simantics.db.procedure.SyncMultiProcedure;
69 import org.simantics.db.request.AsyncMultiRead;
70 import org.simantics.db.request.ExternalRead;
71 import org.simantics.db.request.MultiRead;
72 import org.simantics.db.request.RequestFlags;
73 import org.simantics.layer0.Layer0;
74 import org.simantics.utils.DataContainer;
75 import org.simantics.utils.Development;
76 import org.simantics.utils.datastructures.Pair;
77 import org.simantics.utils.datastructures.collections.CollectionUtils;
78 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
79
80 import gnu.trove.procedure.TIntProcedure;
81 import gnu.trove.procedure.TLongProcedure;
82 import gnu.trove.procedure.TObjectProcedure;
83 import gnu.trove.set.hash.THashSet;
84 import gnu.trove.set.hash.TIntHashSet;
85
86 @SuppressWarnings({"rawtypes", "unchecked"})
87 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
88
89         public static int                                       indent                = 0;
90
91         
92         // Garbage collection
93         
94         public int                                              boundQueries          = 0;
95
96
97         final private int                                       functionalRelation;
98
99         final private int                                       superrelationOf;
100
101         final private int                                       instanceOf;
102
103         final private int                                       inverseOf;
104
105         final private int                                       asserts;
106
107         final private int                                       hasPredicate;
108
109         final private int                                       hasPredicateInverse;
110
111         final private int                                       hasObject;
112
113         final private int                                       inherits;
114
115         final private int                                       subrelationOf;
116
117         final private int                                       rootLibrary;
118
119         /**
120          * A cache for the root library resource. Initialized in
121          * {@link #getRootLibraryResource()}.
122          */
123         private volatile ResourceImpl                           rootLibraryResource;
124
125         final private int                                       library;
126
127         final private int                                       consistsOf;
128
129         final private int                                       hasName;
130
131         AtomicInteger                                       sleepers = new AtomicInteger(0);
132
133         private boolean                                         updating              = false;
134
135
136         private boolean                                         firingListeners       = false;
137
138         final public QueryCache                                 cache;
139         final public QuerySupport                               querySupport;
140         final public Session                                    session;
141         final public ResourceSupport                            resourceSupport;
142
143         private THashSet<ListenerEntry>                         scheduledListeners    = new THashSet<ListenerEntry>();
144
145         QueryThread[]                                   executors;
146
147 //      public ArrayList<SessionTask>[]                           queues;
148         
149         public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
150
151         enum ThreadState {
152
153                 INIT, RUN, SLEEP, DISPOSED
154
155         }
156
157         public ThreadState[]                                                                    threadStates;
158 //      public ReentrantLock[]                                                                  threadLocks;
159 //      public Condition[]                                                                          threadConditions;
160
161         //public ArrayList<SessionTask>[]                           ownTasks;
162
163         //public ArrayList<SessionTask>[]                           ownSyncTasks;
164
165         //ArrayList<SessionTask>[]                           delayQueues;
166         
167         final Object querySupportLock;
168         
169         public Long modificationCounter = 0L;
170
171         public void close() {
172         }
173
174         public SessionTask getOwnTask(ReadGraphImpl impl) {
175                 Set<ReadGraphImpl> ancestors = impl.ancestorSet();
176                 synchronized(querySupportLock) {
177                         int index = 0;
178                         while(index < freeScheduling.size()) {
179                                 SessionTask task = freeScheduling.get(index);
180                                 if(task.hasCommonParent(ancestors)) {
181                                         return freeScheduling.remove(index);
182                                 }
183                                 index++;
184                         }
185                 }
186                 return null;
187         }
188
189     public SessionTask getSubTask(ReadGraphImpl impl) {
190         Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
191         synchronized(querySupportLock) {
192             int index = 0;
193             while(index < freeScheduling.size()) {
194                 SessionTask task = freeScheduling.get(index);
195                 if(task.hasCommonParent(onlyThis)) {
196                     return freeScheduling.remove(index);
197                 }
198                 index++;
199             }
200         }
201         return null;
202     }
203
204         public boolean performPending(ReadGraphImpl graph) {
205                 SessionTask task = getOwnTask(graph);
206                 if(task != null) {
207                         task.run(QueryProcessor.thread.get());
208                         return true;
209                 } else {
210                         return false;
211                 }
212         }
213
214 //      final public void scheduleOwn(int caller, SessionTask request) {
215 //              ownTasks[caller].add(request);
216 //      }
217
218         final public void schedule(SessionTask request) {
219             
220                 //int performer = request.thread;
221
222 //              if(DebugPolicy.SCHEDULE)
223 //                      System.out.println("schedule " + request + " " + " -> " + performer);
224
225                 //assert(performer >= 0);
226
227                 assert(request != null);
228
229 //              if(caller == performer) {
230 //                      request.run(caller);
231 //              } else {
232                         
233 //                      if(performer == THREADS) {
234                                 
235                                 synchronized(querySupportLock) {
236
237                                         if(BarrierTracing.BOOKKEEPING) {
238                                                 Exception current = new Exception();
239                                                 Exception previous = BarrierTracing.tasks.put(request, current);
240                                                 if(previous != null) {
241                                                         previous.printStackTrace();
242                                                         current.printStackTrace();
243                                                 }
244                                         }
245                                     
246                                         freeScheduling.add(request);
247                                         
248                                         querySupportLock.notifyAll();
249
250                                 }
251
252                                 return;
253                                 
254 //                      }
255 //                      
256 //                      ReentrantLock queueLock = threadLocks[performer];
257 //                      queueLock.lock();
258 //                      queues[performer].add(request);
259 //                      // This thread could have been sleeping
260 //                      if(queues[performer].size() == 1) {
261 //                              //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
262 //                              threadConditions[performer].signalAll();
263 //                      }
264 //                      queueLock.unlock();
265 //              }
266
267         }
268
269
270         final int THREADS;
271         final public int  THREAD_MASK;
272
273         final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
274
275         public static abstract class SessionTask {
276
277                 public final ReadGraphImpl graph;
278                 private Set<ReadGraphImpl> ancestors;
279                 private int counter = 0;
280                 private Exception trace;
281
282                 public SessionTask(ReadGraphImpl graph) {
283                         this.graph = graph;
284                         if(graph != null) graph.asyncBarrier.inc();
285                 }
286
287                 public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
288                         if(graph == null) return false;
289                         if(ancestors == null) ancestors = graph.ancestorSet();
290                         return !Collections.disjoint(ancestors, otherAncestors);
291                 }
292
293                 public abstract void run0(int thread);
294
295                 public final void run(int thread) {
296                     if(counter++ > 0) {
297                         if(BarrierTracing.BOOKKEEPING) {
298                             trace.printStackTrace();
299                             new Exception().printStackTrace();
300                         }
301                         throw new IllegalStateException("Multiple invocations of SessionTask!");
302                     }
303                     if(BarrierTracing.BOOKKEEPING) {
304                         trace = new Exception();
305                     }
306                     run0(thread);
307                     if(graph != null) graph.asyncBarrier.dec();
308                 }
309
310                 @Override
311                 public String toString() {
312                         return "SessionTask[" + graph.parent + "]";
313                 }
314
315         }
316
317         public static abstract class SessionRead extends SessionTask {
318
319                 final public Semaphore notify;
320                 final public DataContainer<Throwable> throwable; 
321
322                 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
323                         super(null);
324                         this.throwable = throwable;
325                         this.notify = notify;
326                 }
327
328         }
329
330         long waitingTime = 0;
331
332         static int koss = 0;
333         static int koss2 = 0;
334
335         public boolean resume(ReadGraphImpl graph) {
336                 return executors[0].runSynchronized();
337         }
338         
339         //private WeakReference<GarbageTracker> garbageTracker;
340         
341         private class GarbageTracker    {
342                 
343                 @Override
344                 protected void finalize() throws Throwable {
345                         
346 //                      System.err.println("GarbageTracker");
347 //                      
348 //                      garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
349                         
350                         super.finalize();
351                         
352                 }
353                 
354         }
355
356         public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
357                         throws DatabaseException {
358
359                 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
360                 
361                 THREADS = threads;
362                 THREAD_MASK = threads - 1;
363
364                 querySupport = core;
365                 cache = new QueryCache(core, threads);
366                 session = querySupport.getSession();
367                 resourceSupport = querySupport.getSupport();
368                 querySupportLock = core.getLock();
369
370                 executors = new QueryThread[THREADS];
371 //              queues = new ArrayList[THREADS];
372 //              threadLocks = new ReentrantLock[THREADS];
373 //              threadConditions = new Condition[THREADS];
374                 threadStates = new ThreadState[THREADS];
375 //              ownTasks = new ArrayList[THREADS];
376 //              ownSyncTasks = new ArrayList[THREADS];
377 //              delayQueues = new ArrayList[THREADS * THREADS];
378
379                 //        freeSchedule = new AtomicInteger(0);
380
381 //              for (int i = 0; i < THREADS * THREADS; i++) {
382 //                      delayQueues[i] = new ArrayList<SessionTask>();
383 //              }
384
385                 for (int i = 0; i < THREADS; i++) {
386
387                         //            tasks[i] = new ArrayList<Runnable>();
388 //                      ownTasks[i] = new ArrayList<SessionTask>();
389 //                      ownSyncTasks[i] = new ArrayList<SessionTask>();
390 //                      queues[i] = new ArrayList<SessionTask>();
391 //                      threadLocks[i] = new ReentrantLock();
392 //                      threadConditions[i] = threadLocks[i].newCondition();
393                         //            limits[i] = false;
394                         threadStates[i] = ThreadState.INIT;
395
396                 }
397
398                 for (int i = 0; i < THREADS; i++) {
399
400                         final int index = i;
401
402                         executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
403
404                         threadSet.add(executors[i]);
405
406                 }
407
408                 // Now start threads
409                 for (int i = 0; i < THREADS; i++) {
410                         executors[i].start();
411                 }
412
413                 // Make sure that query threads are up and running
414                 while(sleepers.get() != THREADS) {
415                         try {
416                                 Thread.sleep(5);
417                         } catch (InterruptedException e) {
418                                 e.printStackTrace();
419                         }
420                 }
421
422                 rootLibrary = core.getBuiltin("http:/");
423                 boolean builtinsInstalled = rootLibrary != 0;
424
425                 if (builtinsInstalled) {
426                         functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
427                         assert (functionalRelation != 0);
428                 } else
429                         functionalRelation = 0;
430
431                 if (builtinsInstalled) {
432                         instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
433                         assert (instanceOf != 0);
434                 } else
435                         instanceOf = 0;
436
437                 if (builtinsInstalled) {
438                         inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
439                         assert (inverseOf != 0);
440                 } else
441                         inverseOf = 0;
442
443
444                 if (builtinsInstalled) {
445                         inherits = core.getBuiltin(Layer0.URIs.Inherits);
446                         assert (inherits != 0);
447                 } else
448                         inherits = 0;
449
450                 if (builtinsInstalled) {
451                         asserts = core.getBuiltin(Layer0.URIs.Asserts);
452                         assert (asserts != 0);
453                 } else
454                         asserts = 0;
455
456                 if (builtinsInstalled) {
457                         hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
458                         assert (hasPredicate != 0);
459                 } else
460                         hasPredicate = 0;
461
462                 if (builtinsInstalled) {
463                         hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
464                         assert (hasPredicateInverse != 0);
465                 } else
466                         hasPredicateInverse = 0;
467
468                 if (builtinsInstalled) {
469                         hasObject = core.getBuiltin(Layer0.URIs.HasObject);
470                         assert (hasObject != 0);
471                 } else
472                         hasObject = 0;
473
474                 if (builtinsInstalled) {
475                         subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
476                         assert (subrelationOf != 0);
477                 } else
478                         subrelationOf = 0;
479
480                 if (builtinsInstalled) {
481                         superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
482                         assert (superrelationOf != 0);
483                 } else
484                         superrelationOf = 0;
485
486                 if (builtinsInstalled) {
487                         library = core.getBuiltin(Layer0.URIs.Library);
488                         assert (library != 0);
489                 } else
490                         library = 0;
491
492                 if (builtinsInstalled) {
493                         consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
494                         assert (consistsOf != 0);
495                 } else
496                         consistsOf = 0;
497
498                 if (builtinsInstalled) {
499                         hasName = core.getBuiltin(Layer0.URIs.HasName);
500                         assert (hasName != 0);
501                 } else
502                         hasName = 0;
503
504         }
505
506         final public void releaseWrite(ReadGraphImpl graph) {
507                 performDirtyUpdates(graph);
508                 modificationCounter++;
509         }
510
511         final public int getId(final Resource r) {
512                 return querySupport.getId(r);
513         }
514
515         public QuerySupport getCore() {
516                 return querySupport;
517         }
518
519         public int getFunctionalRelation() {
520                 return functionalRelation;
521         }
522
523         public int getInherits() {
524                 return inherits;
525         }
526
527         public int getInstanceOf() {
528                 return instanceOf;
529         }
530
531         public int getInverseOf() {
532                 return inverseOf;
533         }
534
535         public int getSubrelationOf() {
536                 return subrelationOf;
537         }
538
539         public int getSuperrelationOf() {
540                 return superrelationOf;
541         }
542
543         public int getAsserts() {
544                 return asserts;
545         }
546
547         public int getHasPredicate() {
548                 return hasPredicate;
549         }
550
551         public int getHasPredicateInverse() {
552                 return hasPredicateInverse;
553         }
554
555         public int getHasObject() {
556                 return hasObject;
557         }
558
559         public int getRootLibrary() {
560                 return rootLibrary;
561         }
562
563         public Resource getRootLibraryResource() {
564                 if (rootLibraryResource == null) {
565                         // Synchronization is not needed here, it doesn't matter if multiple
566                         // threads simultaneously set rootLibraryResource once.
567                         int root = getRootLibrary();
568                         if (root == 0)
569                                 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
570                         this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
571                 }
572                 return rootLibraryResource;
573         }
574
575         public int getLibrary() {
576                 return library;
577         }
578
579         public int getConsistsOf() {
580                 return consistsOf;
581         }
582
583         public int getHasName() {
584                 return hasName;
585         }
586
587         public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
588
589                 try {
590                         
591                         QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
592
593                                 @Override
594                                 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
595
596                                         if (result != null && result != 0) {
597                                                 procedure.execute(graph, result);
598                                                 return;
599                                         }
600
601                                         // Fall back to using the fixed builtins.
602 //                                      result = querySupport.getBuiltin(id);
603 //                                      if (result != 0) {
604 //                                              procedure.execute(graph, result);
605 //                                              return;
606 //                                      } 
607
608 //                                      try {
609 //                                              result = querySupport.getRandomAccessReference(id);
610 //                                      } catch (ResourceNotFoundException e) {
611 //                                              procedure.exception(graph, e);
612 //                                              return;
613 //                                      }
614
615                                         if (result != 0) {
616                                                 procedure.execute(graph, result);
617                                         } else {
618                                                 procedure.exception(graph, new ResourceNotFoundException(id));
619                                         }
620
621                                 }
622
623                                 @Override
624                                 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
625                                         procedure.exception(graph, t);
626                                 }
627
628                         });
629                 } catch (DatabaseException e) {
630                     
631                     try {
632                         
633                 procedure.exception(graph, e);
634                 
635             } catch (DatabaseException e1) {
636                 
637                 Logger.defaultLogError(e1);
638                 
639             }
640                     
641                 }
642
643         }
644
645         public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
646
647                 Integer result = querySupport.getBuiltin(id);
648                 if (result != 0) {
649                         procedure.execute(graph, result);
650                 } else {
651                         procedure.exception(graph, new ResourceNotFoundException(id));
652                 }
653
654         }
655
656         final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
657
658                 try {
659                         QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
660                 } catch (DatabaseException e) {
661                         throw new IllegalStateException(e);
662                 }
663
664         }
665
666         public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
667
668                 
669                 try {
670                         QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
671                 } catch (DatabaseException e) {
672                         throw new IllegalStateException(e);
673                 }
674
675         }
676
677         final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
678                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
679         }
680
681 //    @Override
682 //      public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
683 //      
684 //      return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
685 //
686 //      }
687
688         public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
689
690                 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
691
692         }
693
694         public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
695
696                 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
697
698         }
699
700         boolean isBound(ExternalReadEntry<?> entry) {
701                 if(entry.hasParents()) return true;
702                 else if(hasListener(entry)) return true;
703                 else return false;
704         }
705
706         synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
707
708                 if (parent != null && !inferred) {
709                         try {
710                                 if(!child.isImmutable(graph))
711                                         child.addParent(parent);
712                         } catch (DatabaseException e) {
713                                 Logger.defaultLogError(e);
714                         }
715                         if (Development.DEVELOPMENT) {
716                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
717                                         System.out.println(child + " -> " + parent);
718                                 }
719                         }
720                 }
721
722                 if (listener != null) {
723                         return registerListener(child, listener, procedure);
724                 } else {
725                         return null;
726                 }
727
728         }
729
730         
731         static class Dummy implements InternalProcedure<Object>, IntProcedure {
732
733                 @Override
734                 public void execute(ReadGraphImpl graph, int i) {
735                 }
736
737                 @Override
738                 public void finished(ReadGraphImpl graph) {
739                 }
740
741                 @Override
742                 public void execute(ReadGraphImpl graph, Object result) {
743                 }
744
745                 @Override
746                 public void exception(ReadGraphImpl graph, Throwable throwable) {
747                 }
748                 
749         }
750         
751         private static final Dummy dummy = new Dummy();
752
753         /*
754     public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
755
756         if (DebugPolicy.PERFORM)
757             System.out.println("PE[ " + (query.hashCode() &  THREAD_MASK) + "] " + query);
758
759         assert (!dirty);
760         assert (!collecting);
761
762         assert(query.assertNotDiscarded());
763
764         registerDependencies(graph, query, parent, listener, procedure, false);
765
766         // FRESH, REFUTED, EXCEPTED go here 
767         if (!query.isReady()) {
768
769             size++;
770             misses++;
771
772             query.computeForEach(graph, this, (Procedure)dummy, true);
773             return query.get(graph, this, null);
774
775         } else {
776
777             hits++;
778
779             return query.get(graph, this, procedure);
780
781         }
782
783     }
784         */
785         
786
787         interface QueryCollectorSupport {
788                 public CacheCollectionResult allCaches();
789                 public Collection<CacheEntry> getRootList();
790                 public int getCurrentSize();
791                 public int calculateCurrentSize();
792                 public CacheEntryBase iterate(int level);
793                 public void remove();
794                 public void setLevel(CacheEntryBase entry, int level);
795                 public boolean start(boolean flush);
796         }
797
798         interface QueryCollector {
799
800                 public void collect(int youngTarget, int allowedTimeInMs);
801
802         }
803
804         class QueryCollectorSupportImpl implements QueryCollectorSupport {
805
806                 private static final boolean DEBUG = false;
807                 private static final double ITERATION_RATIO = 0.2;
808                 
809                 private CacheCollectionResult iteration = new CacheCollectionResult();
810                 private boolean fresh = true;
811                 private boolean needDataInStart = true;
812                 
813                 QueryCollectorSupportImpl() {
814                         iteration.restart();
815                 }
816
817                 public CacheCollectionResult allCaches() {
818                         CacheCollectionResult result = new CacheCollectionResult();
819                         QueryProcessor.this.allCaches(result);
820                         result.restart();
821                         return result;
822                 }
823                 
824                 public boolean start(boolean flush) {
825                         // We need new data from query maps
826                         fresh = true;
827                         if(needDataInStart || flush) {
828                                 // Last run ended after processing all queries => refresh data
829                                 restart(flush ? 0.0 : ITERATION_RATIO);
830                         } else {
831                                 // continue with previous big data
832                         }
833                         // Notify caller about iteration situation
834                         return iteration.isAtStart();
835                 }
836
837                 private void restart(double targetRatio) {
838                         
839                         needDataInStart = true;
840
841                         long start = System.nanoTime();
842                         if(fresh) {
843                                 
844                                 // We need new data from query maps
845                                 
846                                 int iterationSize = iteration.size()+1;
847                                 int diff = calculateCurrentSize()-iterationSize;
848                                 
849                                 double ratio = (double)diff / (double)iterationSize;
850                                 boolean dirty = Math.abs(ratio) >= targetRatio;
851                                 
852                                 if(dirty) {
853                                         iteration = allCaches();
854                                         if(DEBUG) {
855                                                 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
856                                                 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
857                                                         System.err.print(" " + iteration.levels[i].size());
858                                                 System.err.println("");
859                                         }
860                                 } else {
861                                         iteration.restart();
862                                 }
863                                 
864                                 fresh = false;
865                                 needDataInStart = false;
866                         } else {
867                                 // We are returning here within the same GC round - reuse the cache table
868                                 iteration.restart();
869                         }
870                         
871                         return;
872                         
873                 }
874                 
875                 @Override
876                 public CacheEntryBase iterate(int level) {
877                         
878                         CacheEntryBase entry = iteration.next(level);
879                         if(entry == null) {
880                                 restart(ITERATION_RATIO);
881                                 return null;
882                         }
883                         
884                         while(entry != null && entry.isDiscarded()) {
885                                 entry = iteration.next(level);
886                         }
887                         
888                         return entry;
889                         
890                 }
891                 
892                 @Override
893                 public void remove() {
894                         iteration.remove();
895                 }
896                 
897                 @Override
898                 public void setLevel(CacheEntryBase entry, int level) {
899                         iteration.setLevel(entry, level);
900                 }
901
902                 public Collection<CacheEntry> getRootList() {
903                         return cache.getRootList();
904                 }
905
906                 @Override
907                 public int calculateCurrentSize() {
908                         return cache.calculateCurrentSize();
909                 }
910
911                 @Override
912                 public int getCurrentSize() {
913                         return cache.size;
914                 }
915
916         }
917         //    final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
918
919         private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
920         private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
921
922     public int querySize() {
923         return cache.size;
924     }
925
926         public void gc(int youngTarget, int allowedTimeInMs) {
927
928                 collector.collect(youngTarget, allowedTimeInMs);
929
930         }
931
932         public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
933
934                 assert (entry != null);
935
936                 if (base.isDisposed())
937                         return null;
938
939                 return addListener(entry, base, procedure);
940
941         }
942
943         private void primeListenerEntry(final ListenerEntry entry, final Object result) {
944                 entry.setLastKnown(result);
945         }
946
947         private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
948
949                 assert (entry != null);
950                 assert (procedure != null);
951
952                 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
953                 if (list == null) {
954                         list = new ArrayList<ListenerEntry>(1);
955                         cache.listeners.put(entry, list);
956                 }
957
958                 ListenerEntry result = new ListenerEntry(entry, base, procedure);
959                 int currentIndex = list.indexOf(result);
960                 // There was already a listener
961                 if(currentIndex > -1) {
962                         ListenerEntry current = list.get(currentIndex);
963                         if(!current.base.isDisposed()) return null;
964                         list.set(currentIndex, result);
965                 } else {
966                         list.add(result);
967                 }
968
969                 if (Development.DEVELOPMENT) {
970                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
971                                 new Exception().printStackTrace();
972                                 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
973                         }
974                 }
975
976                 return result;
977
978         }
979
980         private void scheduleListener(ListenerEntry entry) {
981                 assert (entry != null);
982                 if (Development.DEVELOPMENT) {
983                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
984                                 System.err.println("Scheduled " + entry.procedure);
985                         }
986                 }
987                 scheduledListeners.add(entry);
988         }
989
990         private void removeListener(ListenerEntry entry) {
991                 assert (entry != null);
992                 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
993                 if(list == null) return;
994                 boolean success = list.remove(entry);
995                 assert (success);
996                 if (list.isEmpty())
997                         cache.listeners.remove(entry.entry);
998         }
999
1000         private boolean hasListener(CacheEntry entry) {
1001                 if(cache.listeners.get(entry) != null) return true;
1002                 return false;
1003         }
1004
1005         boolean hasListenerAfterDisposing(CacheEntry entry) {
1006                 if(cache.listeners.get(entry) != null) {
1007                         ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1008                         ArrayList<ListenerEntry> list = null;
1009                         for (ListenerEntry e : entries) {
1010                                 if (e.base.isDisposed()) {
1011                                         if(list == null) list = new ArrayList<ListenerEntry>();
1012                                         list.add(e);
1013                                 }
1014                         }
1015                         if(list != null) {
1016                                 for (ListenerEntry e : list) {
1017                                         entries.remove(e);
1018                                 }
1019                         }
1020                         if (entries.isEmpty()) {
1021                                 cache.listeners.remove(entry);
1022                                 return false;
1023                         }
1024                         return true;
1025                 }
1026                 return false;
1027         }
1028
1029         List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1030                 hasListenerAfterDisposing(entry);
1031                 if(cache.listeners.get(entry) != null)
1032                         return cache.listeners.get(entry);
1033                 else 
1034                         return Collections.emptyList();
1035         }
1036
1037         void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1038
1039                 if(!workarea.containsKey(entry)) {
1040
1041                         HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1042                         for(ListenerEntry e : getListenerEntries(entry))
1043                                 ls.add(e.base);
1044
1045                         workarea.put(entry, ls);
1046
1047                         for(CacheEntry parent : entry.getParents(this)) {
1048                                 processListenerReport(parent, workarea);
1049                                 ls.addAll(workarea.get(parent));
1050                         }
1051
1052                 }
1053
1054         }
1055
1056         public synchronized ListenerReport getListenerReport() throws IOException {
1057
1058                 class ListenerReportImpl implements ListenerReport {
1059
1060                         Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1061
1062                         @Override
1063                         public void print(PrintStream b) {
1064                                 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1065                                 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1066                                         for(ListenerBase l : e.getValue()) {
1067                                                 Integer i = hist.get(l);
1068                                                 hist.put(l, i != null ? i-1 : -1);
1069                                         }
1070                                 }
1071
1072                                 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1073                                         b.print("" + -p.second + " " + p.first + "\n");
1074                                 }
1075
1076                                 b.flush();
1077                         }
1078
1079                 }
1080
1081                 ListenerReportImpl result = new ListenerReportImpl();
1082
1083                 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1084                 for(CacheEntryBase entry : all) {
1085                         hasListenerAfterDisposing(entry);
1086                 }
1087                 for(CacheEntryBase entry : all) {
1088                         processListenerReport(entry, result.workarea);
1089                 }
1090
1091                 return result;
1092
1093         }
1094
1095         public synchronized String reportListeners(File file) throws IOException {
1096
1097                 if (!isAlive())
1098                         return "Disposed!";
1099
1100                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1101                 ListenerReport report = getListenerReport();
1102                 report.print(b);
1103
1104                 return "Done reporting listeners.";
1105
1106         }
1107
1108         void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1109
1110                 if(entry.isDiscarded()) return;
1111                 if(workarea.containsKey(entry)) return;
1112                 
1113                 Iterable<CacheEntry> parents = entry.getParents(this);
1114                 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1115                 for(CacheEntry e : parents) {
1116                         if(e.isDiscarded()) continue;
1117                         ps.add(e);
1118                         processParentReport(e, workarea);
1119                 }
1120                 workarea.put(entry, ps);
1121
1122         }
1123
1124         public synchronized String reportQueryActivity(File file) throws IOException {
1125                 
1126                 System.err.println("reportQueries " + file.getAbsolutePath());
1127
1128                 if (!isAlive())
1129                         return "Disposed!";
1130
1131                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1132
1133                 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1134                 Collections.reverse(entries);
1135                 
1136                 for(Pair<String,Integer> entry : entries) {
1137                         b.println(entry.first + ": " + entry.second);
1138                 }
1139
1140                 b.close();
1141                 
1142                 Development.histogram.clear();
1143
1144                 return "OK";
1145
1146         }
1147         
1148         public synchronized String reportQueries(File file) throws IOException {
1149
1150                 System.err.println("reportQueries " + file.getAbsolutePath());
1151
1152                 if (!isAlive())
1153                         return "Disposed!";
1154
1155                 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1156
1157                 long start = System.nanoTime();
1158
1159 //              ArrayList<CacheEntry> all = ;
1160                 
1161                 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1162                 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1163                 for(CacheEntryBase entry : caches) {
1164                         processParentReport(entry, workarea);
1165                 }
1166                 
1167                 //        for(CacheEntry e : all) System.err.println("entry: " + e);
1168
1169                 long duration = System.nanoTime() - start;
1170                 System.err.println("Query root set in " + 1e-9*duration + "s.");
1171
1172                 start = System.nanoTime();
1173
1174                 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); 
1175
1176                 int listeners = 0;
1177
1178                 for(CacheEntry entry : workarea.keySet()) {
1179                         boolean listener = hasListenerAfterDisposing(entry);
1180                         boolean hasParents = entry.getParents(this).iterator().hasNext();
1181                         if(listener) {
1182                                 // Bound
1183                                 flagMap.put(entry, 0);
1184                         } else if (!hasParents) {
1185                                 // Unbound
1186                                 flagMap.put(entry, 1);
1187                         } else {
1188                                 // Unknown
1189                                 flagMap.put(entry, 2);
1190                         }
1191                         //              // Write leaf bit
1192                         //              entry.flags |= 4;
1193                 }
1194
1195                 boolean done = true;
1196                 int loops = 0;
1197
1198                 do {
1199
1200                         done = true;
1201
1202                         long start2 = System.nanoTime();
1203
1204                         int boundCounter = 0;
1205                         int unboundCounter = 0;
1206                         int unknownCounter = 0;
1207
1208                         for(CacheEntry<?> entry : workarea.keySet()) {
1209
1210                                 //System.err.println("process " + entry);
1211
1212                                 int flags = flagMap.get(entry);
1213                                 int bindStatus = flags & 3;
1214
1215                                 if(bindStatus == 0) boundCounter++;
1216                                 else if(bindStatus == 1) unboundCounter++;
1217                                 else if(bindStatus == 2) unknownCounter++;
1218
1219                                 if(bindStatus < 2) continue;
1220
1221                                 int newStatus = 1;
1222                                 for(CacheEntry parent : entry.getParents(this)) {
1223
1224                                         if(parent.isDiscarded()) flagMap.put(parent, 1);
1225
1226                                         int flags2 = flagMap.get(parent);
1227                                         int bindStatus2 = flags2 & 3;
1228                                         // Parent is bound => child is bound
1229                                         if(bindStatus2 == 0) {
1230                                                 newStatus = 0;
1231                                                 break;
1232                                         }
1233                                         // Parent is unknown => child is unknown
1234                                         else if (bindStatus2 == 2) {
1235                                                 newStatus = 2;
1236                                                 done = false;
1237                                                 break;
1238                                         }
1239                                 }
1240
1241                                 flagMap.put(entry, newStatus);
1242
1243                         }
1244
1245                         duration = System.nanoTime() - start2;
1246                         System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1247                         b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1248
1249                 } while(!done && loops++ < 20);
1250
1251                 if(loops >= 20) {
1252
1253                         for(CacheEntry entry : workarea.keySet()) {
1254
1255                                 int bindStatus = flagMap.get(entry);
1256                                 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1257
1258                         }
1259
1260                 }
1261
1262                 duration = System.nanoTime() - start;
1263                 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1264
1265                 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1266
1267                 for(CacheEntry entry : workarea.keySet()) {
1268                         Class<?> clazz = entry.getClass();
1269                         if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); 
1270                         else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); 
1271                         else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); 
1272                         else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); 
1273                         else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); 
1274                         Integer c = counts.get(clazz);
1275                         if(c == null) counts.put(clazz, -1);
1276                         else counts.put(clazz, c-1);
1277                 }
1278
1279                 b.print("// Simantics DB client query report file\n");
1280                 b.print("// This file contains the following information\n");
1281                 b.print("// -The amount of cached query instances per query class\n");
1282                 b.print("// -The sizes of retained child sets\n");
1283                 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1284                 b.print("//  -Followed by status, where\n");
1285                 b.print("//   -0=bound\n");
1286                 b.print("//   -1=free\n");
1287                 b.print("//   -2=unknown\n");
1288                 b.print("//   -L=has listener\n");
1289                 b.print("// -List of children for each query (search for 'C <query name>')\n");
1290
1291                 b.print("----------------------------------------\n");
1292
1293                 b.print("// Queries by class\n");
1294                 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1295                         b.print(-p.second + " " + p.first.getName() + "\n");
1296                 }
1297
1298                 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1299                 for(CacheEntry e : workarea.keySet())
1300                         hist.put(e, -1);
1301                 
1302                 boolean changed = true;
1303                 int iter = 0;
1304                 while(changed && iter++<50) {
1305                         
1306                         changed = false;
1307                         
1308                         Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1309                         for(CacheEntry e : workarea.keySet())
1310                                 newHist.put(e, -1);
1311
1312                         for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1313                                 Integer c = hist.get(e.getKey());
1314                                 for(CacheEntry p : e.getValue()) {
1315                                         Integer i = newHist.get(p);
1316                                         newHist.put(p, i+c);
1317                                 }
1318                         }
1319                         for(CacheEntry e : workarea.keySet()) {
1320                                 Integer value = newHist.get(e);
1321                                 Integer old = hist.get(e);
1322                                 if(!value.equals(old)) {
1323                                         hist.put(e, value);
1324 //                                      System.err.println("hist " + e + ": " + old + " => " + value);
1325                                         changed = true;
1326                                 }
1327                         }
1328                         
1329                         System.err.println("Retained set iteration " + iter);
1330
1331                 }
1332
1333                 b.print("// Queries by retained set\n");
1334                 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1335                         b.print("" + -p.second + " " + p.first + "\n");
1336                 }
1337
1338                 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1339
1340                 b.print("// Entry parent listing\n");
1341                 for(CacheEntry entry : workarea.keySet()) {
1342                         int status = flagMap.get(entry);
1343                         boolean hasListener = hasListenerAfterDisposing(entry);
1344                         b.print("Q " + entry.toString());
1345                         if(hasListener) {
1346                                 b.print(" (L" + status + ")");
1347                                 listeners++;
1348                         } else {
1349                                 b.print(" (" + status + ")");
1350                         }
1351                         b.print("\n");
1352                         for(CacheEntry parent : workarea.get(entry)) {
1353                                 Collection<CacheEntry> inv = inverse.get(parent);
1354                                 if(inv == null) {
1355                                         inv = new ArrayList<CacheEntry>();
1356                                         inverse.put(parent, inv);
1357                                 }
1358                                 inv.add(entry);
1359                                 b.print("  " + parent.toString());
1360                                 b.print("\n");
1361                         }
1362                 }
1363
1364                 b.print("// Entry child listing\n");
1365                 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1366                         b.print("C " + entry.getKey().toString());
1367                         b.print("\n");
1368                         for(CacheEntry child : entry.getValue()) {
1369                                 Integer h = hist.get(child);
1370                                 if(h != null) {
1371                                         b.print("  " + h);
1372                                 } else {
1373                                         b.print("  <no children>");
1374                                 }
1375                                 b.print("  " + child.toString());
1376                                 b.print("\n");
1377                         }
1378                 }
1379
1380                 b.print("#queries: " + workarea.keySet().size() + "\n");
1381                 b.print("#listeners: " + listeners + "\n");
1382
1383                 b.close();
1384
1385                 return "Dumped " + workarea.keySet().size() + " queries.";
1386
1387         }
1388
1389         class UpdateEntry {
1390
1391                 public CacheEntry caller;
1392
1393                 public CacheEntry entry;
1394
1395                 public int         indent;
1396
1397                 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1398                         this.caller = caller;
1399                         this.entry = entry;
1400                         this.indent = indent;
1401                 }
1402
1403         };
1404
1405         boolean removeQuery(CacheEntry entry) {
1406
1407                 // This entry has been removed before. No need to do anything here.
1408                 if(entry.isDiscarded()) return false;
1409
1410                 assert (!entry.isDiscarded());
1411
1412                 Query query = entry.getQuery();
1413
1414                 query.removeEntry(this);
1415
1416                 cache.updates++;
1417                 cache.size--;
1418
1419                 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1420                         boundQueries--;
1421                 
1422                 entry.discard();
1423
1424                 return true;
1425
1426         }
1427
1428         /**
1429          * 
1430          * @return true if this entry is being listened
1431          */
1432         private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1433
1434                 assert (e != null);
1435
1436                 CacheEntry entry = e.entry;
1437
1438                 /*
1439                  * If the dependency graph forms a DAG, some entries are inserted in the
1440                  * todo list many times. They only need to be processed once though.
1441                  */
1442                 if (entry.isDiscarded()) {
1443                         if (Development.DEVELOPMENT) {
1444                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1445                                         System.err.print("D");
1446                                         for (int i = 0; i < e.indent; i++)
1447                                                 System.err.print(" ");
1448                                         System.err.println(entry.getQuery());
1449                                 }
1450                         }
1451 //                      System.err.println(" => DISCARDED");
1452                         return false;
1453                 }
1454
1455 //              if (entry.isRefuted()) {
1456 //                      if (Development.DEVELOPMENT) {
1457 //                              if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1458 //                                      System.err.print("R");
1459 //                                      for (int i = 0; i < e.indent; i++)
1460 //                                              System.err.print(" ");
1461 //                                      System.err.println(entry.getQuery());
1462 //                              }
1463 //                      }
1464 //                      return false;
1465 //              }
1466
1467                 if (entry.isExcepted()) {
1468                         if (Development.DEVELOPMENT) {
1469                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1470                                         System.err.print("E");
1471                                 }
1472                         }
1473                 }
1474
1475                 if (entry.isPending()) {
1476                         if (Development.DEVELOPMENT) {
1477                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1478                                         System.err.print("P");
1479                                 }
1480                         }
1481                 }
1482
1483                 cache.updates++;
1484
1485                 if (Development.DEVELOPMENT) {
1486                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1487                                 System.err.print("U ");
1488                                 for (int i = 0; i < e.indent; i++)
1489                                         System.err.print(" ");
1490                                 System.err.print(entry.getQuery());
1491                         }
1492                 }
1493
1494                 Query query = entry.getQuery();
1495                 int type = query.type();
1496
1497                 boolean hasListener = hasListener(entry); 
1498
1499                 if (Development.DEVELOPMENT) {
1500                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1501                                 if(hasListener(entry)) {
1502                                         System.err.println(" (L)");
1503                                 } else {
1504                                         System.err.println("");
1505                                 }
1506                         }
1507                 }
1508
1509                 if(entry.isPending() || entry.isExcepted()) {
1510
1511                         // If updated
1512                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1513
1514                                 immediates.put(entry, entry);
1515
1516                         } else {
1517
1518                                 if(hasListener) {
1519                                         entry.refute();
1520                                 } else {
1521                                         removeQuery(entry);
1522                                 }
1523
1524                         }
1525
1526                 } else {
1527
1528                         // If updated
1529                         if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1530
1531                                 immediates.put(entry, entry);
1532
1533                         } else {
1534
1535                                 if(hasListener) {
1536                                         entry.refute();
1537                                 } else {
1538                                         removeQuery(entry);
1539                                 }
1540
1541                         }
1542
1543                 }
1544
1545 //              System.err.println(" => FOO " + type);
1546
1547                 if (hasListener) {
1548                         ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1549                         if(entries != null) {
1550                                 for (ListenerEntry le : entries) {
1551                                         scheduleListener(le);
1552                                 }
1553                         }
1554                 }
1555
1556                 // If invalid, update parents
1557                 if (type == RequestFlags.INVALIDATE) {
1558                         updateParents(e.indent, entry, todo);
1559                 }
1560
1561                 return hasListener;
1562
1563         }
1564
1565         private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1566
1567                 Iterable<CacheEntry> oldParents = entry.getParents(this);
1568                 for (CacheEntry parent : oldParents) {
1569 //                      System.err.println("updateParents " + entry + " => " + parent);
1570                         if(!parent.isDiscarded())
1571                                 todo.push(new UpdateEntry(entry, parent, indent + 2));
1572                 }
1573
1574         }
1575
1576         private boolean pruneListener(ListenerEntry entry) {
1577                 if (entry.base.isDisposed()) {
1578                         removeListener(entry);
1579                         return true;
1580                 } else {
1581                         return false;
1582                 }
1583         }
1584
1585         /**
1586          * @param av1 an array (guaranteed)
1587          * @param av2 any object
1588          * @return <code>true</code> if the two arrays are equal
1589          */
1590         private final boolean arrayEquals(Object av1, Object av2) {
1591                 if (av2 == null)
1592                         return false;
1593                 Class<?> c1 = av1.getClass().getComponentType();
1594                 Class<?> c2 = av2.getClass().getComponentType();
1595                 if (c2 == null || !c1.equals(c2))
1596                         return false;
1597                 boolean p1 = c1.isPrimitive();
1598                 boolean p2 = c2.isPrimitive();
1599                 if (p1 != p2)
1600                         return false;
1601                 if (!p1)
1602                         return Arrays.equals((Object[]) av1, (Object[]) av2);
1603                 if (boolean.class.equals(c1))
1604                         return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1605                 else if (byte.class.equals(c1))
1606                         return Arrays.equals((byte[]) av1, (byte[]) av2);
1607                 else if (int.class.equals(c1))
1608                         return Arrays.equals((int[]) av1, (int[]) av2);
1609                 else if (long.class.equals(c1))
1610                         return Arrays.equals((long[]) av1, (long[]) av2);
1611                 else if (float.class.equals(c1))
1612                         return Arrays.equals((float[]) av1, (float[]) av2);
1613                 else if (double.class.equals(c1))
1614                         return Arrays.equals((double[]) av1, (double[]) av2);
1615                 throw new RuntimeException("??? Contact application querySupport.");
1616         }
1617
1618
1619
1620         final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1621
1622                 try {
1623
1624                         Query query = entry.getQuery();
1625
1626                         if (Development.DEVELOPMENT) {
1627                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1628                                         System.err.println("R " + query);
1629                                 }
1630                         }
1631
1632                         entry.prepareRecompute(querySupport);
1633                         
1634                         ReadGraphImpl parentGraph = graph.forRecompute(entry);
1635
1636                         query.recompute(parentGraph);
1637
1638                         if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1639
1640                         Object newValue = entry.getResult();
1641
1642                         if (ListenerEntry.NO_VALUE == oldValue) {
1643                                 if (Development.DEVELOPMENT) {
1644                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1645                                                 System.out.println("C " + query);
1646                                                 System.out.println("- " + oldValue);
1647                                                 System.out.println("- " + newValue);
1648                                         }
1649                                 }
1650                                 return newValue;
1651                         }
1652
1653                         boolean changed = false;
1654
1655                         if (newValue != null) {
1656                                 if (newValue.getClass().isArray()) {
1657                                         changed = !arrayEquals(newValue, oldValue);
1658                                 } else {
1659                                         changed = !newValue.equals(oldValue);
1660                                 }
1661                         } else
1662                                 changed = (oldValue != null);
1663
1664                         if (Development.DEVELOPMENT) {
1665                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1666                                         System.err.println("C " + query);
1667                                         System.err.println("- " + oldValue);
1668                                         System.err.println("- " + newValue);
1669                                 }
1670                         }
1671
1672                         return changed ? newValue : ListenerEntry.NOT_CHANGED;
1673
1674                 } catch (Throwable t) {
1675
1676                         Logger.defaultLogError(t);
1677                         entry.except(t);
1678                         return ListenerEntry.NO_VALUE;
1679
1680                 }
1681
1682         }
1683
1684         public boolean hasScheduledUpdates() {
1685                 return !scheduledListeners.isEmpty();
1686         }
1687
1688         public void performScheduledUpdates(WriteGraphImpl graph) {
1689
1690                 assert (!updating);
1691                 assert (!cache.collecting);
1692                 assert (!firingListeners);
1693
1694                 firingListeners = true;
1695
1696                 try {
1697
1698                         // Performing may cause further events to be scheduled.
1699                         while (!scheduledListeners.isEmpty()) {
1700
1701 //                              graph.restart();
1702 //                              graph.state.barrier.inc();
1703
1704                                 // Clone current events to make new entries possible during
1705                                 // firing.
1706                                 THashSet<ListenerEntry> entries = scheduledListeners;
1707                                 scheduledListeners = new THashSet<ListenerEntry>();
1708
1709                                 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1710
1711                                 for (ListenerEntry listenerEntry : entries) {
1712
1713                                         if (pruneListener(listenerEntry)) {
1714                                                 if (Development.DEVELOPMENT) {
1715                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1716                                                                 new Exception().printStackTrace();
1717                                                                 System.err.println("Pruned " + listenerEntry.procedure);
1718                                                         }
1719                                                 }
1720                                                 continue;
1721                                         }
1722
1723                                         final CacheEntry entry = listenerEntry.entry;
1724                                         assert (entry != null);
1725
1726                                         Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1727
1728                                         if (newValue != ListenerEntry.NOT_CHANGED) {
1729                                                 if (Development.DEVELOPMENT) {
1730                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1731                                                                 new Exception().printStackTrace();
1732                                                                 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1733                                                         }
1734                                                 }
1735                                                 schedule.add(listenerEntry);
1736                                                 listenerEntry.setLastKnown(entry.getResult());
1737                                         }
1738
1739                                 }
1740
1741                                 for(ListenerEntry listenerEntry : schedule) {
1742                                         final CacheEntry entry = listenerEntry.entry;
1743                                         if (Development.DEVELOPMENT) {
1744                                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1745                                                         System.err.println("Firing " + listenerEntry.procedure);
1746                                                 }
1747                                         }
1748                                         try {
1749                                                 if (Development.DEVELOPMENT) {
1750                                                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1751                                                                 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1752                                                         }
1753                                                 }
1754                                                 entry.performFromCache(graph, listenerEntry.procedure);
1755                                         } catch (Throwable t) {
1756                                                 t.printStackTrace();
1757                                         }
1758                                 }
1759
1760 //                              graph.state.barrier.dec();
1761 //                              graph.waitAsync(null);
1762 //                              graph.state.barrier.assertReady();
1763
1764                         }
1765
1766                 } finally {
1767                         firingListeners = false;
1768                 }
1769
1770         }
1771
1772         /**
1773          * 
1774          * @return true if this entry still has listeners
1775          */
1776         public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1777
1778                 assert (!cache.collecting);
1779                 assert (!updating);
1780                 updating = true;
1781
1782                 boolean hadListeners = false;
1783                 boolean listenersUnknown = false;
1784
1785                 try {
1786
1787                         assert(entry != null);
1788                         LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1789                         IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1790                         todo.add(new UpdateEntry(null, entry, 0));
1791
1792                         while(true) {
1793
1794                                 // Walk the tree and collect immediate updates
1795                                 while (!todo.isEmpty()) {
1796                                         UpdateEntry e = todo.pop();
1797                                         hadListeners |= updateQuery(e, todo, immediates);
1798                                 }
1799
1800                                 if(immediates.isEmpty()) break;
1801
1802                                 // Evaluate all immediate updates and collect parents to update
1803                                 for(CacheEntry immediate : immediates.values()) {
1804
1805                                         if(immediate.isDiscarded()) {
1806                                                 continue;
1807                                         }
1808
1809                                         if(immediate.isExcepted()) {
1810
1811                                                 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1812                                                 if (newValue != ListenerEntry.NOT_CHANGED)
1813                                                         updateParents(0, immediate, todo);
1814
1815                                         } else {
1816
1817                                                 Object oldValue = immediate.getResult();
1818                                                 Object newValue = compareTo(graph, immediate, oldValue);
1819
1820                                                 if (newValue != ListenerEntry.NOT_CHANGED) {
1821                                                         updateParents(0, immediate, todo);
1822                                                 } else {
1823                                                         // If not changed, keep the old value
1824                                                         immediate.setResult(oldValue);
1825                                                         immediate.setReady();
1826                                                         listenersUnknown = true;
1827                                                 }
1828
1829                                         }
1830
1831                                 }
1832                                 immediates.clear();
1833
1834                         }
1835
1836                 } catch (Throwable t) {
1837                         Logger.defaultLogError(t);
1838                 }
1839
1840                 assert (updating);
1841                 updating = false;
1842
1843                 return hadListeners | listenersUnknown;
1844
1845         }
1846
1847         private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1848         private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1849         private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1850         // Maybe use a mutex from util.concurrent?
1851         private Object primitiveUpdateLock = new Object();
1852         private THashSet scheduledPrimitiveUpdates = new THashSet();
1853
1854         private ArrayList<CacheEntry> refutations = new ArrayList<>();
1855         
1856         private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1857                 e.refute();
1858                 refutations.add(e);
1859         }
1860
1861         private void updateRefutations(ReadGraphImpl graph) {
1862                 
1863                 for(CacheEntry e : refutations)
1864                         update(graph, e);
1865                 
1866                 refutations.clear();
1867                 
1868         }
1869         
1870         public void performDirtyUpdates(final ReadGraphImpl graph) {
1871
1872                 cache.dirty = false;
1873                 lastInvalidate = 0;
1874
1875                 if (Development.DEVELOPMENT) {
1876                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1877                                 System.err.println("== Query update ==");
1878                         }
1879                 }
1880
1881                 // Special case - one statement
1882                 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1883
1884                         long arg0 = scheduledObjectUpdates.getFirst();
1885
1886                         final int subject = (int)(arg0 >>> 32);
1887                         final int predicate = (int)(arg0 & 0xffffffff);
1888
1889                         for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1890                         for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1891                         for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1892
1893                         if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1894                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1895                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1896                                 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1897                                 if(types != null) markForUpdate(graph, types);
1898                         }
1899
1900                         if(predicate == subrelationOf) {
1901                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1902                                 if(superRelations != null) markForUpdate(graph, superRelations);
1903                         }
1904
1905                         DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1906                         if(dp != null) markForUpdate(graph, dp);
1907                         OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1908                         if(os != null) markForUpdate(graph, os);
1909
1910                         updateRefutations(graph);
1911                         
1912                         scheduledObjectUpdates.clear();
1913
1914                         if (Development.DEVELOPMENT) {
1915                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1916                                         System.err.println("== Query update ends ==");
1917                                 }
1918                         }
1919
1920                         return;
1921
1922                 }
1923
1924                 // Special case - one value
1925                 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1926
1927                         int arg0 = scheduledValueUpdates.getFirst();
1928
1929                         ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1930                         if(valueQuery != null) markForUpdate(graph, valueQuery);
1931
1932                         updateRefutations(graph);
1933
1934                         scheduledValueUpdates.clear();
1935
1936                         if (Development.DEVELOPMENT) {
1937                                 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1938                                         System.err.println("== Query update ends ==");
1939                                 }
1940                         }
1941                         
1942                         return;
1943
1944                 }
1945
1946                 final TIntHashSet predicates = new TIntHashSet();
1947                 final TIntHashSet orderedSets = new TIntHashSet();
1948
1949                 THashSet primitiveUpdates;
1950                 synchronized (primitiveUpdateLock) {
1951                         primitiveUpdates = scheduledPrimitiveUpdates;
1952                         scheduledPrimitiveUpdates = new THashSet();
1953                 }
1954
1955                 scheduledValueUpdates.forEach(new TIntProcedure() {
1956
1957                         @Override
1958                         public boolean execute(int arg0) {
1959                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1960                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1961                                 return true;
1962                         }
1963
1964                 });
1965
1966                 scheduledInvalidates.forEach(new TIntProcedure() {
1967
1968                         @Override
1969                         public boolean execute(int resource) {
1970                                 
1971                                 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1972                                 if(valueQuery != null) markForUpdate(graph, valueQuery);
1973                                 
1974                                 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1975                                 if(principalTypes != null) markForUpdate(graph, principalTypes);
1976                                 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1977                                 if(types != null) markForUpdate(graph, types);
1978
1979                                 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1980                                 if(superRelations != null) markForUpdate(graph, superRelations);
1981
1982                                 predicates.add(resource);
1983                                 
1984                                 return true;
1985                         }
1986
1987                 });
1988
1989                 scheduledObjectUpdates.forEach(new TLongProcedure() {
1990
1991                         @Override
1992                         public boolean execute(long arg0) {
1993
1994                                 final int subject = (int)(arg0 >>> 32);
1995                                 final int predicate = (int)(arg0 & 0xffffffff);
1996
1997                                 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1998                                         PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1999                                         if(principalTypes != null) markForUpdate(graph, principalTypes);
2000                                         Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
2001                                         if(types != null) markForUpdate(graph, types);
2002                                 }
2003
2004                                 if(predicate == subrelationOf) {
2005                                         SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2006                                         if(superRelations != null) markForUpdate(graph, superRelations);
2007                                 }
2008
2009                                 predicates.add(subject);
2010                                 orderedSets.add(predicate);
2011
2012                                 return true;
2013
2014                         }
2015
2016                 });
2017
2018                 predicates.forEach(new TIntProcedure() {
2019
2020                         @Override
2021                         public boolean execute(final int subject) {
2022
2023                                 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
2024                                 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
2025                                 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
2026
2027                                 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
2028                                 if(entry != null) markForUpdate(graph, entry);
2029
2030                                 return true;
2031
2032                         }
2033
2034                 });
2035
2036                 orderedSets.forEach(new TIntProcedure() {
2037
2038                         @Override
2039                         public boolean execute(int orderedSet) {
2040
2041                                 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
2042                                 if(entry != null) markForUpdate(graph, entry);
2043
2044                                 return true;
2045
2046                         }
2047
2048                 });
2049
2050                 updateRefutations(graph);
2051
2052                 primitiveUpdates.forEach(new TObjectProcedure() {
2053
2054                         @Override
2055                         public boolean execute(Object arg0) {
2056
2057                                 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
2058                                 if (query != null) {
2059                                         boolean listening = update(graph, query);
2060                                         if (!listening && !query.hasParents()) {
2061                                                 cache.externalReadEntryMap.remove(arg0);
2062                                                 query.discard();
2063                                         }
2064                                 }
2065                                 return true;
2066                         }
2067
2068                 });
2069                 
2070                 scheduledValueUpdates.clear();
2071                 scheduledObjectUpdates.clear();
2072                 scheduledInvalidates.clear();
2073                 
2074                 if (Development.DEVELOPMENT) {
2075                         if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2076                                 System.err.println("== Query update ends ==");
2077                         }
2078                 }
2079
2080         }
2081
2082         public void updateValue(final int resource) {
2083                 scheduledValueUpdates.add(resource);
2084                 cache.dirty = true;
2085         }
2086
2087         public void updateStatements(final int resource, final int predicate) {
2088                 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
2089                 cache.dirty = true;
2090         }
2091         
2092         private int lastInvalidate = 0;
2093         
2094         public void invalidateResource(final int resource) {
2095                 if(lastInvalidate == resource) return;
2096                 scheduledValueUpdates.add(resource);
2097                 lastInvalidate = resource;
2098                 cache.dirty = true;
2099         }
2100
2101         public void updatePrimitive(final ExternalRead primitive) {
2102
2103                 // External reads may be updated from arbitrary threads.
2104                 // Synchronize to prevent race-conditions.
2105                 synchronized (primitiveUpdateLock) {
2106                         scheduledPrimitiveUpdates.add(primitive);
2107                 }
2108                 querySupport.dirtyPrimitives();
2109
2110         }
2111
2112         @Override
2113         public synchronized String toString() {
2114                 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
2115         }
2116
2117         @Override
2118         protected void doDispose() {
2119
2120                 for(int index = 0; index < THREADS; index++) { 
2121                         executors[index].dispose();
2122                 }
2123
2124                 // First just wait
2125                 for(int i=0;i<100;i++) {
2126
2127                         boolean alive = false;
2128                         for(int index = 0; index < THREADS; index++) { 
2129                                 alive |= executors[index].isAlive();
2130                         }
2131                         if(!alive) return;
2132                         try {
2133                                 Thread.sleep(5);
2134                         } catch (InterruptedException e) {
2135                                 Logger.defaultLogError(e);
2136                         }
2137
2138                 }
2139
2140                 // Then start interrupting
2141                 for(int i=0;i<100;i++) {
2142
2143                         boolean alive = false;
2144                         for(int index = 0; index < THREADS; index++) { 
2145                                 alive |= executors[index].isAlive();
2146                         }
2147                         if(!alive) return;
2148                         for(int index = 0; index < THREADS; index++) {
2149                                 executors[index].interrupt();
2150                         }
2151                 }
2152
2153                 //              // Then just destroy
2154                 //              for(int index = 0; index < THREADS; index++) {
2155                 //                      executors[index].destroy();
2156                 //              }
2157
2158                 for(int index = 0; index < THREADS; index++) {
2159                         try {
2160                                 executors[index].join(5000);
2161                         } catch (InterruptedException e) {
2162                                 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2163                         }
2164                         executors[index] = null;
2165                 }
2166
2167         }
2168
2169         public int getHits() {
2170                 return cache.hits;
2171         }
2172
2173         public int getMisses() {
2174                 return cache.misses;
2175         }
2176
2177         public int getSize() {
2178                 return cache.size;
2179         }
2180
2181         public Set<Long> getReferencedClusters() {
2182                 HashSet<Long> result = new HashSet<Long>();
2183                 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2184                         Objects query = (Objects) entry.getQuery();
2185                         result.add(querySupport.getClusterId(query.r1()));
2186                 }
2187                 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2188                         DirectPredicates query = (DirectPredicates) entry.getQuery();
2189                         result.add(querySupport.getClusterId(query.id));
2190                 }
2191                 for (CacheEntry entry : cache.valueQueryMap.values()) {
2192                         ValueQuery query = (ValueQuery) entry.getQuery();
2193                         result.add(querySupport.getClusterId(query.id));
2194                 }
2195                 return result;
2196         }
2197
2198         public void assertDone() {
2199         }
2200
2201         CacheCollectionResult allCaches(CacheCollectionResult result) {
2202                 
2203                 return cache.allCaches(result);
2204
2205         }
2206
2207         public void printDiagnostics() {
2208         }
2209
2210         public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2211                 querySupport.requestCluster(graph, clusterId, runnable);
2212         }
2213
2214         public int clean() {
2215                 collector.collect(0, Integer.MAX_VALUE);
2216                 return cache.size;
2217         }
2218
2219         public void clean(final Collection<ExternalRead<?>> requests) {
2220                 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2221                         Iterator<ExternalRead<?>> iterator = requests.iterator();
2222                         @Override
2223                         public CacheCollectionResult allCaches() {
2224                                 throw new UnsupportedOperationException();
2225                         }
2226                         @Override
2227                         public CacheEntryBase iterate(int level) {
2228                                 if(iterator.hasNext()) {
2229                                         ExternalRead<?> request = iterator.next();
2230                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2231                                         if (entry != null) return entry;
2232                                         else return iterate(level);
2233                                 } else {
2234                                         iterator = requests.iterator();
2235                                         return null;
2236                                 }
2237                         }
2238                         @Override
2239                         public void remove() {
2240                                 throw new UnsupportedOperationException();
2241                         }
2242                         @Override
2243                         public void setLevel(CacheEntryBase entry, int level) {
2244                                 throw new UnsupportedOperationException();
2245                         }
2246                         @Override
2247                         public Collection<CacheEntry> getRootList() {
2248                                 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2249                                 for (ExternalRead<?> request : requests) {
2250                                         ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2251                                         if (entry != null)
2252                                                 result.add(entry);
2253                                 }
2254                                 return result;
2255                         }
2256                         @Override
2257                         public int getCurrentSize() {
2258                                 return cache.size;
2259                         }
2260                         @Override
2261                         public int calculateCurrentSize() {
2262                                 // This tells the collector to attempt collecting everything.
2263                                 return Integer.MAX_VALUE;
2264                         }
2265                         @Override
2266                         public boolean start(boolean flush) {
2267                                 return true;
2268                         }
2269                 };
2270                 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
2271         }
2272
2273         public void scanPending() {
2274                 
2275                 cache.scanPending();
2276
2277         }
2278
2279         public ReadGraphImpl graphForVirtualRequest() {
2280                 return ReadGraphImpl.createAsync(this);
2281         }
2282
2283         
2284         private HashMap<Resource, Class<?>> builtinValues;
2285         
2286         public Class<?> getBuiltinValue(Resource r) {
2287                 if(builtinValues == null) initBuiltinValues();
2288                 return builtinValues.get(r);
2289         }
2290
2291         Exception callerException = null;
2292
2293         public interface AsyncBarrier {
2294                 public void inc(); 
2295                 public void dec();
2296                 //        public void inc(String debug); 
2297                 //        public void dec(String debug);
2298         }
2299
2300 //      final public QueryProcessor processor;
2301 //      final public QuerySupport support;
2302
2303         //    boolean disposed = false;
2304
2305         private void initBuiltinValues() {
2306
2307                 Layer0 b = getSession().peekService(Layer0.class);
2308                 if(b == null) return;
2309
2310                 builtinValues = new HashMap<Resource, Class<?>>();
2311
2312                 builtinValues.put(b.String, String.class);
2313                 builtinValues.put(b.Double, Double.class);
2314                 builtinValues.put(b.Float, Float.class);
2315                 builtinValues.put(b.Long, Long.class);
2316                 builtinValues.put(b.Integer, Integer.class);
2317                 builtinValues.put(b.Byte, Byte.class);
2318                 builtinValues.put(b.Boolean, Boolean.class);
2319
2320                 builtinValues.put(b.StringArray, String[].class);
2321                 builtinValues.put(b.DoubleArray, double[].class);
2322                 builtinValues.put(b.FloatArray, float[].class);
2323                 builtinValues.put(b.LongArray, long[].class);
2324                 builtinValues.put(b.IntegerArray, int[].class);
2325                 builtinValues.put(b.ByteArray, byte[].class);
2326                 builtinValues.put(b.BooleanArray, boolean[].class);
2327
2328         }
2329
2330 //      public ReadGraphSupportImpl(final QueryProcessor provider2) {
2331 //
2332 //              if (null == provider2) {
2333 //                      this.processor = null;
2334 //                      support = null;
2335 //                      return;
2336 //              }
2337 //              this.processor = provider2;
2338 //              support = provider2.getCore();
2339 //              initBuiltinValues();
2340 //
2341 //      }
2342
2343 //      final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2344 //              return new ReadGraphSupportImpl(impl.processor);
2345 //      }
2346
2347         @Override
2348         final public Session getSession() {
2349                 return session;
2350         }
2351         
2352         final public ResourceSupport getResourceSupport() {
2353                 return resourceSupport;
2354         }
2355
2356         @Override
2357         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2358
2359         try {
2360
2361                 for(Resource predicate : getPredicates(impl, subject))
2362                     procedure.execute(impl, predicate);
2363
2364                 procedure.finished(impl);
2365
2366             } catch (Throwable e) {
2367                 procedure.exception(impl, e);
2368             }
2369
2370         }
2371
2372         @Override
2373         final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2374                 
2375                 throw new UnsupportedOperationException();
2376
2377 //              assert(subject != null);
2378 //              assert(procedure != null);
2379 //
2380 //              final ListenerBase listener = getListenerBase(procedure);
2381 //
2382 //              try {
2383 //                      QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2384 //
2385 //                              @Override
2386 //                              public void execute(ReadGraphImpl graph, int i) {
2387 //                                      try {
2388 //                                              procedure.execute(querySupport.getResource(i));
2389 //                                      } catch (Throwable t2) {
2390 //                                              Logger.defaultLogError(t2);
2391 //                                      }
2392 //                              }
2393 //
2394 //                              @Override
2395 //                              public void finished(ReadGraphImpl graph) {
2396 //                                      try {
2397 //                                              procedure.finished();
2398 //                                      } catch (Throwable t2) {
2399 //                                              Logger.defaultLogError(t2);
2400 //                                      }
2401 ////                            impl.state.barrier.dec();
2402 //                              }
2403 //
2404 //                              @Override
2405 //                              public void exception(ReadGraphImpl graph, Throwable t) {
2406 //                                      try {
2407 //                                              procedure.exception(t);
2408 //                                      } catch (Throwable t2) {
2409 //                                              Logger.defaultLogError(t2);
2410 //                                      }
2411 ////                            impl.state.barrier.dec();
2412 //                              }
2413 //
2414 //                      });
2415 //              } catch (DatabaseException e) {
2416 //                      Logger.defaultLogError(e);
2417 //              }
2418
2419         }
2420         
2421         @Override
2422         final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2423                 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); 
2424         }
2425
2426         @Override
2427         final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2428                         final Resource predicate, final MultiProcedure<Statement> procedure) {
2429
2430                 assert(subject != null);
2431                 assert(predicate != null);
2432                 assert(procedure != null);
2433
2434                 final ListenerBase listener = getListenerBase(procedure);
2435
2436 //              impl.state.barrier.inc();
2437
2438                 try {
2439                         Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2440
2441                                 @Override
2442                                 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2443                                         try {
2444                                                 procedure.execute(querySupport.getStatement(s, p, o));
2445                                         } catch (Throwable t2) {
2446                                                 Logger.defaultLogError(t2);
2447                                         }
2448                                 }
2449
2450                                 @Override
2451                                 public void finished(ReadGraphImpl graph) {
2452                                         try {
2453                                                 procedure.finished();
2454                                         } catch (Throwable t2) {
2455                                                 Logger.defaultLogError(t2);
2456                                         }
2457 //                              impl.state.barrier.dec();
2458                                 }
2459
2460                                 @Override
2461                                 public void exception(ReadGraphImpl graph, Throwable t) {
2462                                         try {
2463                                                 procedure.exception(t);
2464                                         } catch (Throwable t2) {
2465                                                 Logger.defaultLogError(t2);