1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.IdentityHashMap;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
38 import org.simantics.databoard.Bindings;
39 import org.simantics.db.AsyncReadGraph;
40 import org.simantics.db.DevelopmentKeys;
41 import org.simantics.db.DirectStatements;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.RelationInfo;
44 import org.simantics.db.Resource;
45 import org.simantics.db.Session;
46 import org.simantics.db.Statement;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.debug.ListenerReport;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
53 import org.simantics.db.exception.NoInverseException;
54 import org.simantics.db.exception.ResourceNotFoundException;
55 import org.simantics.db.impl.DebugPolicy;
56 import org.simantics.db.impl.ResourceImpl;
57 import org.simantics.db.impl.graph.MultiIntProcedure;
58 import org.simantics.db.impl.graph.ReadGraphImpl;
59 import org.simantics.db.impl.graph.ReadGraphSupport;
60 import org.simantics.db.impl.graph.WriteGraphImpl;
61 import org.simantics.db.impl.procedure.IntProcedureAdapter;
62 import org.simantics.db.impl.procedure.InternalProcedure;
63 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
64 import org.simantics.db.impl.support.ResourceSupport;
65 import org.simantics.db.procedure.AsyncMultiListener;
66 import org.simantics.db.procedure.AsyncMultiProcedure;
67 import org.simantics.db.procedure.AsyncProcedure;
68 import org.simantics.db.procedure.AsyncSetListener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.procedure.TIntProcedure;
88 import gnu.trove.procedure.TLongProcedure;
89 import gnu.trove.procedure.TObjectProcedure;
90 import gnu.trove.set.hash.THashSet;
91 import gnu.trove.set.hash.TIntHashSet;
93 @SuppressWarnings({"rawtypes", "unchecked"})
94 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
96 public static int indent = 0;
101 public int boundQueries = 0;
104 final private int functionalRelation;
106 final private int superrelationOf;
108 final private int instanceOf;
110 final private int inverseOf;
112 final private int asserts;
114 final private int hasPredicate;
116 final private int hasPredicateInverse;
118 final private int hasObject;
120 final private int inherits;
122 final private int subrelationOf;
124 final private int rootLibrary;
127 * A cache for the root library resource. Initialized in
128 * {@link #getRootLibraryResource()}.
130 private volatile ResourceImpl rootLibraryResource;
132 final private int library;
134 final private int consistsOf;
136 final private int hasName;
138 AtomicInteger sleepers = new AtomicInteger(0);
140 private boolean updating = false;
143 private boolean firingListeners = false;
145 final public QueryCache cache;
146 final public QuerySupport querySupport;
147 final public Session session;
148 final public ResourceSupport resourceSupport;
150 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
152 QueryThread[] executors;
154 public ArrayList<SessionTask>[] queues;
158 INIT, RUN, SLEEP, DISPOSED
162 public ThreadState[] threadStates;
163 public ReentrantLock[] threadLocks;
164 public Condition[] threadConditions;
166 public ArrayList<SessionTask>[] ownTasks;
168 public ArrayList<SessionTask>[] ownSyncTasks;
170 ArrayList<SessionTask>[] delayQueues;
172 public boolean synch = true;
174 final Object querySupportLock;
176 public Long modificationCounter = 0L;
178 public void close() {
// Appends the task to the calling thread's own task list (ownTasks[caller]).
181 final public void scheduleOwn(int caller, SessionTask request) {
182 ownTasks[caller].add(request);
// Schedules a task regardless of which thread it targets: a task aimed at the
// caller itself goes to ownTasks[caller]; schedule(caller, request) is the
// other visible path (presumably the else branch — confirm).
185 final public void scheduleAlways(int caller, SessionTask request) {
187 int performer = request.thread; // index of the thread the task is targeted at
188 if(caller == performer) {
189 ownTasks[caller].add(request);
191 schedule(caller, request);
// Hands a task to the thread named by request.thread. The visible lines add it
// to queues[performer] and, when it is the only queued task, signal that
// thread's condition so that a waiting thread can pick it up.
196 final public void schedule(int caller, SessionTask request) {
198 int performer = request.thread;
200 if(DebugPolicy.SCHEDULE)
201 System.out.println("schedule " + request + " " + caller + " -> " + performer);
203 assert(performer >= 0);
205 assert(request != null); // NOTE(review): request.thread was already read above, so a null request fails with an NPE before this assert is reached
207 if(caller == performer) {
210 ReentrantLock queueLock = threadLocks[performer]; // the lock threadConditions[performer] was created from; presumably held around the queue update — confirm
212 queues[performer].add(request);
213 // This thread could have been sleeping
214 if(queues[performer].size() == 1) { // the task just added is the only one in the queue
215 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); // one fewer thread counted as sleeping
216 threadConditions[performer].signalAll();
225 final public int THREAD_MASK;
227 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
// Abstract unit of work carrying a target thread index, a syncCaller index and
// a payload object. Subclasses implement run(int).
229 public static abstract class SessionTask {
231 final public int thread; // index of the thread the task is targeted at (read by the scheduling methods)
232 final public int syncCaller; // -1 when constructed without an explicit sync caller
233 final public Object object; // payload; also used by toString()
// Creates a task for a WriteTraits payload; syncCaller is fixed to -1.
235 public SessionTask(WriteTraits object, int thread) {
236 this.thread = thread;
237 this.syncCaller = -1;
238 this.object = object;
// Creates a task with an explicit syncCaller index.
241 public SessionTask(Object object, int thread, int syncCaller) {
242 this.thread = thread;
243 this.syncCaller = syncCaller;
244 this.object = object;
// Executes the task; the argument is a thread index.
247 public abstract void run(int thread);
250 public String toString() {
251 return "SessionTask[" + object + "]";
// SessionTask variant that additionally carries a Semaphore and a container
// for a Throwable. Both are only stored here; how they are used is up to the
// code that runs the task.
256 public static abstract class SessionRead extends SessionTask {
258 final public Semaphore notify;
259 final public DataContainer<Throwable> throwable;
// Uses the target thread as the syncCaller as well.
261 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
262 super(object, thread, thread);
263 this.throwable = throwable;
264 this.notify = notify;
// Same as above but with an explicit syncCaller (syncThread).
267 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
268 super(object, thread, syncThread);
269 this.throwable = throwable;
270 this.notify = notify;
275 long waitingTime = 0;
278 static int koss2 = 0;
// Delegates to runSynchronized() on the first executor and returns its result.
// The graph argument is not used on the visible lines.
280 public boolean resume(ReadGraphImpl graph) {
281 return executors[0].runSynchronized();
284 //private WeakReference<GarbageTracker> garbageTracker;
286 private class GarbageTracker {
289 protected void finalize() throws Throwable {
291 // System.err.println("GarbageTracker");
293 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
// Builds the per-thread scheduling state (queues, locks, conditions, states),
// creates and starts the query threads, loops until the sleepers counter equals
// THREADS, and finally resolves the built-in resource ids through
// core.getBuiltin. The Layer0 ids are only looked up when the "http:/" root
// library id is non-zero.
//
// threads   - number of query threads
// core      - query support used for the cache, the lock and built-in lookups
// threadSet - receives every created QueryThread
301 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
302 throws DatabaseException {
304 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
307 THREAD_MASK = threads - 1; // NOTE(review): only usable as a bit mask if threads is a power of two — confirm
310 cache = new QueryCache(core, threads);
311 session = querySupport.getSession();
312 resourceSupport = querySupport.getSupport();
313 querySupportLock = core.getLock();
315 executors = new QueryThread[THREADS];
316 queues = new ArrayList[THREADS];
317 threadLocks = new ReentrantLock[THREADS];
318 threadConditions = new Condition[THREADS];
319 threadStates = new ThreadState[THREADS];
320 ownTasks = new ArrayList[THREADS];
321 ownSyncTasks = new ArrayList[THREADS];
322 delayQueues = new ArrayList[THREADS * THREADS]; // THREADS^2 slots; presumably one per (caller, performer) pair — confirm
324 // freeSchedule = new AtomicInteger(0);
326 for (int i = 0; i < THREADS * THREADS; i++) {
327 delayQueues[i] = new ArrayList<SessionTask>();
330 for (int i = 0; i < THREADS; i++) {
332 // tasks[i] = new ArrayList<Runnable>();
333 ownTasks[i] = new ArrayList<SessionTask>();
334 ownSyncTasks[i] = new ArrayList<SessionTask>();
335 queues[i] = new ArrayList<SessionTask>();
336 threadLocks[i] = new ReentrantLock();
337 threadConditions[i] = threadLocks[i].newCondition(); // each condition belongs to the same-index lock
338 // limits[i] = false;
339 threadStates[i] = ThreadState.INIT;
343 for (int i = 0; i < THREADS; i++) {
347 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
349 threadSet.add(executors[i]);
354 for (int i = 0; i < THREADS; i++) {
355 executors[i].start();
358 // Make sure that query threads are up and running
359 while(sleepers.get() != THREADS) { // loops until the sleepers counter equals the thread count
362 } catch (InterruptedException e) {
367 rootLibrary = core.getBuiltin("http:/");
368 boolean builtinsInstalled = rootLibrary != 0; // a zero id for the root library is treated as "no builtins available"
370 if (builtinsInstalled) {
371 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
372 assert (functionalRelation != 0);
374 functionalRelation = 0;
376 if (builtinsInstalled) {
377 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
378 assert (instanceOf != 0);
382 if (builtinsInstalled) {
383 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
384 assert (inverseOf != 0);
389 if (builtinsInstalled) {
390 inherits = core.getBuiltin(Layer0.URIs.Inherits);
391 assert (inherits != 0);
395 if (builtinsInstalled) {
396 asserts = core.getBuiltin(Layer0.URIs.Asserts);
397 assert (asserts != 0);
401 if (builtinsInstalled) {
402 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
403 assert (hasPredicate != 0);
407 if (builtinsInstalled) {
408 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
409 assert (hasPredicateInverse != 0);
411 hasPredicateInverse = 0;
413 if (builtinsInstalled) {
414 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
415 assert (hasObject != 0);
419 if (builtinsInstalled) {
420 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
421 assert (subrelationOf != 0);
425 if (builtinsInstalled) {
426 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
427 assert (superrelationOf != 0);
431 if (builtinsInstalled) {
432 library = core.getBuiltin(Layer0.URIs.Library);
433 assert (library != 0);
437 if (builtinsInstalled) {
438 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
439 assert (consistsOf != 0);
443 if (builtinsInstalled) {
444 hasName = core.getBuiltin(Layer0.URIs.HasName);
445 assert (hasName != 0);
// Runs performDirtyUpdates(graph) and then increments the modification counter.
451 final public void releaseWrite(ReadGraphImpl graph) {
452 performDirtyUpdates(graph);
453 modificationCounter++; // NOTE(review): the field is a boxed Long, so this is an unbox/add/re-box sequence and not atomic — confirm callers are serialized
// Returns the integer id of the given resource, as reported by querySupport.
456 final public int getId(final Resource r) {
457 return querySupport.getId(r);
460 public QuerySupport getCore() {
464 public int getFunctionalRelation() {
465 return functionalRelation;
468 public int getInherits() {
472 public int getInstanceOf() {
476 public int getInverseOf() {
480 public int getSubrelationOf() {
481 return subrelationOf;
484 public int getSuperrelationOf() {
485 return superrelationOf;
488 public int getAsserts() {
492 public int getHasPredicate() {
496 public int getHasPredicateInverse() {
497 return hasPredicateInverse;
500 public int getHasObject() {
504 public int getRootLibrary() {
// Returns the root library as a Resource, creating and caching a ResourceImpl
// in the volatile rootLibraryResource field on first use. The visible lines
// also throw UnsupportedOperationException for an uninitialized database
// (presumably when the root library id is 0 — confirm).
508 public Resource getRootLibraryResource() {
509 if (rootLibraryResource == null) {
510 // Synchronization is not needed here, it doesn't matter if multiple
511 // threads simultaneously set rootLibraryResource once.
512 int root = getRootLibrary();
514 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
515 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
517 return rootLibraryResource;
520 public int getLibrary() {
524 public int getConsistsOf() {
528 public int getHasName() {
// Resolves the string id to a resource id through QueryCache.runnerURIToResource
// and forwards the outcome to the given procedure. A non-null, non-zero result
// is passed to procedure.execute; a ResourceNotFoundException(id) is reported
// through procedure.exception on the failure path. A DatabaseException caught
// here is logged with Logger.defaultLogError instead of being rethrown.
532 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
536 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
539 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
541 if (result != null && result != 0) { // a usable resource id was found
542 procedure.execute(graph, result);
546 // Fall back to using the fixed builtins.
547 // result = querySupport.getBuiltin(id);
548 // if (result != 0) {
549 // procedure.execute(graph, result);
554 // result = querySupport.getRandomAccessReference(id);
555 // } catch (ResourceNotFoundException e) {
556 // procedure.exception(graph, e);
561 procedure.execute(graph, result);
563 procedure.exception(graph, new ResourceNotFoundException(id));
569 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
570 procedure.exception(graph, t); // pass failures through unchanged
574 } catch (DatabaseException e) {
575 Logger.defaultLogError(e);
// Looks the id up directly with querySupport.getBuiltin and reports either the
// result (procedure.execute) or a ResourceNotFoundException(id)
// (procedure.exception). The parent argument is not used on the visible lines.
580 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
582 Integer result = querySupport.getBuiltin(id);
584 procedure.execute(graph, result);
586 procedure.exception(graph, new ResourceNotFoundException(id));
// Runs a MultiRead through QueryCache.runnerMultiReadEntry. A DatabaseException
// is rethrown wrapped in an IllegalStateException. The cached and provider
// arguments are not used on the visible lines.
591 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
594 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
595 } catch (DatabaseException e) {
596 throw new IllegalStateException(e);
// Runs an AsyncMultiRead through QueryCache.runnerAsyncMultiReadEntry. A
// DatabaseException is rethrown wrapped in an IllegalStateException.
601 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
605 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
606 } catch (DatabaseException e) {
607 throw new IllegalStateException(e);
// Runs an ExternalRead through QueryCache.runnerExternalReadEntry; a
// DatabaseException propagates to the caller. The cached and provider
// arguments are not used on the visible lines.
612 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
613 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
617 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
619 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
// Public entry point for MultiRead queries; delegates to
// QueryCache.runnerMultiReadEntry and lets a DatabaseException propagate.
623 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
625 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
// Public entry point for ExternalRead queries; delegates to
// QueryCache.runnerExternalReadEntry and lets a DatabaseException propagate.
629 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
631 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
// An external read entry counts as bound when it has parents or when a
// listener list is registered for it in the cache.
635 boolean isBound(ExternalReadEntry<?> entry) {
636 if(entry.hasParents()) return true;
637 else if(hasListener(entry)) return true;
// Records that child depends on parent and optionally attaches a listener.
// The parent link is added only for a non-null parent, a non-inferred
// dependency and a child that is not immutable. A DatabaseException from that
// step is logged. With a non-null listener the result of registerListener is
// returned. Synchronized on this QueryProcessor.
641 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
643 if (parent != null && !inferred) {
645 if(!child.isImmutable(graph)) // immutable children get no parent link
646 child.addParent(parent);
647 } catch (DatabaseException e) {
648 Logger.defaultLogError(e);
650 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
653 if (listener != null) {
654 return registerListener(child, listener, procedure);
// Placeholder procedure type implementing both InternalProcedure<Object> and
// IntProcedure. performForEach2 passes an instance to computeForEach where a
// procedure argument is required.
662 static class Dummy implements InternalProcedure<Object>, IntProcedure {
665 public void execute(ReadGraphImpl graph, int i) {
669 public void finished(ReadGraphImpl graph) {
673 public void execute(ReadGraphImpl graph, Object result) {
677 public void exception(ReadGraphImpl graph, Throwable throwable) {
682 private static final Dummy dummy = new Dummy();
// Evaluates a unary query: registers the parent/listener dependencies, and for
// a query that is not ready computes it first with the shared Dummy procedure
// and reads the result with a null procedure. The other visible return passes
// the caller's procedure to query.get.
685 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
687 if (DebugPolicy.PERFORM)
688 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
691 assert (!collecting);
693 assert(query.assertNotDiscarded());
695 registerDependencies(graph, query, parent, listener, procedure, false); // false = not an inferred dependency
697 // FRESH, REFUTED, EXCEPTED go here
698 if (!query.isReady()) {
703 query.computeForEach(graph, this, (Procedure)dummy, true);
704 return query.get(graph, this, null);
710 return query.get(graph, this, procedure);
// Operations a query collector uses to inspect and walk the query cache.
// Implemented by QueryCollectorSupportImpl.
718 interface QueryCollectorSupport {
719 public CacheCollectionResult allCaches(); // fresh snapshot of all cache entries
720 public Collection<CacheEntry> getRootList();
721 public int getCurrentSize();
722 public int calculateCurrentSize();
723 public CacheEntryBase iterate(int level); // next entry of the current iteration at the given level
724 public void remove();
725 public void setLevel(CacheEntryBase entry, int level);
726 public boolean start(boolean flush); // begins a collection round; flush forces a data refresh
// Strategy interface for query cache collection; invoked from gc().
729 interface QueryCollector {
731 public void collect(int youngTarget, int allowedTimeInMs);
// QueryCollectorSupport backed by the enclosing QueryProcessor's cache. It
// keeps a CacheCollectionResult snapshot ("iteration") that is walked by
// iterate() and refreshed by restart().
735 class QueryCollectorSupportImpl implements QueryCollectorSupport {
737 private static final boolean DEBUG = false;
738 private static final double ITERATION_RATIO = 0.2; // relative size-change threshold handed to restart()
740 private CacheCollectionResult iteration = new CacheCollectionResult();
741 private boolean fresh = true;
742 private boolean needDataInStart = true; // true when start() should refresh the snapshot
744 QueryCollectorSupportImpl() {
// Builds a new snapshot by letting the enclosing QueryProcessor fill a fresh result object.
748 public CacheCollectionResult allCaches() {
749 CacheCollectionResult result = new CacheCollectionResult();
750 QueryProcessor.this.allCaches(result);
// Starts a collection round; refreshes data when needed or when flush is set
// (flush uses a threshold of 0.0) and reports whether the iteration is at its start.
755 public boolean start(boolean flush) {
756 // We need new data from query maps
758 if(needDataInStart || flush) {
759 // Last run ended after processing all queries => refresh data
760 restart(flush ? 0.0 : ITERATION_RATIO);
762 // continue with previous big data
764 // Notify caller about iteration situation
765 return iteration.isAtStart();
// Compares the current cache size with the snapshot size against targetRatio
// and takes a new snapshot via allCaches().
768 private void restart(double targetRatio) {
770 needDataInStart = true;
772 long start = System.nanoTime();
775 // We need new data from query maps
777 int iterationSize = iteration.size()+1; // +1 keeps the divisor below non-zero
778 int diff = calculateCurrentSize()-iterationSize;
780 double ratio = (double)diff / (double)iterationSize;
781 boolean dirty = Math.abs(ratio) >= targetRatio; // cache size changed by at least the target ratio
784 iteration = allCaches();
786 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
787 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
788 System.err.print(" " + iteration.levels[i].size());
789 System.err.println("");
796 needDataInStart = false;
798 // We are returning here within the same GC round - reuse the cache table
// Returns the next entry at the given level, skipping discarded entries.
807 public CacheEntryBase iterate(int level) {
809 CacheEntryBase entry = iteration.next(level);
811 restart(ITERATION_RATIO);
815 while(entry != null && entry.isDiscarded()) { // skip entries that were discarded
816 entry = iteration.next(level);
824 public void remove() {
829 public void setLevel(CacheEntryBase entry, int level) {
830 iteration.setLevel(entry, level);
833 public Collection<CacheEntry> getRootList() {
834 return cache.getRootList();
838 public int calculateCurrentSize() {
839 return cache.calculateCurrentSize();
843 public int getCurrentSize() {
848 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
850 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
851 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
853 public int querySize() {
// Runs one collection pass by delegating to the configured QueryCollector.
857 public void gc(int youngTarget, int allowedTimeInMs) {
859 collector.collect(youngTarget, allowedTimeInMs);
// Registers a listener for a cache entry via addListener. The disposed state
// of the listener base is checked first.
863 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
865 assert (entry != null);
867 if (base.isDisposed())
870 return addListener(entry, base, procedure);
// Stores the given result as the listener entry's last known value.
874 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
875 entry.setLastKnown(result);
// Adds a ListenerEntry for (entry, base, procedure) to cache.listeners. If an
// equal entry already exists and its base is not disposed, null is returned;
// if the existing one is disposed it is replaced in place.
878 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
880 assert (entry != null);
881 assert (procedure != null);
883 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
885 list = new ArrayList<ListenerEntry>(1); // initial capacity of one
886 cache.listeners.put(entry, list);
889 ListenerEntry result = new ListenerEntry(entry, base, procedure);
890 int currentIndex = list.indexOf(result); // equality is decided by ListenerEntry.equals
891 // There was already a listener
892 if(currentIndex > -1) {
893 ListenerEntry current = list.get(currentIndex);
894 if(!current.base.isDisposed()) return null; // a live duplicate exists; nothing is added
895 list.set(currentIndex, result); // replace the disposed duplicate
900 if(DebugPolicy.LISTENER) {
901 new Exception().printStackTrace();
902 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
// Adds the listener entry to the scheduledListeners set.
909 private void scheduleListener(ListenerEntry entry) {
910 assert (entry != null);
911 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
912 scheduledListeners.add(entry);
// Removes the listener entry from the list registered for its cache entry.
// The visible lines also remove the list itself from cache.listeners
// (presumably once it has become empty — confirm).
915 private void removeListener(ListenerEntry entry) {
916 assert (entry != null);
917 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
918 if(list == null) return; // nothing registered for this cache entry
919 boolean success = list.remove(entry);
922 cache.listeners.remove(entry.entry);
// True when cache.listeners holds a list for the entry. The list contents are
// not inspected, so disposed listeners are not pruned here.
925 private boolean hasListener(CacheEntry entry) {
926 if(cache.listeners.get(entry) != null) return true;
// Variant of hasListener that first looks for listeners whose base has been
// disposed, and removes the map entry for the cache entry when its listener
// list is empty.
930 boolean hasListenerAfterDisposing(CacheEntry entry) {
931 if(cache.listeners.get(entry) != null) {
932 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
933 ArrayList<ListenerEntry> list = null; // collects disposed listeners; allocated lazily
934 for (ListenerEntry e : entries) {
935 if (e.base.isDisposed()) {
936 if(list == null) list = new ArrayList<ListenerEntry>();
941 for (ListenerEntry e : list) {
945 if (entries.isEmpty()) {
946 cache.listeners.remove(entry);
// Returns the listener entries registered for a cache entry after calling
// hasListenerAfterDisposing on it. The returned list is the one stored in
// cache.listeners (not a copy), or an empty list when none is registered.
954 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
955 hasListenerAfterDisposing(entry);
956 if(cache.listeners.get(entry) != null)
957 return cache.listeners.get(entry);
959 return Collections.emptyList();
// Fills workarea with a set of listeners for the entry: its own listener
// entries plus, recursively, the sets computed for each of its parents.
// Entries already present in workarea are not processed again.
962 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
964 if(!workarea.containsKey(entry)) {
966 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
967 for(ListenerEntry e : getListenerEntries(entry))
970 workarea.put(entry, ls); // stored before the parents are visited
972 for(CacheEntry parent : entry.getParents(this)) {
973 processListenerReport(parent, workarea);
974 ls.addAll(workarea.get(parent)); // add the parent's set to this entry's set
// Builds a ListenerReport over all cache entries: hasListenerAfterDisposing is
// called for every entry, then the per-entry listener sets are computed with
// processListenerReport. The report's print method writes one line per
// listener with the number of entries whose set contains it.
981 public synchronized ListenerReport getListenerReport() throws IOException {
983 class ListenerReportImpl implements ListenerReport {
985 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
988 public void print(PrintStream b) {
989 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
990 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
991 for(ListenerBase l : e.getValue()) {
992 Integer i = hist.get(l);
993 hist.put(l, i != null ? i-1 : -1); // counts are kept negated (presumably so the value sort puts the largest first — confirm)
997 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
998 b.print("" + -p.second + " " + p.first + "\n"); // negate back to a positive count
1006 ListenerReportImpl result = new ListenerReportImpl();
1008 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1009 for(CacheEntryBase entry : all) {
1010 hasListenerAfterDisposing(entry);
1012 for(CacheEntryBase entry : all) {
1013 processListenerReport(entry, result.workarea);
// Opens the given file for writing, builds the listener report and returns a
// fixed status message.
// NOTE(review): no close()/flush() of the PrintStream is visible here —
// confirm the stream is released, also when getListenerReport() throws.
1020 public synchronized String reportListeners(File file) throws IOException {
1025 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1026 ListenerReport report = getListenerReport();
1029 return "Done reporting listeners.";
// Recursively records the parents of an entry into workarea. Discarded
// entries and entries already present in workarea are skipped.
1033 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1035 if(entry.isDiscarded()) return;
1036 if(workarea.containsKey(entry)) return; // already visited
1038 Iterable<CacheEntry> parents = entry.getParents(this);
1039 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1040 for(CacheEntry e : parents) {
1041 if(e.isDiscarded()) continue; // ignore discarded parents
1043 processParentReport(e, workarea);
1045 workarea.put(entry, ps);
// Writes the contents of Development.histogram to the given file, one
// "key: value" line per entry in reversed value-sorted order, and then clears
// the histogram.
// NOTE(review): no close()/flush() of the PrintStream is visible here — confirm.
1049 public synchronized String reportQueryActivity(File file) throws IOException {
1051 System.err.println("reportQueries " + file.getAbsolutePath());
1056 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1058 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1059 Collections.reverse(entries); // reverse the value-sorted order
1061 for(Pair<String,Integer> entry : entries) {
1062 b.println(entry.first + ": " + entry.second);
1067 Development.histogram.clear();
// Dumps a textual report of the query cache to the given file. Visible steps:
//  1. collect every non-discarded entry and its parents into workarea,
//  2. assign each entry a bind status in flagMap (0, 1 or 2; the file header
//     printed below names them bound, free and unknown),
//  3. refine status 2 entries from their parents' status in up to 20 passes,
//  4. count entries per request class,
//  5. iterate a per-entry "retained set" histogram (up to 50 rounds),
//  6. print per-class counts, retained-set sizes, parent and child listings.
// Returns a message with the number of dumped queries.
1073 public synchronized String reportQueries(File file) throws IOException {
1075 System.err.println("reportQueries " + file.getAbsolutePath());
1080 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1082 long start = System.nanoTime();
1084 // ArrayList<CacheEntry> all = ;
1086 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1087 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1088 for(CacheEntryBase entry : caches) {
1089 processParentReport(entry, workarea);
1092 // for(CacheEntry e : all) System.err.println("entry: " + e);
1094 long duration = System.nanoTime() - start;
1095 System.err.println("Query root set in " + 1e-9*duration + "s.");
1097 start = System.nanoTime();
1099 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); // entry -> bind status (0, 1 or 2)
1103 for(CacheEntry entry : workarea.keySet()) {
1104 boolean listener = hasListenerAfterDisposing(entry);
1105 boolean hasParents = entry.getParents(this).iterator().hasNext();
1108 flagMap.put(entry, 0);
1109 } else if (!hasParents) {
1111 flagMap.put(entry, 1);
1114 flagMap.put(entry, 2);
1116 // // Write leaf bit
1117 // entry.flags |= 4;
1120 boolean done = true;
1127 long start2 = System.nanoTime();
1129 int boundCounter = 0;
1130 int unboundCounter = 0;
1131 int unknownCounter = 0;
1133 for(CacheEntry<?> entry : workarea.keySet()) {
1135 //System.err.println("process " + entry);
1137 int flags = flagMap.get(entry);
1138 int bindStatus = flags & 3; // low two bits carry the bind status
1140 if(bindStatus == 0) boundCounter++;
1141 else if(bindStatus == 1) unboundCounter++;
1142 else if(bindStatus == 2) unknownCounter++;
1144 if(bindStatus < 2) continue; // only status 2 entries are refined
1147 for(CacheEntry parent : entry.getParents(this)) {
1149 if(parent.isDiscarded()) flagMap.put(parent, 1);
1151 int flags2 = flagMap.get(parent);
1152 int bindStatus2 = flags2 & 3;
1153 // Parent is bound => child is bound
1154 if(bindStatus2 == 0) {
1158 // Parent is unknown => child is unknown
1159 else if (bindStatus2 == 2) {
1166 flagMap.put(entry, newStatus);
1170 duration = System.nanoTime() - start2;
1171 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1172 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1174 } while(!done && loops++ < 20); // at most 20 refinement passes
1178 for(CacheEntry entry : workarea.keySet()) {
1180 int bindStatus = flagMap.get(entry);
1181 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1187 duration = System.nanoTime() - start;
1188 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1190 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1192 for(CacheEntry entry : workarea.keySet()) {
1193 Class<?> clazz = entry.getClass();
// For request-backed entries, count by the request's class rather than the entry's class.
1194 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1195 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1196 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1197 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1198 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1199 Integer c = counts.get(clazz);
1200 if(c == null) counts.put(clazz, -1); // counts are kept negated and negated back when printed
1201 else counts.put(clazz, c-1);
1204 b.print("// Simantics DB client query report file\n");
1205 b.print("// This file contains the following information\n");
1206 b.print("// -The amount of cached query instances per query class\n");
1207 b.print("// -The sizes of retained child sets\n");
1208 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1209 b.print("// -Followed by status, where\n");
1210 b.print("// -0=bound\n");
1211 b.print("// -1=free\n");
1212 b.print("// -2=unknown\n");
1213 b.print("// -L=has listener\n");
1214 b.print("// -List of children for each query (search for 'C <query name>')\n");
1216 b.print("----------------------------------------\n");
1218 b.print("// Queries by class\n");
1219 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1220 b.print(-p.second + " " + p.first.getName() + "\n");
1223 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1224 for(CacheEntry e : workarea.keySet())
1227 boolean changed = true;
1229 while(changed && iter++<50) { // at most 50 rounds
1233 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1234 for(CacheEntry e : workarea.keySet())
1237 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1238 Integer c = hist.get(e.getKey());
1239 for(CacheEntry p : e.getValue()) {
1240 Integer i = newHist.get(p);
1241 newHist.put(p, i+c); // add the entry's value to each of its recorded parents
1244 for(CacheEntry e : workarea.keySet()) {
1245 Integer value = newHist.get(e);
1246 Integer old = hist.get(e);
1247 if(!value.equals(old)) {
1249 // System.err.println("hist " + e + ": " + old + " => " + value);
1254 System.err.println("Retained set iteration " + iter);
1258 b.print("// Queries by retained set\n");
1259 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1260 b.print("" + -p.second + " " + p.first + "\n");
1263 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>(); // parent -> collection built while printing the parent listing
1265 b.print("// Entry parent listing\n");
1266 for(CacheEntry entry : workarea.keySet()) {
1267 int status = flagMap.get(entry);
1268 boolean hasListener = hasListenerAfterDisposing(entry);
1269 b.print("Q " + entry.toString());
1271 b.print(" (L" + status + ")");
1274 b.print(" (" + status + ")");
1277 for(CacheEntry parent : workarea.get(entry)) {
1278 Collection<CacheEntry> inv = inverse.get(parent);
1280 inv = new ArrayList<CacheEntry>();
1281 inverse.put(parent, inv);
1284 b.print(" " + parent.toString());
1289 b.print("// Entry child listing\n");
1290 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1291 b.print("C " + entry.getKey().toString());
1293 for(CacheEntry child : entry.getValue()) {
1294 Integer h = hist.get(child);
1298 b.print(" <no children>");
1300 b.print(" " + child.toString());
1305 b.print("#queries: " + workarea.keySet().size() + "\n");
1306 b.print("#listeners: " + listeners + "\n");
1310 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item of the update traversal: the cache entry to process (entry), the
// entry that queued it (caller; update() passes null for the root item) and
// the indentation depth used by the development trace in updateQuery.
1316 public CacheEntry caller;
1318 public CacheEntry entry;
1322 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1323 this.caller = caller;
1325 this.indent = indent;
// Removes the query behind the given cache entry from this processor.
// Returns false straight away when the entry is already discarded; the
// result of the remaining path is not part of the visible lines.
1330 boolean removeQuery(CacheEntry entry) {
1332 // This entry has been removed before. No need to do anything here.
1333 if(entry.isDiscarded()) return false;
// Repeats the guard above; only evaluated when assertions are enabled.
1335 assert (!entry.isDiscarded());
1337 Query query = entry.getQuery();
// Delegates the removal to the query, passing this processor.
1339 query.removeEntry(this);
// Additional handling for entries whose GC status has the HAS_BEEN_BOUND bit
// set (the guarded statement lies outside the visible lines).
1344 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1355 * @return true if this entry is being listened to
// Processes one work item of the update traversal started by update().
// The entry's state is inspected in the order discarded, refuted, excepted,
// pending; entries whose query type selects IMMEDIATE_UPDATE are collected
// into `immediates`, listeners registered for the entry are scheduled, and
// for INVALIDATE queries the parents are queued through updateParents.
// When Development.DEVELOPMENT and the QUERYPROCESSOR_UPDATE property are
// set, one trace line is printed per item, prefixed with
// D (discarded), R (refuted), E (excepted), P (pending) or U (updating)
// and indented by e.indent spaces.
// NOTE(review): the return statements are not part of the visible lines.
1357 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1361 CacheEntry entry = e.entry;
1363 System.err.println("updateQuery " + entry);
1366 * If the dependency graph forms a DAG, some entries are inserted in the
1367 * todo list many times. They only need to be processed once though.
1369 if (entry.isDiscarded()) {
1370 if (Development.DEVELOPMENT) {
1371 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1372 System.out.print("D");
1373 for (int i = 0; i < e.indent; i++)
1374 System.out.print(" ");
1375 System.out.println(entry.getQuery());
1378 // System.err.println(" => DISCARDED");
1382 if (entry.isRefuted()) {
1383 if (Development.DEVELOPMENT) {
1384 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1385 System.out.print("R");
1386 for (int i = 0; i < e.indent; i++)
1387 System.out.print(" ");
1388 System.out.println(entry.getQuery());
1394 if (entry.isExcepted()) {
1395 if (Development.DEVELOPMENT) {
1396 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1397 System.out.print("E");
1402 if (entry.isPending()) {
1403 if (Development.DEVELOPMENT) {
1404 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1405 System.out.print("P");
1412 if (Development.DEVELOPMENT) {
1413 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1414 System.out.print("U ");
1415 for (int i = 0; i < e.indent; i++)
1416 System.out.print(" ");
1417 System.out.print(entry.getQuery());
1421 Query query = entry.getQuery();
// Bit set of RequestFlags; UPDATE_MASK selects the update policy bits.
1422 int type = query.type();
1424 boolean hasListener = hasListener(entry);
1426 if (Development.DEVELOPMENT) {
1427 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
// Terminates the "U" trace line, marking listened entries with " (L)".
1428 if(hasListener(entry)) {
1429 System.out.println(" (L)");
1431 System.out.println("");
// Pending or excepted entries are handled separately from the other entries
// (the surrounding branch structure is only partly visible); both paths
// register immediate-update queries in `immediates`, keyed by identity.
1436 if(entry.isPending() || entry.isExcepted()) {
1439 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1441 immediates.put(entry, entry);
1456 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1458 immediates.put(entry, entry);
1472 // System.err.println(" => FOO " + type);
// Schedule every listener registered for this entry; they are evaluated
// later by performScheduledUpdates.
1475 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1476 if(entries != null) {
1477 for (ListenerEntry le : entries) {
1478 scheduleListener(le);
1483 // If invalid, update parents
1484 if (type == RequestFlags.INVALIDATE) {
1485 updateParents(e.indent, entry, todo);
// Queues every non-discarded parent of `entry` on the todo list, recording
// `entry` as the caller and deepening the trace indentation by two columns.
1492 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1494 Iterable<CacheEntry> oldParents = entry.getParents(this);
1495 for (CacheEntry parent : oldParents) {
1496 // System.err.println("updateParents " + entry + " => " + parent);
// LinkedList.push inserts at the head, and update() takes items with pop(),
// so newly queued parents are handled before older work items.
1497 if(!parent.isDiscarded())
1498 todo.push(new UpdateEntry(entry, parent, indent + 2));
// Removes a listener whose base has been disposed. performScheduledUpdates
// treats a true result as "pruned"; the return statements themselves are not
// part of the visible lines.
1503 private boolean pruneListener(ListenerEntry entry) {
1504 if (entry.base.isDisposed()) {
1505 removeListener(entry);
1513 * @param av1 an array (guaranteed)
1514 * @param av2 any object
1515 * @return <code>true</code> if the two arrays are equal
// Element-wise equality of two arrays. av1 must be an array; av2 may be any
// non-null object. The component types are compared first, then the matching
// Arrays.equals overload is selected.
1517 private final boolean arrayEquals(Object av1, Object av2) {
1520 Class<?> c1 = av1.getClass().getComponentType();
// getComponentType() yields null when av2 is not an array.
// NOTE(review): av2 is dereferenced here, so a null av2 throws a
// NullPointerException unless an earlier (not visible) line guards it.
1521 Class<?> c2 = av2.getClass().getComponentType();
1522 if (c2 == null || !c1.equals(c2))
1524 boolean p1 = c1.isPrimitive();
1525 boolean p2 = c2.isPrimitive();
// Reference-typed components are compared through the Object[] overload;
// primitive component types are dispatched one by one below.
1529 return Arrays.equals((Object[]) av1, (Object[]) av2);
1530 if (boolean.class.equals(c1))
1531 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1532 else if (byte.class.equals(c1))
1533 return Arrays.equals((byte[]) av1, (byte[]) av2);
1534 else if (int.class.equals(c1))
1535 return Arrays.equals((int[]) av1, (int[]) av2);
1536 else if (long.class.equals(c1))
1537 return Arrays.equals((long[]) av1, (long[]) av2);
1538 else if (float.class.equals(c1))
1539 return Arrays.equals((float[]) av1, (float[]) av2);
1540 else if (double.class.equals(c1))
1541 return Arrays.equals((double[]) av1, (double[]) av2);
// NOTE(review): the chain above has no branch for char[] or short[]; such
// arrays appear to end up here - confirm whether that is intended.
1542 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query of `entry` and compares the fresh result with oldValue.
// Visible outcomes:
//   ListenerEntry.NO_VALUE    - the entry is excepted after recomputation, or
//                               a Throwable was caught (it is logged)
//   ListenerEntry.NOT_CHANGED - the new result equals oldValue
//   the new result            - it differs from oldValue
// The result returned when oldValue is NO_VALUE is not part of the visible
// lines.
1547 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1551 Query query = entry.getQuery();
1553 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1555 entry.prepareRecompute(querySupport);
// Recompute with a graph whose parent is the entry being recomputed.
1557 ReadGraphImpl parentGraph = graph.withParent(entry);
1559 query.recompute(parentGraph);
1561 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1563 Object newValue = entry.getResult();
// No previous value is known, so there is nothing to compare against.
1565 if (ListenerEntry.NO_VALUE == oldValue) {
1566 if(DebugPolicy.CHANGES) {
1567 System.out.println("C " + query);
1568 System.out.println("- " + oldValue);
1569 System.out.println("- " + newValue);
1574 boolean changed = false;
1576 if (newValue != null) {
// Arrays need element-wise comparison; Object.equals on arrays is identity.
// NOTE(review): arrayEquals dereferences its second argument, so a null
// oldValue combined with an array result would throw there - confirm that
// the catch below is meant to absorb that case.
1577 if (newValue.getClass().isArray()) {
1578 changed = !arrayEquals(newValue, oldValue);
1580 changed = !newValue.equals(oldValue);
// New result is null: changed only if there was a non-null value before.
1583 changed = (oldValue != null);
1585 if(DebugPolicy.CHANGES && changed) {
1586 System.out.println("C " + query);
1587 System.out.println("- " + oldValue);
1588 System.out.println("- " + newValue);
1591 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1593 } catch (Throwable t) {
1595 Logger.defaultLogError(t);
1597 return ListenerEntry.NO_VALUE;
// Returns true while at least one listener is waiting in scheduledListeners
// to be evaluated by performScheduledUpdates.
1603 public boolean hasScheduledUpdates() {
1604 return !scheduledListeners.isEmpty();
// Evaluates all scheduled listeners and fires those whose query result
// changed. Runs in rounds until scheduledListeners stays empty, because
// firing a listener may schedule further listeners. The firingListeners flag
// is raised for the duration of the call.
1607 public void performScheduledUpdates(WriteGraphImpl graph) {
1610 assert (!cache.collecting);
1611 assert (!firingListeners);
1613 firingListeners = true;
1617 // Performing may cause further events to be scheduled.
1618 while (!scheduledListeners.isEmpty()) {
1621 // graph.state.barrier.inc();
1623 // Clone current events to make new entries possible during
// The current set is detached and replaced by a fresh, empty one (no copy is
// made), so listeners scheduled from now on land in the next round.
1625 THashSet<ListenerEntry> entries = scheduledListeners;
1626 scheduledListeners = new THashSet<ListenerEntry>();
1628 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
// Phase 1: drop disposed listeners and recompute the rest, collecting the
// ones whose value differs from the last value they were given.
1630 for (ListenerEntry listenerEntry : entries) {
1632 if (pruneListener(listenerEntry)) {
1633 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1637 final CacheEntry entry = listenerEntry.entry;
1638 assert (entry != null);
1640 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
// Anything but NOT_CHANGED (including NO_VALUE) causes the listener to fire.
1642 if (newValue != ListenerEntry.NOT_CHANGED) {
1643 if(DebugPolicy.LISTENER)
1644 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1645 schedule.add(listenerEntry);
1646 listenerEntry.setLastKnown(entry.getResult());
// Phase 2: deliver the cached result to each collected listener.
1651 for(ListenerEntry listenerEntry : schedule) {
1652 final CacheEntry entry = listenerEntry.entry;
1653 if(DebugPolicy.LISTENER)
1654 System.out.println("Firing " + listenerEntry.procedure);
1656 if(DebugPolicy.LISTENER)
1657 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1658 entry.performFromCache(graph, listenerEntry.procedure);
// NOTE(review): failures are printed to stderr here, whereas the rest of this
// class reports through Logger.defaultLogError.
1659 } catch (Throwable t) {
1660 t.printStackTrace();
1664 // graph.state.barrier.dec();
1665 // graph.waitAsync(null);
1666 // graph.state.barrier.assertReady();
1671 firingListeners = false;
1678 * @return true if this entry still has listeners
// Propagates a change of `entry` through the dependency graph.
// Alternates between two phases until no immediate updates remain:
//   1. drain the todo list through updateQuery, which schedules listeners
//      and collects immediate-update entries;
//   2. recompute every collected immediate entry with compareTo and queue the
//      parents of those whose value changed.
// Returns hadListeners | listenersUnknown: true if updateQuery reported a
// listened entry or if an immediate entry was recomputed without changing.
1680 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1682 assert (!cache.collecting);
1686 boolean hadListeners = false;
1687 boolean listenersUnknown = false;
1691 assert(entry != null);
1692 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Identity-keyed so each entry is collected at most once.
1693 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
// Root work item: no caller, no indentation.
1694 todo.add(new UpdateEntry(null, entry, 0));
1698 // Walk the tree and collect immediate updates
1699 while (!todo.isEmpty()) {
1700 UpdateEntry e = todo.pop();
1701 hadListeners |= updateQuery(e, todo, immediates);
1704 if(immediates.isEmpty()) break;
1706 // Evaluate all immediate updates and collect parents to update
1707 for(CacheEntry immediate : immediates.values()) {
1709 if(immediate.isDiscarded()) {
// An excepted entry has no usable old result, so it is recomputed against
// NO_VALUE.
1713 if(immediate.isExcepted()) {
1715 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1716 if (newValue != ListenerEntry.NOT_CHANGED)
1717 updateParents(0, immediate, todo);
1721 Object oldValue = immediate.getResult();
1722 Object newValue = compareTo(graph, immediate, oldValue);
1724 if (newValue != ListenerEntry.NOT_CHANGED) {
1725 updateParents(0, immediate, todo);
1727 // If not changed, keep the old value
1728 immediate.setResult(oldValue);
1729 listenersUnknown = true;
1739 } catch (Throwable t) {
1740 Logger.defaultLogError(t);
1746 return hadListeners | listenersUnknown;
// Pending change notifications, drained by performDirtyUpdates:
//   scheduledObjectUpdates - packed longs, (subject << 32) + predicate,
//                            added by updateStatements
//   scheduledValueUpdates  - resource ids added by updateValue and
//                            invalidateResource
//   scheduledInvalidates   - resource ids; no writer is visible in this part
//                            of the file
1750 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1751 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1752 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1753 // Maybe use a mutex from util.concurrent?
// Guards scheduledPrimitiveUpdates, which updatePrimitive fills under this
// lock and performDirtyUpdates swaps out under it.
1754 private Object primitiveUpdateLock = new Object();
1755 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Drains the scheduled change sets and calls update() on every cache entry
// that can depend on a changed resource. Two fast paths cover the common
// cases of exactly one statement change and exactly one value change; the
// general path handles primitive (external read) updates, value updates,
// invalidations and statement updates in that order, and finally clears the
// sets.
1757 public void performDirtyUpdates(final ReadGraphImpl graph) {
1759 cache.dirty = false;
1762 if (Development.DEVELOPMENT) {
1763 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1764 System.err.println("== Query update ==");
1768 // Special case - one statement
1769 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
// Unpack the (subject << 32) + predicate encoding written by updateStatements.
1771 long arg0 = scheduledObjectUpdates.getFirst();
1773 final int subject = (int)(arg0 >>> 32);
1774 final int predicate = (int)(arg0 & 0xffffffff);
1776 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1777 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1778 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Type-hierarchy predicates also affect the subject's type queries.
1780 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1781 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1782 if(principalTypes != null) update(graph, principalTypes);
1783 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1784 if(types != null) update(graph, types);
1787 if(predicate == subrelationOf) {
1788 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1789 if(superRelations != null) update(graph, superRelations);
1792 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1793 if(dp != null) update(graph, dp);
// The OrderedSet entry is looked up by the predicate id, not the subject.
1794 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1795 if(os != null) update(graph, os);
1797 scheduledObjectUpdates.clear();
1802 // Special case - one value
1803 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1805 int arg0 = scheduledValueUpdates.getFirst();
1807 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1808 if(valueQuery != null) update(graph, valueQuery);
1810 scheduledValueUpdates.clear();
// General path. `predicates` collects the subjects whose statement-based
// queries must be refreshed (despite its name it is filled with subject and
// resource ids below); `orderedSets` collects predicate ids.
1815 final TIntHashSet predicates = new TIntHashSet();
1816 final TIntHashSet orderedSets = new TIntHashSet();
// Swap out the primitive-update set under the lock so other threads can keep
// scheduling while this batch is processed.
1818 THashSet primitiveUpdates;
1819 synchronized (primitiveUpdateLock) {
1820 primitiveUpdates = scheduledPrimitiveUpdates;
1821 scheduledPrimitiveUpdates = new THashSet();
1824 primitiveUpdates.forEach(new TObjectProcedure() {
1827 public boolean execute(Object arg0) {
1829 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1830 if (query != null) {
1831 boolean listening = update(graph, query);
// An external read that nobody listens to and nobody depends on is dropped
// from the cache map.
1832 if (!listening && !query.hasParents()) {
1833 cache.externalReadEntryMap.remove(arg0);
1842 scheduledValueUpdates.forEach(new TIntProcedure() {
1845 public boolean execute(int arg0) {
1846 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1847 if(valueQuery != null) update(graph, valueQuery);
// An invalidated resource refreshes its value, type and super-relation
// queries and is added to the set handled in the `predicates` pass below.
1853 scheduledInvalidates.forEach(new TIntProcedure() {
1856 public boolean execute(int resource) {
1858 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1859 if(valueQuery != null) update(graph, valueQuery);
1861 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1862 if(principalTypes != null) update(graph, principalTypes);
1863 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1864 if(types != null) update(graph, types);
1866 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1867 if(superRelations != null) update(graph, superRelations);
1869 predicates.add(resource);
1876 scheduledObjectUpdates.forEach(new TLongProcedure() {
1879 public boolean execute(long arg0) {
1881 final int subject = (int)(arg0 >>> 32);
1882 final int predicate = (int)(arg0 & 0xffffffff);
1884 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1885 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1886 if(principalTypes != null) update(graph, principalTypes);
1887 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1888 if(types != null) update(graph, types);
1891 if(predicate == subrelationOf) {
1892 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1893 if(superRelations != null) update(graph, superRelations);
// Defer the per-subject and per-ordered-set work so each id is handled once.
1896 predicates.add(subject);
1897 orderedSets.add(predicate);
1905 predicates.forEach(new TIntProcedure() {
1908 public boolean execute(final int subject) {
1910 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1911 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1912 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1914 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1915 if(entry != null) update(graph, entry);
1923 orderedSets.forEach(new TIntProcedure() {
1926 public boolean execute(int orderedSet) {
1928 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1929 if(entry != null) update(graph, entry);
1937 // for (Integer subject : predicates) {
1938 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1939 // if(entry != null) update(graph, entry);
1943 if (Development.DEVELOPMENT) {
1944 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1945 System.err.println("== Query update ends ==");
1949 scheduledValueUpdates.clear();
1950 scheduledObjectUpdates.clear();
1951 scheduledInvalidates.clear();
// Records that the value of `resource` changed; the id is processed by the
// next performDirtyUpdates call.
1955 public void updateValue(final int resource) {
1956 scheduledValueUpdates.add(resource);
// Records a statement change for (resource, predicate) as one packed long:
// resource in the upper 32 bits, predicate in the lower 32 bits.
// performDirtyUpdates unpacks it with (arg0 >>> 32) and a cast to int.
// NOTE(review): `predicate` is widened with sign extension before the
// addition, so a negative predicate id would borrow from the upper half and
// change the decoded subject - confirm that ids are never negative here.
1960 public void updateStatements(final int resource, final int predicate) {
1961 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
// Id passed to the most recent invalidateResource call; used to skip
// immediately repeated invalidations of the same resource.
1965 private int lastInvalidate = 0;
// Schedules `resource` for a value update unless it equals the previously
// invalidated id.
// NOTE(review): the id goes into scheduledValueUpdates although a dedicated
// scheduledInvalidates set exists - confirm this is intended.
// NOTE(review): because lastInvalidate starts at 0, a first call with
// resource 0 is ignored - confirm 0 is never a valid id.
1967 public void invalidateResource(final int resource) {
1968 if(lastInvalidate == resource) return;
1969 scheduledValueUpdates.add(resource);
1970 lastInvalidate = resource;
// Schedules an external read for re-evaluation and notifies querySupport via
// dirtyPrimitives(). The scheduled set is only touched while holding
// primitiveUpdateLock.
1974 public void updatePrimitive(final ExternalRead primitive) {
1976 // External reads may be updated from arbitrary threads.
1977 // Synchronize to prevent race-conditions.
1978 synchronized (primitiveUpdateLock) {
1979 scheduledPrimitiveUpdates.add(primitive);
1981 querySupport.dirtyPrimitives();
// One-line summary of the cache counters: size, hits, misses and updates.
1986 public synchronized String toString() {
1987 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the THREADS executor threads in escalating steps:
//   1. ask each executor to dispose itself;
//   2. poll isAlive() for up to 100 rounds (the wait between rounds and the
//      exit condition are not part of the visible lines);
//   3. poll again for up to 100 rounds, interrupting the executors;
//   4. join each thread for at most 5000 ms and clear its slot.
1991 protected void doDispose() {
1993 for(int index = 0; index < THREADS; index++) {
1994 executors[index].dispose();
1998 for(int i=0;i<100;i++) {
2000 boolean alive = false;
2001 for(int index = 0; index < THREADS; index++) {
2002 alive |= executors[index].isAlive();
2007 } catch (InterruptedException e) {
2008 Logger.defaultLogError(e);
2013 // Then start interrupting
2014 for(int i=0;i<100;i++) {
2016 boolean alive = false;
2017 for(int index = 0; index < THREADS; index++) {
2018 alive |= executors[index].isAlive();
2021 for(int index = 0; index < THREADS; index++) {
2022 executors[index].interrupt();
2026 // // Then just destroy
2027 // for(int index = 0; index < THREADS; index++) {
2028 // executors[index].destroy();
2031 for(int index = 0; index < THREADS; index++) {
// join(5000) returns after five seconds at the latest, whether or not the
// thread has terminated; the slot is cleared either way.
2033 executors[index].join(5000);
2034 } catch (InterruptedException e) {
2035 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2037 executors[index] = null;
// Statistics accessor; its body is not part of the visible lines.
// NOTE(review): presumably returns cache.hits, mirroring getMisses - confirm.
2042 public int getHits() {
// Returns the miss counter kept by the cache.
2046 public int getMisses() {
2047 return cache.misses;
// Statistics accessor; its body is not part of the visible lines.
// NOTE(review): presumably returns cache.size (the value toString reports as
// "size") - confirm.
2050 public int getSize() {
// Collects the cluster ids of the resources referenced by cached Objects,
// DirectPredicates and ValueQuery entries. The HashSet removes duplicates.
2054 public Set<Long> getReferencedClusters() {
2055 HashSet<Long> result = new HashSet<Long>();
2056 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2057 Objects query = (Objects) entry.getQuery();
2058 result.add(querySupport.getClusterId(query.r1()));
2060 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2061 DirectPredicates query = (DirectPredicates) entry.getQuery();
2062 result.add(querySupport.getClusterId(query.id));
2064 for (CacheEntry entry : cache.valueQueryMap.values()) {
2065 ValueQuery query = (ValueQuery) entry.getQuery();
2066 result.add(querySupport.getClusterId(query.id));
// No statements of this method are part of the visible lines.
2071 public void assertDone() {
// Delegates to the cache, passing the caller-supplied result object through.
2074 CacheCollectionResult allCaches(CacheCollectionResult result) {
2076 return cache.allCaches(result);
// No statements of this method are part of the visible lines.
2080 public void printDiagnostics() {
// Forwards a cluster request, with its Runnable, to querySupport.
2083 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2084 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the processor's collector with arguments 0 and Integer.MAX_VALUE.
// The returned int is produced by a line that is not visible here.
2087 public int clean() {
2088 collector.collect(0, Integer.MAX_VALUE);
// Runs a one-off collection restricted to the cache entries of the given
// external reads. A throw-away QueryCollectorSupport exposes only those
// entries to a new QueryCollectorImpl2; operations this restricted view does
// not support throw UnsupportedOperationException.
2092 public void clean(final Collection<ExternalRead<?>> requests) {
2093 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2094 Iterator<ExternalRead<?>> iterator = requests.iterator();
2096 public CacheCollectionResult allCaches() {
2097 throw new UnsupportedOperationException();
// Returns the next request that has a cache entry, skipping requests without
// one by recursing (one stack frame per skipped request). Once the iterator
// is exhausted it is re-created so iteration can start over.
2100 public CacheEntryBase iterate(int level) {
2101 if(iterator.hasNext()) {
2102 ExternalRead<?> request = iterator.next();
2103 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2104 if (entry != null) return entry;
2105 else return iterate(level);
2107 iterator = requests.iterator();
2112 public void remove() {
2113 throw new UnsupportedOperationException();
2116 public void setLevel(CacheEntryBase entry, int level) {
2117 throw new UnsupportedOperationException();
// Looks up the cache entry of each request (how the looked-up entries are
// added to `result` is not part of the visible lines).
2120 public Collection<CacheEntry> getRootList() {
2121 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2122 for (ExternalRead<?> request : requests) {
2123 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2130 public int getCurrentSize() {
2134 public int calculateCurrentSize() {
2135 // This tells the collector to attempt collecting everything.
2136 return Integer.MAX_VALUE;
2139 public boolean start(boolean flush) {
2143 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to cache.scanPending().
2146 public void scanPending() {
2148 cache.scanPending();
// Creates a new ReadGraphImpl bound to this processor through
// ReadGraphImpl.createAsync.
2152 public ReadGraphImpl graphForVirtualRequest() {
2153 return ReadGraphImpl.createAsync(this);
// Lazily created map from Layer0 value-type resources to Java classes; filled
// by initBuiltinValues and null until that succeeds.
2157 private HashMap<Resource, Class<?>> builtinValues;
2159 public Class<?> getBuiltinValue(Resource r) {
2160 if(builtinValues == null) initBuiltinValues();
2161 return builtinValues.get(r);
// Package-private field, initially null; no reader or writer is visible in
// this part of the file.
2164 Exception callerException = null;
// Nested interface; its active members are not part of the visible lines.
// The two commented-out signatures are variants taking a debug string.
2166 public interface AsyncBarrier {
2169 // public void inc(String debug);
2170 // public void dec(String debug);
2173 // final public QueryProcessor processor;
2174 // final public QuerySupport support;
2176 // boolean disposed = false;
// Builds the builtinValues table from the Layer0 service of the session.
// Scalar types map to the boxed Java classes, array types to primitive array
// classes (String[] for StringArray).
2178 private void initBuiltinValues() {
// peekService may yield null; in that case the table is left unset and the
// method returns without doing anything.
2180 Layer0 b = getSession().peekService(Layer0.class);
2181 if(b == null) return;
2183 builtinValues = new HashMap<Resource, Class<?>>();
2185 builtinValues.put(b.String, String.class);
2186 builtinValues.put(b.Double, Double.class);
2187 builtinValues.put(b.Float, Float.class);
2188 builtinValues.put(b.Long, Long.class);
2189 builtinValues.put(b.Integer, Integer.class);
2190 builtinValues.put(b.Byte, Byte.class);
2191 builtinValues.put(b.Boolean, Boolean.class);
2193 builtinValues.put(b.StringArray, String[].class);
2194 builtinValues.put(b.DoubleArray, double[].class);
2195 builtinValues.put(b.FloatArray, float[].class);
2196 builtinValues.put(b.LongArray, long[].class);
2197 builtinValues.put(b.IntegerArray, int[].class);
2198 builtinValues.put(b.ByteArray, byte[].class);
2199 builtinValues.put(b.BooleanArray, boolean[].class);
2203 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2205 // if (null == provider2) {
2206 // this.processor = null;
2210 // this.processor = provider2;
2211 // support = provider2.getCore();
2212 // initBuiltinValues();
2216 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2217 // return new ReadGraphSupportImpl(impl.processor);
// Returns the Session this processor belongs to (the body is not part of the
// visible lines); initBuiltinValues uses it to look up the Layer0 service.
2221 final public Session getSession() {
// Returns the ResourceSupport instance held by this processor.
2225 final public ResourceSupport getResourceSupport() {
2226 return resourceSupport;
// Asynchronous predicate enumeration is not supported: the method always
// throws UnsupportedOperationException. The commented-out lines below are the
// previous implementation, kept for reference.
2230 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2232 throw new UnsupportedOperationException();
2234 // assert(subject != null);
2235 // assert(procedure != null);
2237 // final ListenerBase listener = getListenerBase(procedure);
2239 // IntProcedure ip = new IntProcedure() {
2241 // AtomicBoolean first = new AtomicBoolean(true);
2244 // public void execute(ReadGraphImpl graph, int i) {
2246 // if(first.get()) {
2247 // procedure.execute(graph, querySupport.getResource(i));
2249 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2251 // } catch (Throwable t2) {
2252 // Logger.defaultLogError(t2);
2257 // public void finished(ReadGraphImpl graph) {
2259 // if(first.compareAndSet(true, false)) {
2260 // procedure.finished(graph);
2261 //// impl.state.barrier.dec(this);
2263 // procedure.finished(impl.newRestart(graph));
2266 // } catch (Throwable t2) {
2267 // Logger.defaultLogError(t2);
2272 // public void exception(ReadGraphImpl graph, Throwable t) {
2274 // if(first.compareAndSet(true, false)) {
2275 // procedure.exception(graph, t);
2277 // procedure.exception(impl.newRestart(graph), t);
2279 // } catch (Throwable t2) {
2280 // Logger.defaultLogError(t2);
2286 // int sId = querySupport.getId(subject);
2289 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2290 // } catch (DatabaseException e) {
2291 // Logger.defaultLogError(e);
// Synchronous predicate enumeration is not supported: the method always
// throws UnsupportedOperationException. The commented-out lines below are the
// previous implementation, kept for reference.
2297 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2299 throw new UnsupportedOperationException();
2301 // assert(subject != null);
2302 // assert(procedure != null);
2304 // final ListenerBase listener = getListenerBase(procedure);
2307 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2310 // public void execute(ReadGraphImpl graph, int i) {
2312 // procedure.execute(querySupport.getResource(i));
2313 // } catch (Throwable t2) {
2314 // Logger.defaultLogError(t2);
2319 // public void finished(ReadGraphImpl graph) {
2321 // procedure.finished();
2322 // } catch (Throwable t2) {
2323 // Logger.defaultLogError(t2);
2325 //// impl.state.barrier.dec();
2329 // public void exception(ReadGraphImpl graph, Throwable t) {
2331 // procedure.exception(t);
2332 // } catch (Throwable t2) {
2333 // Logger.defaultLogError(t2);
2335 //// impl.state.barrier.dec();
2339 // } catch (DatabaseException e) {
2340 // Logger.defaultLogError(e);
// Returns the predicate set of `subject` through QueryCacheBase, using the
// parent of `impl` as the requesting entry and registering no listener (the
// last argument is null).
2346 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2347 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
// Enumerates the statements (subject, predicate, *) into a synchronous
// MultiProcedure. Integer triples delivered by Statements.queryEach are
// converted to Statement objects through querySupport. Anything thrown by the
// user procedure is logged and does not propagate; a DatabaseException from
// the query itself is logged as well.
2351 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2352 final Resource predicate, final MultiProcedure<Statement> procedure) {
2354 assert(subject != null);
2355 assert(predicate != null);
2356 assert(procedure != null);
// Non-null only when the procedure itself implements ListenerBase.
2358 final ListenerBase listener = getListenerBase(procedure);
2360 // impl.state.barrier.inc();
2363 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2366 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2368 procedure.execute(querySupport.getStatement(s, p, o));
2369 } catch (Throwable t2) {
2370 Logger.defaultLogError(t2);
2375 public void finished(ReadGraphImpl graph) {
2377 procedure.finished();
2378 } catch (Throwable t2) {
2379 Logger.defaultLogError(t2);
2381 // impl.state.barrier.dec();
2385 public void exception(ReadGraphImpl graph, Throwable t) {
2387 procedure.exception(t);
2388 } catch (Throwable t2) {
2389 Logger.defaultLogError(t2);
2391 // impl.state.barrier.dec();
2395 } catch (DatabaseException e) {
2396 Logger.defaultLogError(e);
// Enumerates the statements (subject, predicate, *) into an asynchronous
// AsyncMultiProcedure. Each callback has two delivery variants: one passing
// the graph it received and one passing impl.newRestart(graph).
// NOTE(review): the conditions choosing between the variants are not part of
// the visible lines; presumably they test the `first` flag - confirm.
// Anything thrown by the user procedure is logged and does not propagate.
2402 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2403 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2405 assert(subject != null);
2406 assert(predicate != null);
2407 assert(procedure != null);
2409 final ListenerBase listener = getListenerBase(procedure);
2411 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2413 boolean first = true;
2416 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2419 procedure.execute(graph, querySupport.getStatement(s, p, o));
2421 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2423 } catch (Throwable t2) {
2424 Logger.defaultLogError(t2);
2429 public void finished(ReadGraphImpl graph) {
2434 procedure.finished(graph);
2435 // impl.state.barrier.dec(this);
2437 procedure.finished(impl.newRestart(graph));
2439 } catch (Throwable t2) {
2440 Logger.defaultLogError(t2);
2446 public void exception(ReadGraphImpl graph, Throwable t) {
2451 procedure.exception(graph, t);
2452 // impl.state.barrier.dec(this);
2454 procedure.exception(impl.newRestart(graph), t);
2456 } catch (Throwable t2) {
2457 Logger.defaultLogError(t2);
2464 int sId = querySupport.getId(subject);
2465 int pId = querySupport.getId(predicate);
2467 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2468 // else impl.state.barrier.inc(null, null);
2471 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2472 } catch (DatabaseException e) {
2473 Logger.defaultLogError(e);
// Enumerates the statements (subject, predicate, *) into a StatementProcedure,
// which receives the raw integer triple (s, p, o) without conversion to
// Statement objects. Each callback has two delivery variants: one passing the
// graph it received and one passing impl.newRestart(graph).
// NOTE(review): the conditions choosing between the variants are not part of
// the visible lines; presumably they test the `first` flag - confirm.
// Anything thrown by the user procedure is logged and does not propagate.
2479 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2480 final Resource predicate, final StatementProcedure procedure) {
2482 assert(subject != null);
2483 assert(predicate != null);
2484 assert(procedure != null);
2486 final ListenerBase listener = getListenerBase(procedure);
2488 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2490 boolean first = true;
2493 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2496 procedure.execute(graph, s, p, o);
2498 procedure.execute(impl.newRestart(graph), s, p, o);
2500 } catch (Throwable t2) {
2501 Logger.defaultLogError(t2);
2506 public void finished(ReadGraphImpl graph) {
2511 procedure.finished(graph);
2512 // impl.state.barrier.dec(this);
2514 procedure.finished(impl.newRestart(graph));
2516 } catch (Throwable t2) {
2517 Logger.defaultLogError(t2);
2523 public void exception(ReadGraphImpl graph, Throwable t) {
2528 procedure.exception(graph, t);
2529 // impl.state.barrier.dec(this);
2531 procedure.exception(impl.newRestart(graph), t);
2533 } catch (Throwable t2) {
2534 Logger.defaultLogError(t2);
2541 int sId = querySupport.getId(subject);
2542 int pId = querySupport.getId(predicate);
2544 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2545 // else impl.state.barrier.inc(null, null);
2548 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2549 } catch (DatabaseException e) {
2550 Logger.defaultLogError(e);
// Adapts a set listener onto forEachStatement: each evaluation of the
// statement query is turned into add/remove notifications relative to the
// previous evaluation. `current` holds the statements of the previous round
// and `run` is the set for the round in progress (the statements that fill
// `run` and hand it over to `current` are not part of the visible lines).
2556 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2558 assert(subject != null);
2559 assert(predicate != null);
2560 assert(procedure != null);
2562 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2564 private Set<Statement> current = null;
2565 private Set<Statement> run = new HashSet<Statement>();
2568 public void execute(AsyncReadGraph graph, Statement result) {
2570 boolean found = false;
2572 if(current != null) {
// Removing a statement that is still present leaves only the vanished ones
// in `current` by the time finished() runs.
2574 found = current.remove(result);
// Not seen in the previous round: report it as added.
2578 if(!found) procedure.add(graph, result);
2585 public void finished(AsyncReadGraph graph) {
2587 if(current != null) {
// Whatever is left was not delivered again in this round: report as removed.
2588 for(Statement r : current) procedure.remove(graph, r);
2593 run = new HashSet<Statement>();
2598 public void exception(AsyncReadGraph graph, Throwable t) {
2599 procedure.exception(graph, t);
// The adapter is disposed exactly when the wrapped set listener is.
2603 public boolean isDisposed() {
2604 return procedure.isDisposed();
// Enumerates the asserted statements of (subject, predicate) through
// QueryCache.runnerAssertedStatements, converting the integer triples to
// Statement objects. Unlike the asynchronous forEachStatement overloads, the
// callbacks always pass on the graph they receive. Anything thrown by the
// user procedure is logged and does not propagate.
2612 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2613 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2615 assert(subject != null);
2616 assert(predicate != null);
2617 assert(procedure != null);
2619 final ListenerBase listener = getListenerBase(procedure);
2621 // impl.state.barrier.inc();
2624 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2627 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2629 procedure.execute(graph, querySupport.getStatement(s, p, o));
2630 } catch (Throwable t2) {
2631 Logger.defaultLogError(t2);
2636 public void finished(ReadGraphImpl graph) {
2638 procedure.finished(graph);
2639 } catch (Throwable t2) {
2640 Logger.defaultLogError(t2);
2642 // impl.state.barrier.dec();
2646 public void exception(ReadGraphImpl graph, Throwable t) {
2648 procedure.exception(graph, t);
2649 } catch (Throwable t2) {
2650 Logger.defaultLogError(t2);
2652 // impl.state.barrier.dec();
2656 } catch (DatabaseException e) {
2657 Logger.defaultLogError(e);
// Returns the procedure itself when it implements ListenerBase. The result
// for other procedures is produced by a line that is not visible here
// (presumably null - confirm).
2662 private static ListenerBase getListenerBase(Object procedure) {
2663 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
// Enumerates the objects of (subject, predicate) into a synchronous
// MultiProcedure through QueryCache.runnerObjects, converting each integer id
// to a Resource. Anything thrown by the user procedure is logged and does not
// propagate; a DatabaseException from the query itself is logged as well.
2668 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2670 assert(subject != null);
2671 assert(predicate != null);
2672 assert(procedure != null);
2674 final ListenerBase listener = getListenerBase(procedure);
2676 // impl.state.barrier.inc();
2679 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2682 public void execute(ReadGraphImpl graph, int i) {
2684 procedure.execute(querySupport.getResource(i));
2685 } catch (Throwable t2) {
2686 Logger.defaultLogError(t2);
2691 public void finished(ReadGraphImpl graph) {
2693 procedure.finished();
2694 } catch (Throwable t2) {
2695 Logger.defaultLogError(t2);
2697 // impl.state.barrier.dec();
2701 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): this writes to stdout in addition to forwarding the
// exception; the sibling methods in this part of the file do not.
2702 System.out.println("forEachObject exception " + t);
2704 procedure.exception(t);
2705 } catch (Throwable t2) {
2706 Logger.defaultLogError(t2);
2708 // impl.state.barrier.dec();
2712 } catch (DatabaseException e) {
2713 Logger.defaultLogError(e);
2719 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2721 throw new UnsupportedOperationException();
2723 // assert(subject != null);
2724 // assert(procedure != null);
2726 // final ListenerBase listener = getListenerBase(procedure);
2728 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
2730 // int sId = querySupport.getId(subject);
2733 // QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
2734 // } catch (DatabaseException e) {
2735 // Logger.defaultLogError(e);
2741 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
2743 assert(subject != null);
2744 assert(procedure != null);
2746 final ListenerBase listener = getListenerBase(procedure);
2748 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2753 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2755 assert(subject != null);
2756 assert(procedure != null);
2758 final ListenerBase listener = getListenerBase(procedure);
2760 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2764 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2767 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2769 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2771 private Resource single = null;
2774 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2775 if(single == null) {
2778 single = INVALID_RESOURCE;
2783 public synchronized void finished(AsyncReadGraph graph) {
2784 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2785 else procedure.execute(graph, single);
2789 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2790 procedure.exception(graph, throwable);
2797 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2799 final int sId = querySupport.getId(subject);
2800 final int pId = querySupport.getId(predicate);
2803 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2804 } catch (DatabaseException e) {
2805 Logger.defaultLogError(e);
2810 static class Runner2Procedure implements IntProcedure {
2812 public int single = 0;
2813 public Throwable t = null;
2815 public void clear() {
2821 public void execute(ReadGraphImpl graph, int i) {
2822 if(single == 0) single = i;
2827 public void finished(ReadGraphImpl graph) {
2828 if(single == -1) single = 0;
2832 public void exception(ReadGraphImpl graph, Throwable throwable) {
2837 public int get() throws DatabaseException {
2839 if(t instanceof DatabaseException) throw (DatabaseException)t;
2840 else throw new DatabaseException(t);
2847 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2849 final int sId = querySupport.getId(subject);
2850 final int pId = querySupport.getId(predicate);
2852 Runner2Procedure proc = new Runner2Procedure();
2853 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2858 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2860 assert(subject != null);
2861 assert(predicate != null);
2863 final ListenerBase listener = getListenerBase(procedure);
2865 if(impl.parent != null || listener != null) {
2867 IntProcedure ip = new IntProcedure() {
2869 AtomicBoolean first = new AtomicBoolean(true);
2872 public void execute(ReadGraphImpl graph, int i) {
2875 procedure.execute(impl, querySupport.getResource(i));
2877 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2879 } catch (Throwable t2) {
2880 Logger.defaultLogError(t2);
2886 public void finished(ReadGraphImpl graph) {
2888 if(first.compareAndSet(true, false)) {
2889 procedure.finished(impl);
2890 // impl.state.barrier.dec(this);
2892 procedure.finished(impl.newRestart(graph));
2894 } catch (Throwable t2) {
2895 Logger.defaultLogError(t2);
2900 public void exception(ReadGraphImpl graph, Throwable t) {
2902 procedure.exception(graph, t);
2903 } catch (Throwable t2) {
2904 Logger.defaultLogError(t2);
2906 // impl.state.barrier.dec(this);
2910 public String toString() {
2911 return "forEachObject with " + procedure;
2916 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2917 // else impl.state.barrier.inc(null, null);
2919 forEachObject(impl, subject, predicate, listener, ip);
2923 IntProcedure ip = new IntProcedure() {
2926 public void execute(ReadGraphImpl graph, int i) {
2927 procedure.execute(graph, querySupport.getResource(i));
2931 public void finished(ReadGraphImpl graph) {
2932 procedure.finished(graph);
2936 public void exception(ReadGraphImpl graph, Throwable t) {
2937 procedure.exception(graph, t);
2941 public String toString() {
2942 return "forEachObject with " + procedure;
2947 forEachObject(impl, subject, predicate, listener, ip);
2954 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2956 assert(subject != null);
2957 assert(predicate != null);
2958 assert(procedure != null);
2960 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2962 private Set<Resource> current = null;
2963 private Set<Resource> run = new HashSet<Resource>();
2966 public void execute(AsyncReadGraph graph, Resource result) {
2968 boolean found = false;
2970 if(current != null) {
2972 found = current.remove(result);
2976 if(!found) procedure.add(graph, result);
2983 public void finished(AsyncReadGraph graph) {
2985 if(current != null) {
2986 for(Resource r : current) procedure.remove(graph, r);
2991 run = new HashSet<Resource>();
2996 public boolean isDisposed() {
2997 return procedure.isDisposed();
3001 public void exception(AsyncReadGraph graph, Throwable t) {
3002 procedure.exception(graph, t);
3006 public String toString() {
3007 return "forObjectSet " + procedure;
3015 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3017 assert(subject != null);
3018 assert(procedure != null);
3020 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3022 private Set<Resource> current = null;
3023 private Set<Resource> run = new HashSet<Resource>();
3026 public void execute(AsyncReadGraph graph, Resource result) {
3028 boolean found = false;
3030 if(current != null) {
3032 found = current.remove(result);
3036 if(!found) procedure.add(graph, result);
3043 public void finished(AsyncReadGraph graph) {
3045 if(current != null) {
3046 for(Resource r : current) procedure.remove(graph, r);
3051 run = new HashSet<Resource>();
3056 public boolean isDisposed() {
3057 return procedure.isDisposed();
3061 public void exception(AsyncReadGraph graph, Throwable t) {
3062 procedure.exception(graph, t);
3066 public String toString() {
3067 return "forPredicateSet " + procedure;
3075 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3077 assert(subject != null);
3078 assert(procedure != null);
3080 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3082 private Set<Resource> current = null;
3083 private Set<Resource> run = new HashSet<Resource>();
3086 public void execute(AsyncReadGraph graph, Resource result) {
3088 boolean found = false;
3090 if(current != null) {
3092 found = current.remove(result);
3096 if(!found) procedure.add(graph, result);
3103 public void finished(AsyncReadGraph graph) {
3105 if(current != null) {
3106 for(Resource r : current) procedure.remove(graph, r);
3111 run = new HashSet<Resource>();
3116 public boolean isDisposed() {
3117 return procedure.isDisposed();
3121 public void exception(AsyncReadGraph graph, Throwable t) {
3122 procedure.exception(graph, t);
3126 public String toString() {
3127 return "forPrincipalTypeSet " + procedure;
3135 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3137 assert(subject != null);
3138 assert(predicate != null);
3139 assert(procedure != null);
3141 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3143 private Set<Resource> current = null;
3144 private Set<Resource> run = new HashSet<Resource>();
3147 public void execute(AsyncReadGraph graph, Resource result) {
3149 boolean found = false;
3151 if(current != null) {
3153 found = current.remove(result);
3157 if(!found) procedure.add(graph, result);
3164 public void finished(AsyncReadGraph graph) {
3166 if(current != null) {
3167 for(Resource r : current) procedure.remove(graph, r);
3172 run = new HashSet<Resource>();
3177 public boolean isDisposed() {
3178 return procedure.isDisposed();
3182 public void exception(AsyncReadGraph graph, Throwable t) {
3183 procedure.exception(graph, t);
3187 public String toString() {
3188 return "forObjectSet " + procedure;
3196 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3198 assert(subject != null);
3199 assert(predicate != null);
3200 assert(procedure != null);
3202 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3204 private Set<Statement> current = null;
3205 private Set<Statement> run = new HashSet<Statement>();
3208 public void execute(AsyncReadGraph graph, Statement result) {
3210 boolean found = false;
3212 if(current != null) {
3214 found = current.remove(result);
3218 if(!found) procedure.add(graph, result);
3225 public void finished(AsyncReadGraph graph) {
3227 if(current != null) {
3228 for(Statement s : current) procedure.remove(graph, s);
3233 run = new HashSet<Statement>();
3238 public boolean isDisposed() {
3239 return procedure.isDisposed();
3243 public void exception(AsyncReadGraph graph, Throwable t) {
3244 procedure.exception(graph, t);
3248 public String toString() {
3249 return "forStatementSet " + procedure;
3257 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3258 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3260 assert(subject != null);
3261 assert(predicate != null);
3262 assert(procedure != null);
3264 final ListenerBase listener = getListenerBase(procedure);
3267 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3270 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3272 procedure.execute(graph, querySupport.getResource(o));
3273 } catch (Throwable t2) {
3274 Logger.defaultLogError(t2);
3279 public void finished(ReadGraphImpl graph) {
3281 procedure.finished(graph);
3282 } catch (Throwable t2) {
3283 Logger.defaultLogError(t2);
3285 // impl.state.barrier.dec();
3289 public void exception(ReadGraphImpl graph, Throwable t) {
3291 procedure.exception(graph, t);
3292 } catch (Throwable t2) {
3293 Logger.defaultLogError(t2);
3295 // impl.state.barrier.dec();
3299 } catch (DatabaseException e) {
3300 Logger.defaultLogError(e);
3306 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3308 assert(subject != null);
3309 assert(procedure != null);
3311 final ListenerBase listener = getListenerBase(procedure);
3313 IntProcedure ip = new IntProcedure() {
3316 public void execute(ReadGraphImpl graph, int i) {
3318 procedure.execute(graph, querySupport.getResource(i));
3319 } catch (Throwable t2) {
3320 Logger.defaultLogError(t2);
3325 public void finished(ReadGraphImpl graph) {
3327 procedure.finished(graph);
3328 } catch (Throwable t2) {
3329 Logger.defaultLogError(t2);
3331 // impl.state.barrier.dec(this);
3335 public void exception(ReadGraphImpl graph, Throwable t) {
3337 procedure.exception(graph, t);
3338 } catch (Throwable t2) {
3339 Logger.defaultLogError(t2);
3341 // impl.state.barrier.dec(this);
3346 int sId = querySupport.getId(subject);
3348 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3349 // else impl.state.barrier.inc(null, null);
3352 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3353 } catch (DatabaseException e) {
3354 Logger.defaultLogError(e);
3360 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3362 assert(subject != null);
3363 assert(procedure != null);
3365 final ListenerBase listener = getListenerBase(procedure);
3367 // impl.state.barrier.inc();
3370 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3373 public void execute(ReadGraphImpl graph, int i) {
3375 procedure.execute(querySupport.getResource(i));
3376 } catch (Throwable t2) {
3377 Logger.defaultLogError(t2);
3382 public void finished(ReadGraphImpl graph) {
3384 procedure.finished();
3385 } catch (Throwable t2) {
3386 Logger.defaultLogError(t2);
3388 // impl.state.barrier.dec();
3392 public void exception(ReadGraphImpl graph, Throwable t) {
3394 procedure.exception(t);
3395 } catch (Throwable t2) {
3396 Logger.defaultLogError(t2);
3398 // impl.state.barrier.dec();
3402 } catch (DatabaseException e) {
3403 Logger.defaultLogError(e);
3407 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3409 assert(subject != null);
3410 assert(procedure != null);
3412 final ListenerBase listener = getListenerBase(procedure);
3413 assert(listener == null);
3415 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3418 public void execute(final ReadGraphImpl graph, IntSet set) {
3419 procedure.execute(graph, set);
3423 public void exception(ReadGraphImpl graph, Throwable t) {
3424 procedure.exception(graph, t);
3429 int sId = querySupport.getId(subject);
3432 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3433 } catch (DatabaseException e) {
3434 Logger.defaultLogError(e);
3440 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3442 assert(subject != null);
3444 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3449 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
3451 assert(subject != null);
3452 assert(procedure != null);
3454 final ListenerBase listener = getListenerBase(procedure);
3455 assert(listener == null);
3459 QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
3462 public void execute(final ReadGraphImpl graph, RelationInfo set) {
3463 procedure.execute(graph, set);
3467 public void exception(ReadGraphImpl graph, Throwable t) {
3468 procedure.exception(graph, t);
3472 } catch (DatabaseException e) {
3473 Logger.defaultLogError(e);
3479 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3481 assert(subject != null);
3482 assert(procedure != null);
3484 final ListenerBase listener = getListenerBase(procedure);
3487 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3489 AtomicBoolean first = new AtomicBoolean(true);
3492 public void execute(final ReadGraphImpl graph, IntSet set) {
3493 // final HashSet<Resource> result = new HashSet<Resource>();
3494 // set.forEach(new TIntProcedure() {
3497 // public boolean execute(int type) {
3498 // result.add(querySupport.getResource(type));
3504 if(first.compareAndSet(true, false)) {
3505 procedure.execute(graph, set);
3506 // impl.state.barrier.dec();
3508 procedure.execute(impl.newRestart(graph), set);
3510 } catch (Throwable t2) {
3511 Logger.defaultLogError(t2);
3516 public void exception(ReadGraphImpl graph, Throwable t) {
3518 if(first.compareAndSet(true, false)) {
3519 procedure.exception(graph, t);
3520 // impl.state.barrier.dec();
3522 procedure.exception(impl.newRestart(graph), t);
3524 } catch (Throwable t2) {
3525 Logger.defaultLogError(t2);
3530 } catch (DatabaseException e) {
3531 Logger.defaultLogError(e);
3537 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3539 assert(subject != null);
3540 assert(procedure != null);
3542 final ListenerBase listener = getListenerBase(procedure);
3544 IntProcedure ip = new IntProcedureAdapter() {
3547 public void execute(final ReadGraphImpl graph, int superRelation) {
3549 procedure.execute(graph, querySupport.getResource(superRelation));
3550 } catch (Throwable t2) {
3551 Logger.defaultLogError(t2);
3556 public void finished(final ReadGraphImpl graph) {
3558 procedure.finished(graph);
3559 } catch (Throwable t2) {
3560 Logger.defaultLogError(t2);
3562 // impl.state.barrier.dec(this);
3567 public void exception(ReadGraphImpl graph, Throwable t) {
3569 procedure.exception(graph, t);
3570 } catch (Throwable t2) {
3571 Logger.defaultLogError(t2);
3573 // impl.state.barrier.dec(this);
3578 int sId = querySupport.getId(subject);
3580 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3581 // else impl.state.barrier.inc(null, null);
3584 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3585 } catch (DatabaseException e) {
3586 Logger.defaultLogError(e);
3589 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3594 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3596 assert(subject != null);
3597 assert(procedure != null);
3599 final ListenerBase listener = getListenerBase(procedure);
3601 // impl.state.barrier.inc();
3603 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3608 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3610 assert(subject != null);
3611 assert(procedure != null);
3613 final ListenerBase listener = getListenerBase(procedure);
3615 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3618 public void execute(final ReadGraphImpl graph, IntSet set) {
3619 // final HashSet<Resource> result = new HashSet<Resource>();
3620 // set.forEach(new TIntProcedure() {
3623 // public boolean execute(int type) {
3624 // result.add(querySupport.getResource(type));
3630 procedure.execute(graph, set);
3631 } catch (Throwable t2) {
3632 Logger.defaultLogError(t2);
3634 // impl.state.barrier.dec(this);
3638 public void exception(ReadGraphImpl graph, Throwable t) {
3640 procedure.exception(graph, t);
3641 } catch (Throwable t2) {
3642 Logger.defaultLogError(t2);
3644 // impl.state.barrier.dec(this);
3649 int sId = querySupport.getId(subject);
3651 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3652 // else impl.state.barrier.inc(null, null);
3655 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3656 } catch (DatabaseException e) {
3657 Logger.defaultLogError(e);
3662 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3663 return getValue(impl, querySupport.getId(subject));
3666 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3667 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3671 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3673 assert(subject != null);
3674 assert(procedure != null);
3676 int sId = querySupport.getId(subject);
3678 // if(procedure != null) {
3680 final ListenerBase listener = getListenerBase(procedure);
3682 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3684 AtomicBoolean first = new AtomicBoolean(true);
3687 public void execute(ReadGraphImpl graph, byte[] result) {
3689 if(first.compareAndSet(true, false)) {
3690 procedure.execute(graph, result);
3691 // impl.state.barrier.dec(this);
3693 procedure.execute(impl.newRestart(graph), result);
3695 } catch (Throwable t2) {
3696 Logger.defaultLogError(t2);
3701 public void exception(ReadGraphImpl graph, Throwable t) {
3703 if(first.compareAndSet(true, false)) {
3704 procedure.exception(graph, t);
3705 // impl.state.barrier.dec(this);
3707 procedure.exception(impl.newRestart(graph), t);
3709 } catch (Throwable t2) {
3710 Logger.defaultLogError(t2);
3716 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3717 // else impl.state.barrier.inc(null, null);
3720 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3721 } catch (DatabaseException e) {
3722 throw new IllegalStateException("Internal error");
3727 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3731 // throw new IllegalStateException("Internal error");
3736 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3738 assert(subject != null);
3739 assert(procedure != null);
3741 final ListenerBase listener = getListenerBase(procedure);
3743 if(impl.parent != null || listener != null) {
3745 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3747 AtomicBoolean first = new AtomicBoolean(true);
3750 public void execute(ReadGraphImpl graph, byte[] result) {
3752 if(first.compareAndSet(true, false)) {
3753 procedure.execute(graph, result);
3754 // impl.state.barrier.dec(this);
3756 procedure.execute(impl.newRestart(graph), result);
3758 } catch (Throwable t2) {
3759 Logger.defaultLogError(t2);
3764 public void exception(ReadGraphImpl graph, Throwable t) {
3766 if(first.compareAndSet(true, false)) {
3767 procedure.exception(graph, t);
3768 // impl.state.barrier.dec(this);
3770 procedure.exception(impl.newRestart(graph), t);
3772 } catch (Throwable t2) {
3773 Logger.defaultLogError(t2);
3779 int sId = querySupport.getId(subject);
3781 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3782 // else impl.state.barrier.inc(null, null);
3785 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3786 } catch (DatabaseException e) {
3787 Logger.defaultLogError(e);
3792 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3795 public void execute(ReadGraphImpl graph, byte[] result) {
3797 procedure.execute(graph, result);
3802 public void exception(ReadGraphImpl graph, Throwable t) {
3804 procedure.exception(graph, t);
3810 int sId = querySupport.getId(subject);
3813 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3814 } catch (DatabaseException e) {
3815 Logger.defaultLogError(e);
3823 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3825 assert(relation != null);
3826 assert(procedure != null);
3828 final ListenerBase listener = getListenerBase(procedure);
3830 IntProcedure ip = new IntProcedure() {
3832 private int result = 0;
3834 final AtomicBoolean found = new AtomicBoolean(false);
3835 final AtomicBoolean done = new AtomicBoolean(false);
3838 public void finished(ReadGraphImpl graph) {
3840 // Shall fire exactly once!
3841 if(done.compareAndSet(false, true)) {
3844 procedure.exception(graph, new NoInverseException(""));
3845 // impl.state.barrier.dec(this);
3847 procedure.execute(graph, querySupport.getResource(result));
3848 // impl.state.barrier.dec(this);
3850 } catch (Throwable t) {
3851 Logger.defaultLogError(t);
3858 public void execute(ReadGraphImpl graph, int i) {
3860 if(found.compareAndSet(false, true)) {
3863 // Shall fire exactly once!
3864 if(done.compareAndSet(false, true)) {
3866 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3867 // impl.state.barrier.dec(this);
3868 } catch (Throwable t) {
3869 Logger.defaultLogError(t);
3877 public void exception(ReadGraphImpl graph, Throwable t) {
3878 // Shall fire exactly once!
3879 if(done.compareAndSet(false, true)) {
3881 procedure.exception(graph, t);
3882 // impl.state.barrier.dec(this);
3883 } catch (Throwable t2) {
3884 Logger.defaultLogError(t2);
3891 int sId = querySupport.getId(relation);
3893 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3894 // else impl.state.barrier.inc(null, null);
3897 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3898 } catch (DatabaseException e) {
3899 Logger.defaultLogError(e);
3905 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3908 assert(procedure != null);
3910 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3913 public void execute(ReadGraphImpl graph, Integer result) {
3915 procedure.execute(graph, querySupport.getResource(result));
3916 } catch (Throwable t2) {
3917 Logger.defaultLogError(t2);
3919 // impl.state.barrier.dec(this);
3923 public void exception(ReadGraphImpl graph, Throwable t) {
3926 procedure.exception(graph, t);
3927 } catch (Throwable t2) {
3928 Logger.defaultLogError(t2);
3930 // impl.state.barrier.dec(this);
3935 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3936 // else impl.state.barrier.inc(null, null);
3938 forResource(impl, id, impl.parent, ip);
3943 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3946 assert(procedure != null);
3948 // impl.state.barrier.inc();
3951 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3954 public void execute(ReadGraphImpl graph, Integer result) {
3956 procedure.execute(graph, querySupport.getResource(result));
3957 } catch (Throwable t2) {
3958 Logger.defaultLogError(t2);
3960 // impl.state.barrier.dec();
3964 public void exception(ReadGraphImpl graph, Throwable t) {
3966 procedure.exception(graph, t);
3967 } catch (Throwable t2) {
3968 Logger.defaultLogError(t2);
3970 // impl.state.barrier.dec();
3974 } catch (DatabaseException e) {
3975 Logger.defaultLogError(e);
3981 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3983 assert(subject != null);
3984 assert(procedure != null);
3986 final ListenerBase listener = getListenerBase(procedure);
3989 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3990 procedure.execute(impl, !result.isEmpty());
3991 } catch (DatabaseException e) {
3992 procedure.exception(impl, e);
3998 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4000 assert(subject != null);
4001 assert(predicate != null);
4002 assert(procedure != null);
4004 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4006 boolean found = false;
4009 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4014 synchronized public void finished(AsyncReadGraph graph) {
4016 procedure.execute(graph, found);
4017 } catch (Throwable t2) {
4018 Logger.defaultLogError(t2);
4020 // impl.state.barrier.dec(this);
4024 public void exception(AsyncReadGraph graph, Throwable t) {
4026 procedure.exception(graph, t);
4027 } catch (Throwable t2) {
4028 Logger.defaultLogError(t2);
4030 // impl.state.barrier.dec(this);
4035 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4036 // else impl.state.barrier.inc(null, null);
4038 forEachObject(impl, subject, predicate, ip);
4043 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4045 assert(subject != null);
4046 assert(predicate != null);
4047 assert(procedure != null);
4049 // impl.state.barrier.inc();
4051 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4053 boolean found = false;
4056 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4057 if(resource.equals(object)) found = true;
4061 synchronized public void finished(AsyncReadGraph graph) {
4063 procedure.execute(graph, found);
4064 } catch (Throwable t2) {
4065 Logger.defaultLogError(t2);
4067 // impl.state.barrier.dec();
4071 public void exception(AsyncReadGraph graph, Throwable t) {
4073 procedure.exception(graph, t);
4074 } catch (Throwable t2) {
4075 Logger.defaultLogError(t2);
4077 // impl.state.barrier.dec();
4085 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4087 assert(subject != null);
4088 assert(procedure != null);
4090 final ListenerBase listener = getListenerBase(procedure);
4092 // impl.state.barrier.inc();
4095 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4098 public void execute(ReadGraphImpl graph, byte[] object) {
4099 boolean result = object != null;
4101 procedure.execute(graph, result);
4102 } catch (Throwable t2) {
4103 Logger.defaultLogError(t2);
4105 // impl.state.barrier.dec();
4109 public void exception(ReadGraphImpl graph, Throwable t) {
4111 procedure.exception(graph, t);
4112 } catch (Throwable t2) {
4113 Logger.defaultLogError(t2);
4115 // impl.state.barrier.dec();
4119 } catch (DatabaseException e) {
4120 Logger.defaultLogError(e);
4126 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4128 assert(subject != null);
4129 assert(procedure != null);
4131 final ListenerBase listener = getListenerBase(procedure);
4135 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4138 public void exception(ReadGraphImpl graph, Throwable t) {
4140 procedure.exception(graph, t);
4141 } catch (Throwable t2) {
4142 Logger.defaultLogError(t2);
4144 // impl.state.barrier.dec();
4148 public void execute(ReadGraphImpl graph, int i) {
4150 procedure.execute(graph, querySupport.getResource(i));
4151 } catch (Throwable t2) {
4152 Logger.defaultLogError(t2);
4157 public void finished(ReadGraphImpl graph) {
4159 procedure.finished(graph);
4160 } catch (Throwable t2) {
4161 Logger.defaultLogError(t2);
4163 // impl.state.barrier.dec();
4167 } catch (DatabaseException e) {
4168 Logger.defaultLogError(e);
4174 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4176 // assert(request != null);
4177 // assert(procedure != null);
4179 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4184 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4186 // assert(graph != null);
4187 // assert(request != null);
4189 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4190 // if(entry != null && entry.isReady()) {
4191 // return (T)entry.get(graph, this, null);
4193 // return request.perform(graph);
4198 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4200 // assert(graph != null);
4201 // assert(request != null);
4203 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4204 // if(entry != null && entry.isReady()) {
4205 // if(entry.isExcepted()) {
4206 // Throwable t = (Throwable)entry.getResult();
4207 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4208 // else throw new DatabaseException(t);
4210 // return (T)entry.getResult();
4214 // final DataContainer<T> result = new DataContainer<T>();
4215 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4217 // request.register(graph, new Listener<T>() {
4220 // public void exception(Throwable t) {
4221 // exception.set(t);
4225 // public void execute(T t) {
4230 // public boolean isDisposed() {
4236 // Throwable t = exception.get();
4238 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4239 // else throw new DatabaseException(t);
4242 // return result.get();
4249 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4251 // assert(graph != null);
4252 // assert(request != null);
4254 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4255 // if(entry != null && entry.isReady()) {
4256 // if(entry.isExcepted()) {
4257 // procedure.exception(graph, (Throwable)entry.getResult());
4259 // procedure.execute(graph, (T)entry.getResult());
4262 // request.perform(graph, procedure);
4268 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4270 assert(request != null);
4271 assert(procedure != null);
4275 queryMultiRead(impl, request, parent, listener, procedure);
4277 } catch (DatabaseException e) {
4279 throw new IllegalStateException(e);
4286 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4288 assert(request != null);
4289 assert(procedure != null);
4291 // impl.state.barrier.inc();
4293 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4295 public void execute(AsyncReadGraph graph, T result) {
4298 procedure.execute(graph, result);
4299 } catch (Throwable t2) {
4300 Logger.defaultLogError(t2);
4305 public void finished(AsyncReadGraph graph) {
4308 procedure.finished(graph);
4309 } catch (Throwable t2) {
4310 Logger.defaultLogError(t2);
4313 // impl.state.barrier.dec();
4318 public String toString() {
4319 return procedure.toString();
4323 public void exception(AsyncReadGraph graph, Throwable t) {
4326 procedure.exception(graph, t);
4327 } catch (Throwable t2) {
4328 Logger.defaultLogError(t2);
4331 // impl.state.barrier.dec();
4340 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
4342 assert(request != null);
4343 assert(procedure != null);
4347 queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4350 public String toString() {
4351 return procedure.toString();
4355 public void execute(AsyncReadGraph graph, T result) {
4357 procedure.execute(result);
4358 } catch (Throwable t2) {
4359 Logger.defaultLogError(t2);
4364 public void exception(AsyncReadGraph graph, Throwable throwable) {
4366 procedure.exception(throwable);
4367 } catch (Throwable t2) {
4368 Logger.defaultLogError(t2);
4374 } catch (DatabaseException e) {
4376 throw new IllegalStateException(e);
4383 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4385 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4390 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4392 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4397 public VirtualGraph getValueProvider(Resource subject) {
4399 return querySupport.getValueProvider(querySupport.getId(subject));
4403 public boolean resumeTasks(ReadGraphImpl graph) {
4405 return querySupport.resume(graph);
4409 public boolean isImmutable(int resourceId) {
4410 return querySupport.isImmutable(resourceId);
4413 public boolean isImmutable(Resource resource) {
4414 ResourceImpl impl = (ResourceImpl)resource;
4415 return isImmutable(impl.id);
4420 public Layer0 getL0(ReadGraph graph) {
4422 L0 = Layer0.getInstance(graph);