1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.exception.DatabaseException;
48 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
49 import org.simantics.db.exception.NoInverseException;
50 import org.simantics.db.exception.ResourceNotFoundException;
51 import org.simantics.db.impl.ResourceImpl;
52 import org.simantics.db.impl.graph.BarrierTracing;
53 import org.simantics.db.impl.graph.ReadGraphImpl;
54 import org.simantics.db.impl.graph.ReadGraphSupport;
55 import org.simantics.db.impl.procedure.IntProcedureAdapter;
56 import org.simantics.db.impl.procedure.InternalProcedure;
57 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
58 import org.simantics.db.impl.support.ResourceSupport;
59 import org.simantics.db.procedure.AsyncMultiListener;
60 import org.simantics.db.procedure.AsyncMultiProcedure;
61 import org.simantics.db.procedure.AsyncProcedure;
62 import org.simantics.db.procedure.AsyncSetListener;
63 import org.simantics.db.procedure.ListenerBase;
64 import org.simantics.db.procedure.MultiProcedure;
65 import org.simantics.db.procedure.StatementProcedure;
66 import org.simantics.db.procedure.SyncMultiProcedure;
67 import org.simantics.db.request.AsyncMultiRead;
68 import org.simantics.db.request.ExternalRead;
69 import org.simantics.db.request.MultiRead;
70 import org.simantics.db.request.RequestFlags;
71 import org.simantics.layer0.Layer0;
72 import org.simantics.utils.DataContainer;
73 import org.simantics.utils.Development;
74 import org.simantics.utils.datastructures.Pair;
75 import org.simantics.utils.datastructures.collections.CollectionUtils;
76 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
78 import gnu.trove.procedure.TIntProcedure;
79 import gnu.trove.procedure.TLongProcedure;
80 import gnu.trove.procedure.TObjectProcedure;
81 import gnu.trove.set.hash.THashSet;
82 import gnu.trove.set.hash.TIntHashSet;
84 @SuppressWarnings({"rawtypes", "unchecked"})
85 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
87 public static int indent = 0;
92 public int boundQueries = 0;
95 final private int functionalRelation;
97 final private int superrelationOf;
99 final private int instanceOf;
101 final private int inverseOf;
103 final private int asserts;
105 final private int hasPredicate;
107 final private int hasPredicateInverse;
109 final private int hasObject;
111 final private int inherits;
113 final private int subrelationOf;
115 final private int rootLibrary;
118 * A cache for the root library resource. Initialized in
119 * {@link #getRootLibraryResource()}.
121 private volatile ResourceImpl rootLibraryResource;
123 final private int library;
125 final private int consistsOf;
127 final private int hasName;
129 AtomicInteger sleepers = new AtomicInteger(0);
131 boolean updating = false;
134 final public QueryCache cache;
135 final public QuerySupport querySupport;
136 final public Session session;
137 final public ResourceSupport resourceSupport;
139 final public Semaphore requests = new Semaphore(1);
141 final public QueryListening listening = new QueryListening(this);
143 QueryThread[] executors;
145 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
147 public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
151 INIT, RUN, SLEEP, DISPOSED
155 public ThreadState[] threadStates;
157 final Object querySupportLock;
159 public Long modificationCounter = 0L;
161 public void close() {
164 public SessionTask getSubTask(ReadGraphImpl parent) {
165 synchronized(querySupportLock) {
167 while(index < freeScheduling.size()) {
168 SessionTask task = freeScheduling.get(index);
169 if(task.isSubtask(parent) && task.maybeReady()) {
170 return freeScheduling.remove(index);
179 * We are running errands while waiting for requests to complete.
180 * We can only run work that is part of the current root request to avoid any deadlocks
182 public boolean performPending(ReadGraphImpl under) {
183 SessionTask task = getSubTask(under);
185 task.run(thread.get());
191 final public void scheduleNow(SessionTask request) {
192 SessionTask toExecute = scheduleOrReturnForExecution(request);
193 if(toExecute != null)
194 toExecute.run(thread.get());
197 final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
199 assert(request != null);
201 synchronized(querySupportLock) {
203 LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
205 if(BarrierTracing.BOOKKEEPING) {
206 Exception current = new Exception();
207 Exception previous = BarrierTracing.tasks.put(request, current);
208 if(previous != null) {
209 previous.printStackTrace();
210 current.printStackTrace();
214 queue.addFirst(request);
225 final public int THREAD_MASK;
227 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
229 public static abstract class SessionTask {
231 final protected ReadGraphImpl rootGraph;
232 private int counter = 0;
233 protected int position = 1;
234 private Exception trace;
236 public SessionTask() {
240 public SessionTask(ReadGraphImpl rootGraph) {
241 this.rootGraph = rootGraph;
244 public boolean isSubtask(ReadGraphImpl graph) {
245 return graph.isParent(rootGraph);
248 public abstract void run0(int thread);
250 public final void run(int thread) {
252 if(BarrierTracing.BOOKKEEPING) {
253 trace.printStackTrace();
254 new Exception().printStackTrace();
256 throw new IllegalStateException("Multiple invocations of SessionTask!");
258 if(BarrierTracing.BOOKKEEPING) {
259 trace = new Exception();
262 //if(graph != null && graph.asyncBarrier != null) graph.asyncBarrier.dec();
265 public boolean maybeReady() {
270 public String toString() {
272 return "SessionTask[no graph]";
274 // return "SessionTask[" + graph.parent + "]";
277 // public int getLevel() {
278 // if(graph == null) return 0;
279 // else return graph.getLevel();
284 public static abstract class SessionRead extends SessionTask {
286 final public Semaphore notify;
287 final public DataContainer<Throwable> throwable;
289 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
291 this.throwable = throwable;
292 this.notify = notify;
297 long waitingTime = 0;
300 static int koss2 = 0;
302 public boolean resume(ReadGraphImpl graph) {
303 return executors[0].runSynchronized();
306 //private WeakReference<GarbageTracker> garbageTracker;
308 private class GarbageTracker {
311 protected void finalize() throws Throwable {
313 // System.err.println("GarbageTracker");
315 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
323 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
324 throws DatabaseException {
326 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
329 THREAD_MASK = threads - 1;
332 cache = new QueryCache(core, threads);
333 session = querySupport.getSession();
334 resourceSupport = querySupport.getSupport();
335 querySupportLock = core.getLock();
337 executors = new QueryThread[THREADS];
338 // queues = new ArrayList[THREADS];
339 // threadLocks = new ReentrantLock[THREADS];
340 // threadConditions = new Condition[THREADS];
341 threadStates = new ThreadState[THREADS];
342 // ownTasks = new ArrayList[THREADS];
343 // ownSyncTasks = new ArrayList[THREADS];
344 // delayQueues = new ArrayList[THREADS * THREADS];
346 // freeSchedule = new AtomicInteger(0);
348 // for (int i = 0; i < THREADS * THREADS; i++) {
349 // delayQueues[i] = new ArrayList<SessionTask>();
352 for (int i = 0; i < THREADS; i++) {
354 // tasks[i] = new ArrayList<Runnable>();
355 // ownTasks[i] = new ArrayList<SessionTask>();
356 // ownSyncTasks[i] = new ArrayList<SessionTask>();
357 // queues[i] = new ArrayList<SessionTask>();
358 // threadLocks[i] = new ReentrantLock();
359 // threadConditions[i] = threadLocks[i].newCondition();
360 // limits[i] = false;
361 threadStates[i] = ThreadState.INIT;
365 for (int i = 0; i < THREADS; i++) {
369 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
371 threadSet.add(executors[i]);
376 for (int i = 0; i < THREADS; i++) {
377 executors[i].start();
380 // Make sure that query threads are up and running
381 while(sleepers.get() != THREADS) {
384 } catch (InterruptedException e) {
389 rootLibrary = core.getBuiltin("http:/");
390 boolean builtinsInstalled = rootLibrary != 0;
392 if (builtinsInstalled) {
393 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
394 assert (functionalRelation != 0);
396 functionalRelation = 0;
398 if (builtinsInstalled) {
399 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
400 assert (instanceOf != 0);
404 if (builtinsInstalled) {
405 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
406 assert (inverseOf != 0);
411 if (builtinsInstalled) {
412 inherits = core.getBuiltin(Layer0.URIs.Inherits);
413 assert (inherits != 0);
417 if (builtinsInstalled) {
418 asserts = core.getBuiltin(Layer0.URIs.Asserts);
419 assert (asserts != 0);
423 if (builtinsInstalled) {
424 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
425 assert (hasPredicate != 0);
429 if (builtinsInstalled) {
430 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
431 assert (hasPredicateInverse != 0);
433 hasPredicateInverse = 0;
435 if (builtinsInstalled) {
436 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
437 assert (hasObject != 0);
441 if (builtinsInstalled) {
442 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
443 assert (subrelationOf != 0);
447 if (builtinsInstalled) {
448 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
449 assert (superrelationOf != 0);
453 if (builtinsInstalled) {
454 library = core.getBuiltin(Layer0.URIs.Library);
455 assert (library != 0);
459 if (builtinsInstalled) {
460 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
461 assert (consistsOf != 0);
465 if (builtinsInstalled) {
466 hasName = core.getBuiltin(Layer0.URIs.HasName);
467 assert (hasName != 0);
473 final public void releaseWrite(ReadGraphImpl graph) {
474 propagateChangesInQueryCache(graph);
475 modificationCounter++;
478 final public int getId(final Resource r) {
479 return querySupport.getId(r);
482 public QuerySupport getCore() {
486 public int getFunctionalRelation() {
487 return functionalRelation;
490 public int getInherits() {
494 public int getInstanceOf() {
498 public int getInverseOf() {
502 public int getSubrelationOf() {
503 return subrelationOf;
506 public int getSuperrelationOf() {
507 return superrelationOf;
510 public int getAsserts() {
514 public int getHasPredicate() {
518 public int getHasPredicateInverse() {
519 return hasPredicateInverse;
522 public int getHasObject() {
526 public int getRootLibrary() {
530 public Resource getRootLibraryResource() {
531 if (rootLibraryResource == null) {
532 // Synchronization is not needed here, it doesn't matter if multiple
533 // threads simultaneously set rootLibraryResource once.
534 int root = getRootLibrary();
536 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
537 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
539 return rootLibraryResource;
542 public int getLibrary() {
546 public int getConsistsOf() {
550 public int getHasName() {
554 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
558 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
561 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
563 if (result != null && result != 0) {
564 procedure.execute(graph, result);
568 // Fall back to using the fixed builtins.
569 // result = querySupport.getBuiltin(id);
570 // if (result != 0) {
571 // procedure.execute(graph, result);
576 // result = querySupport.getRandomAccessReference(id);
577 // } catch (ResourceNotFoundException e) {
578 // procedure.exception(graph, e);
583 procedure.execute(graph, result);
585 procedure.exception(graph, new ResourceNotFoundException(id));
591 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
592 procedure.exception(graph, t);
596 } catch (DatabaseException e) {
600 procedure.exception(graph, e);
602 } catch (DatabaseException e1) {
604 Logger.defaultLogError(e1);
612 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
614 Integer result = querySupport.getBuiltin(id);
616 procedure.execute(graph, result);
618 procedure.exception(graph, new ResourceNotFoundException(id));
623 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
626 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
627 } catch (DatabaseException e) {
628 throw new IllegalStateException(e);
633 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
637 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
638 } catch (DatabaseException e) {
639 throw new IllegalStateException(e);
644 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
645 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
649 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
651 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
655 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
657 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
661 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
663 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
667 boolean isBound(ExternalReadEntry<?> entry) {
668 if(entry.hasParents()) return true;
669 else if(listening.hasListener(entry)) return true;
673 static class Dummy implements InternalProcedure<Object>, IntProcedure {
676 public void execute(ReadGraphImpl graph, int i) {
680 public void finished(ReadGraphImpl graph) {
684 public void execute(ReadGraphImpl graph, Object result) {
688 public void exception(ReadGraphImpl graph, Throwable throwable) {
693 private static final Dummy dummy = new Dummy();
696 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
698 if (DebugPolicy.PERFORM)
699 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
702 assert (!collecting);
704 assert(query.assertNotDiscarded());
706 registerDependencies(graph, query, parent, listener, procedure, false);
708 // FRESH, REFUTED, EXCEPTED go here
709 if (!query.isReady()) {
714 query.computeForEach(graph, this, (Procedure)dummy, true);
715 return query.get(graph, this, null);
721 return query.get(graph, this, procedure);
729 interface QueryCollectorSupport {
730 public CacheCollectionResult allCaches();
731 public Collection<CacheEntry> getRootList();
732 public int getCurrentSize();
733 public int calculateCurrentSize();
734 public CacheEntryBase iterate(int level);
735 public void remove();
736 public void setLevel(CacheEntryBase entry, int level);
737 public boolean start(boolean flush);
740 interface QueryCollector {
742 public void collect(int youngTarget, int allowedTimeInMs);
746 class QueryCollectorSupportImpl implements QueryCollectorSupport {
748 private static final boolean DEBUG = false;
749 private static final double ITERATION_RATIO = 0.2;
751 private CacheCollectionResult iteration = new CacheCollectionResult();
752 private boolean fresh = true;
753 private boolean needDataInStart = true;
755 QueryCollectorSupportImpl() {
759 public CacheCollectionResult allCaches() {
760 CacheCollectionResult result = new CacheCollectionResult();
761 QueryProcessor.this.allCaches(result);
766 public boolean start(boolean flush) {
767 // We need new data from query maps
769 if(needDataInStart || flush) {
770 // Last run ended after processing all queries => refresh data
771 restart(flush ? 0.0 : ITERATION_RATIO);
773 // continue with previous big data
775 // Notify caller about iteration situation
776 return iteration.isAtStart();
779 private void restart(double targetRatio) {
781 needDataInStart = true;
783 long start = System.nanoTime();
786 // We need new data from query maps
788 int iterationSize = iteration.size()+1;
789 int diff = calculateCurrentSize()-iterationSize;
791 double ratio = (double)diff / (double)iterationSize;
792 boolean dirty = Math.abs(ratio) >= targetRatio;
795 iteration = allCaches();
797 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
798 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
799 System.err.print(" " + iteration.levels[i].size());
800 System.err.println("");
807 needDataInStart = false;
809 // We are returning here within the same GC round - reuse the cache table
818 public CacheEntryBase iterate(int level) {
820 CacheEntryBase entry = iteration.next(level);
822 restart(ITERATION_RATIO);
826 while(entry != null && entry.isDiscarded()) {
827 entry = iteration.next(level);
835 public void remove() {
840 public void setLevel(CacheEntryBase entry, int level) {
841 iteration.setLevel(entry, level);
844 public Collection<CacheEntry> getRootList() {
845 return cache.getRootList();
849 public int calculateCurrentSize() {
850 return cache.calculateCurrentSize();
854 public int getCurrentSize() {
859 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
861 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
862 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
864 public int querySize() {
868 public void gc(int youngTarget, int allowedTimeInMs) {
870 collector.collect(youngTarget, allowedTimeInMs);
875 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
877 if(entry.isDiscarded()) return;
878 if(workarea.containsKey(entry)) return;
880 Iterable<CacheEntry> parents = entry.getParents(this);
881 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
882 for(CacheEntry e : parents) {
883 if(e.isDiscarded()) continue;
885 processParentReport(e, workarea);
887 workarea.put(entry, ps);
891 public synchronized String reportQueryActivity(File file) throws IOException {
893 System.err.println("reportQueries " + file.getAbsolutePath());
898 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
900 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
901 Collections.reverse(entries);
903 for(Pair<String,Integer> entry : entries) {
904 b.println(entry.first + ": " + entry.second);
909 Development.histogram.clear();
915 public synchronized String reportQueries(File file) throws IOException {
917 System.err.println("reportQueries " + file.getAbsolutePath());
922 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
924 long start = System.nanoTime();
926 // ArrayList<CacheEntry> all = ;
928 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
929 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
930 for(CacheEntryBase entry : caches) {
931 processParentReport(entry, workarea);
934 // for(CacheEntry e : all) System.err.println("entry: " + e);
936 long duration = System.nanoTime() - start;
937 System.err.println("Query root set in " + 1e-9*duration + "s.");
939 start = System.nanoTime();
941 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
945 for(CacheEntry entry : workarea.keySet()) {
946 boolean listener = listening.hasListenerAfterDisposing(entry);
947 boolean hasParents = entry.getParents(this).iterator().hasNext();
950 flagMap.put(entry, 0);
951 } else if (!hasParents) {
953 flagMap.put(entry, 1);
956 flagMap.put(entry, 2);
969 long start2 = System.nanoTime();
971 int boundCounter = 0;
972 int unboundCounter = 0;
973 int unknownCounter = 0;
975 for(CacheEntry<?> entry : workarea.keySet()) {
977 //System.err.println("process " + entry);
979 int flags = flagMap.get(entry);
980 int bindStatus = flags & 3;
982 if(bindStatus == 0) boundCounter++;
983 else if(bindStatus == 1) unboundCounter++;
984 else if(bindStatus == 2) unknownCounter++;
986 if(bindStatus < 2) continue;
989 for(CacheEntry parent : entry.getParents(this)) {
991 if(parent.isDiscarded()) flagMap.put(parent, 1);
993 int flags2 = flagMap.get(parent);
994 int bindStatus2 = flags2 & 3;
995 // Parent is bound => child is bound
996 if(bindStatus2 == 0) {
1000 // Parent is unknown => child is unknown
1001 else if (bindStatus2 == 2) {
1008 flagMap.put(entry, newStatus);
1012 duration = System.nanoTime() - start2;
1013 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1014 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1016 } while(!done && loops++ < 20);
1020 for(CacheEntry entry : workarea.keySet()) {
1022 int bindStatus = flagMap.get(entry);
1023 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1029 duration = System.nanoTime() - start;
1030 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1032 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1034 for(CacheEntry entry : workarea.keySet()) {
1035 Class<?> clazz = entry.getClass();
1036 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
1037 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
1038 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
1039 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
1040 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
1041 Integer c = counts.get(clazz);
1042 if(c == null) counts.put(clazz, -1);
1043 else counts.put(clazz, c-1);
1046 b.print("// Simantics DB client query report file\n");
1047 b.print("// This file contains the following information\n");
1048 b.print("// -The amount of cached query instances per query class\n");
1049 b.print("// -The sizes of retained child sets\n");
1050 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1051 b.print("// -Followed by status, where\n");
1052 b.print("// -0=bound\n");
1053 b.print("// -1=free\n");
1054 b.print("// -2=unknown\n");
1055 b.print("// -L=has listener\n");
1056 b.print("// -List of children for each query (search for 'C <query name>')\n");
1058 b.print("----------------------------------------\n");
1060 b.print("// Queries by class\n");
1061 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1062 b.print(-p.second + " " + p.first.getName() + "\n");
1065 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1066 for(CacheEntry e : workarea.keySet())
1069 boolean changed = true;
1071 while(changed && iter++<50) {
1075 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1076 for(CacheEntry e : workarea.keySet())
1079 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1080 Integer c = hist.get(e.getKey());
1081 for(CacheEntry p : e.getValue()) {
1082 Integer i = newHist.get(p);
1083 newHist.put(p, i+c);
1086 for(CacheEntry e : workarea.keySet()) {
1087 Integer value = newHist.get(e);
1088 Integer old = hist.get(e);
1089 if(!value.equals(old)) {
1091 // System.err.println("hist " + e + ": " + old + " => " + value);
1096 System.err.println("Retained set iteration " + iter);
1100 b.print("// Queries by retained set\n");
1101 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1102 b.print("" + -p.second + " " + p.first + "\n");
1105 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1107 b.print("// Entry parent listing\n");
1108 for(CacheEntry entry : workarea.keySet()) {
1109 int status = flagMap.get(entry);
1110 boolean hasListener = listening.hasListenerAfterDisposing(entry);
1111 b.print("Q " + entry.toString());
1113 b.print(" (L" + status + ")");
1116 b.print(" (" + status + ")");
1119 for(CacheEntry parent : workarea.get(entry)) {
1120 Collection<CacheEntry> inv = inverse.get(parent);
1122 inv = new ArrayList<CacheEntry>();
1123 inverse.put(parent, inv);
1126 b.print(" " + parent.toString());
1131 b.print("// Entry child listing\n");
1132 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1133 b.print("C " + entry.getKey().toString());
1135 for(CacheEntry child : entry.getValue()) {
1136 Integer h = hist.get(child);
1140 b.print(" <no children>");
1142 b.print(" " + child.toString());
1147 b.print("#queries: " + workarea.keySet().size() + "\n");
1148 b.print("#listeners: " + listeners + "\n");
1152 return "Dumped " + workarea.keySet().size() + " queries.";
1156 boolean removeQuery(CacheEntry entry) {
1158 // This entry has been removed before. No need to do anything here.
1159 if(entry.isDiscarded()) return false;
1161 assert (!entry.isDiscarded());
1163 Query query = entry.getQuery();
1165 query.removeEntry(this);
1170 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1181 * @return true if this entry is being listened
1183 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1187 CacheEntry entry = e.entry;
1190 * If the dependency graph forms a DAG, some entries are inserted in the
1191 * todo list many times. They only need to be processed once though.
1193 if (entry.isDiscarded()) {
1194 if (Development.DEVELOPMENT) {
1195 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1196 System.err.print("D");
1197 for (int i = 0; i < e.indent; i++)
1198 System.err.print(" ");
1199 System.err.println(entry.getQuery());
1202 // System.err.println(" => DISCARDED");
1206 // if (entry.isRefuted()) {
1207 // if (Development.DEVELOPMENT) {
1208 // if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1209 // System.err.print("R");
1210 // for (int i = 0; i < e.indent; i++)
1211 // System.err.print(" ");
1212 // System.err.println(entry.getQuery());
1218 if (entry.isExcepted()) {
1219 if (Development.DEVELOPMENT) {
1220 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1221 System.err.print("E");
1226 if (entry.isPending()) {
1227 if (Development.DEVELOPMENT) {
1228 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1229 System.err.print("P");
1236 if (Development.DEVELOPMENT) {
1237 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1238 System.err.print("U ");
1239 for (int i = 0; i < e.indent; i++)
1240 System.err.print(" ");
1241 System.err.print(entry.getQuery());
1245 Query query = entry.getQuery();
1246 int type = query.type();
1248 boolean hasListener = listening.hasListener(entry);
1250 if (Development.DEVELOPMENT) {
1251 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1252 if(listening.hasListener(entry)) {
1253 System.err.println(" (L)");
1255 System.err.println("");
1260 if(entry.isPending() || entry.isExcepted()) {
1263 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1265 immediates.put(entry, entry);
1280 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1282 immediates.put(entry, entry);
1296 // System.err.println(" => FOO " + type);
1299 ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1300 if(entries != null) {
1301 for (ListenerEntry le : entries) {
1302 listening.scheduleListener(le);
1307 // If invalid, update parents
1308 if (type == RequestFlags.INVALIDATE) {
1309 listening.updateParents(e.indent, entry, todo);
1317 * @param av1 an array (guaranteed)
1318 * @param av2 any object
1319 * @return <code>true</code> if the two arrays are equal
1321 private final boolean arrayEquals(Object av1, Object av2) {
1324 Class<?> c1 = av1.getClass().getComponentType();
1325 Class<?> c2 = av2.getClass().getComponentType();
1326 if (c2 == null || !c1.equals(c2))
1328 boolean p1 = c1.isPrimitive();
1329 boolean p2 = c2.isPrimitive();
1333 return Arrays.equals((Object[]) av1, (Object[]) av2);
1334 if (boolean.class.equals(c1))
1335 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1336 else if (byte.class.equals(c1))
1337 return Arrays.equals((byte[]) av1, (byte[]) av2);
1338 else if (int.class.equals(c1))
1339 return Arrays.equals((int[]) av1, (int[]) av2);
1340 else if (long.class.equals(c1))
1341 return Arrays.equals((long[]) av1, (long[]) av2);
1342 else if (float.class.equals(c1))
1343 return Arrays.equals((float[]) av1, (float[]) av2);
1344 else if (double.class.equals(c1))
1345 return Arrays.equals((double[]) av1, (double[]) av2);
1346 throw new RuntimeException("??? Contact application querySupport.");
1351 final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1355 Query query = entry.getQuery();
1357 if (Development.DEVELOPMENT) {
1358 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1359 System.err.println("R " + query);
1363 entry.prepareRecompute(querySupport);
1365 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1367 query.recompute(parentGraph);
1369 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1371 Object newValue = entry.getResult();
1373 if (ListenerEntry.NO_VALUE == oldValue) {
1374 if (Development.DEVELOPMENT) {
1375 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1376 System.out.println("C " + query);
1377 System.out.println("- " + oldValue);
1378 System.out.println("- " + newValue);
1384 boolean changed = false;
1386 if (newValue != null) {
1387 if (newValue.getClass().isArray()) {
1388 changed = !arrayEquals(newValue, oldValue);
1390 changed = !newValue.equals(oldValue);
1393 changed = (oldValue != null);
1395 if (Development.DEVELOPMENT) {
1396 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1397 System.err.println("C " + query);
1398 System.err.println("- " + oldValue);
1399 System.err.println("- " + newValue);
1403 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1405 } catch (Throwable t) {
1407 Logger.defaultLogError(t);
1409 return ListenerEntry.NO_VALUE;
1418 * @return true if this entry still has listeners
1420 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1422 assert (!cache.collecting);
1426 boolean hadListeners = false;
1427 boolean listenersUnknown = false;
1431 assert(entry != null);
1432 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1433 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1434 todo.add(new UpdateEntry(null, entry, 0));
1438 // Walk the tree and collect immediate updates
1439 while (!todo.isEmpty()) {
1440 UpdateEntry e = todo.pop();
1441 hadListeners |= updateQuery(e, todo, immediates);
1444 if(immediates.isEmpty()) break;
1446 // Evaluate all immediate updates and collect parents to update
1447 for(CacheEntry immediate : immediates.values()) {
1449 if(immediate.isDiscarded()) {
1453 if(immediate.isExcepted()) {
1455 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1456 if (newValue != ListenerEntry.NOT_CHANGED)
1457 listening.updateParents(0, immediate, todo);
1461 Object oldValue = immediate.getResult();
1462 Object newValue = compareTo(graph, immediate, oldValue);
1464 if (newValue != ListenerEntry.NOT_CHANGED) {
1465 listening.updateParents(0, immediate, todo);
1467 // If not changed, keep the old value
1468 immediate.setResult(oldValue);
1469 immediate.setReady();
1470 listenersUnknown = true;
1480 } catch (Throwable t) {
1481 Logger.defaultLogError(t);
1487 return hadListeners | listenersUnknown;
1491 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1492 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1493 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1494 // Maybe use a mutex from util.concurrent?
1495 private Object primitiveUpdateLock = new Object();
1496 private THashSet scheduledPrimitiveUpdates = new THashSet();
1498 private ArrayList<CacheEntry> refutations = new ArrayList<>();
1500 private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1505 private void updateRefutations(ReadGraphImpl graph) {
1507 for(CacheEntry e : refutations)
1510 refutations.clear();
1514 public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1516 // Make sure that listening has performed its work
1519 cache.dirty = false;
1522 if (Development.DEVELOPMENT) {
1523 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1524 System.err.println("== Query update ==");
1528 // Special case - one statement
1529 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1531 long arg0 = scheduledObjectUpdates.getFirst();
1533 final int subject = (int)(arg0 >>> 32);
1534 final int predicate = (int)(arg0 & 0xffffffff);
1536 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1537 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1538 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1540 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1541 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1542 if(principalTypes != null) markForUpdate(graph, principalTypes);
1543 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1544 if(types != null) markForUpdate(graph, types);
1547 if(predicate == subrelationOf) {
1548 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1549 if(superRelations != null) markForUpdate(graph, superRelations);
1552 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1553 if(dp != null) markForUpdate(graph, dp);
1554 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1555 if(os != null) markForUpdate(graph, os);
1557 updateRefutations(graph);
1559 scheduledObjectUpdates.clear();
1561 if (Development.DEVELOPMENT) {
1562 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1563 System.err.println("== Query update ends ==");
1571 // Special case - one value
1572 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1574 int arg0 = scheduledValueUpdates.getFirst();
1576 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1577 if(valueQuery != null) markForUpdate(graph, valueQuery);
1579 updateRefutations(graph);
1581 scheduledValueUpdates.clear();
1583 if (Development.DEVELOPMENT) {
1584 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1585 System.err.println("== Query update ends ==");
1593 final TIntHashSet predicates = new TIntHashSet();
1594 final TIntHashSet orderedSets = new TIntHashSet();
1596 THashSet primitiveUpdates;
1597 synchronized (primitiveUpdateLock) {
1598 primitiveUpdates = scheduledPrimitiveUpdates;
1599 scheduledPrimitiveUpdates = new THashSet();
1602 scheduledValueUpdates.forEach(new TIntProcedure() {
1605 public boolean execute(int arg0) {
1606 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1607 if(valueQuery != null) markForUpdate(graph, valueQuery);
1613 scheduledInvalidates.forEach(new TIntProcedure() {
1616 public boolean execute(int resource) {
1618 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1619 if(valueQuery != null) markForUpdate(graph, valueQuery);
1621 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1622 if(principalTypes != null) markForUpdate(graph, principalTypes);
1623 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1624 if(types != null) markForUpdate(graph, types);
1626 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1627 if(superRelations != null) markForUpdate(graph, superRelations);
1629 predicates.add(resource);
1636 scheduledObjectUpdates.forEach(new TLongProcedure() {
1639 public boolean execute(long arg0) {
1641 final int subject = (int)(arg0 >>> 32);
1642 final int predicate = (int)(arg0 & 0xffffffff);
1644 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1645 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1646 if(principalTypes != null) markForUpdate(graph, principalTypes);
1647 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1648 if(types != null) markForUpdate(graph, types);
1651 if(predicate == subrelationOf) {
1652 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1653 if(superRelations != null) markForUpdate(graph, superRelations);
1656 predicates.add(subject);
1657 orderedSets.add(predicate);
1665 predicates.forEach(new TIntProcedure() {
1668 public boolean execute(final int subject) {
1670 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1671 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1672 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1674 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1675 if(entry != null) markForUpdate(graph, entry);
1683 orderedSets.forEach(new TIntProcedure() {
1686 public boolean execute(int orderedSet) {
1688 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1689 if(entry != null) markForUpdate(graph, entry);
1697 updateRefutations(graph);
1699 primitiveUpdates.forEach(new TObjectProcedure() {
1702 public boolean execute(Object arg0) {
1704 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1705 if (query != null) {
1706 boolean listening = update(graph, query);
1707 if (!listening && !query.hasParents()) {
1708 cache.externalReadEntryMap.remove(arg0);
1717 scheduledValueUpdates.clear();
1718 scheduledObjectUpdates.clear();
1719 scheduledInvalidates.clear();
1721 if (Development.DEVELOPMENT) {
1722 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1723 System.err.println("== Query update ends ==");
1729 public void updateValue(final int resource) {
1730 scheduledValueUpdates.add(resource);
1734 public void updateStatements(final int resource, final int predicate) {
1735 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
1739 private int lastInvalidate = 0;
1741 public void invalidateResource(final int resource) {
1742 if(lastInvalidate == resource) return;
1743 scheduledValueUpdates.add(resource);
1744 lastInvalidate = resource;
1748 public void updatePrimitive(final ExternalRead primitive) {
1750 // External reads may be updated from arbitrary threads.
1751 // Synchronize to prevent race-conditions.
1752 synchronized (primitiveUpdateLock) {
1753 scheduledPrimitiveUpdates.add(primitive);
1755 querySupport.dirtyPrimitives();
1760 public synchronized String toString() {
1761 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
1765 protected void doDispose() {
1767 requests.release(Integer.MAX_VALUE / 2);
1769 for(int index = 0; index < THREADS; index++) {
1770 executors[index].dispose();
1774 for(int i=0;i<100;i++) {
1776 boolean alive = false;
1777 for(int index = 0; index < THREADS; index++) {
1778 alive |= executors[index].isAlive();
1783 } catch (InterruptedException e) {
1784 Logger.defaultLogError(e);
1789 // Then start interrupting
1790 for(int i=0;i<100;i++) {
1792 boolean alive = false;
1793 for(int index = 0; index < THREADS; index++) {
1794 alive |= executors[index].isAlive();
1797 for(int index = 0; index < THREADS; index++) {
1798 executors[index].interrupt();
1802 // // Then just destroy
1803 // for(int index = 0; index < THREADS; index++) {
1804 // executors[index].destroy();
1807 for(int index = 0; index < THREADS; index++) {
1809 executors[index].join(5000);
1810 } catch (InterruptedException e) {
1811 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1813 executors[index] = null;
1818 public int getHits() {
1822 public int getMisses() {
1823 return cache.misses;
1826 public int getSize() {
1830 public Set<Long> getReferencedClusters() {
1831 HashSet<Long> result = new HashSet<Long>();
1832 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1833 Objects query = (Objects) entry.getQuery();
1834 result.add(querySupport.getClusterId(query.r1()));
1836 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1837 DirectPredicates query = (DirectPredicates) entry.getQuery();
1838 result.add(querySupport.getClusterId(query.id));
1840 for (CacheEntry entry : cache.valueQueryMap.values()) {
1841 ValueQuery query = (ValueQuery) entry.getQuery();
1842 result.add(querySupport.getClusterId(query.id));
1847 public void assertDone() {
1850 CacheCollectionResult allCaches(CacheCollectionResult result) {
1852 return cache.allCaches(result);
1856 public void printDiagnostics() {
1859 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1860 querySupport.requestCluster(graph, clusterId, runnable);
1863 public int clean() {
1864 collector.collect(0, Integer.MAX_VALUE);
1868 public void clean(final Collection<ExternalRead<?>> requests) {
1869 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1870 Iterator<ExternalRead<?>> iterator = requests.iterator();
1872 public CacheCollectionResult allCaches() {
1873 throw new UnsupportedOperationException();
1876 public CacheEntryBase iterate(int level) {
1877 if(iterator.hasNext()) {
1878 ExternalRead<?> request = iterator.next();
1879 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1880 if (entry != null) return entry;
1881 else return iterate(level);
1883 iterator = requests.iterator();
1888 public void remove() {
1889 throw new UnsupportedOperationException();
1892 public void setLevel(CacheEntryBase entry, int level) {
1893 throw new UnsupportedOperationException();
1896 public Collection<CacheEntry> getRootList() {
1897 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1898 for (ExternalRead<?> request : requests) {
1899 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1906 public int getCurrentSize() {
1910 public int calculateCurrentSize() {
1911 // This tells the collector to attempt collecting everything.
1912 return Integer.MAX_VALUE;
1915 public boolean start(boolean flush) {
1919 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
1922 public void scanPending() {
1924 cache.scanPending();
1928 public ReadGraphImpl graphForVirtualRequest() {
1929 return ReadGraphImpl.createAsync(this);
1933 private HashMap<Resource, Class<?>> builtinValues;
1935 public Class<?> getBuiltinValue(Resource r) {
1936 if(builtinValues == null) initBuiltinValues();
1937 return builtinValues.get(r);
1940 Exception callerException = null;
1942 public interface AsyncBarrier {
1945 // public void inc(String debug);
1946 // public void dec(String debug);
1949 // final public QueryProcessor processor;
1950 // final public QuerySupport support;
1952 // boolean disposed = false;
1954 private void initBuiltinValues() {
1956 Layer0 b = getSession().peekService(Layer0.class);
1957 if(b == null) return;
1959 builtinValues = new HashMap<Resource, Class<?>>();
1961 builtinValues.put(b.String, String.class);
1962 builtinValues.put(b.Double, Double.class);
1963 builtinValues.put(b.Float, Float.class);
1964 builtinValues.put(b.Long, Long.class);
1965 builtinValues.put(b.Integer, Integer.class);
1966 builtinValues.put(b.Byte, Byte.class);
1967 builtinValues.put(b.Boolean, Boolean.class);
1969 builtinValues.put(b.StringArray, String[].class);
1970 builtinValues.put(b.DoubleArray, double[].class);
1971 builtinValues.put(b.FloatArray, float[].class);
1972 builtinValues.put(b.LongArray, long[].class);
1973 builtinValues.put(b.IntegerArray, int[].class);
1974 builtinValues.put(b.ByteArray, byte[].class);
1975 builtinValues.put(b.BooleanArray, boolean[].class);
1979 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
1981 // if (null == provider2) {
1982 // this.processor = null;
1986 // this.processor = provider2;
1987 // support = provider2.getCore();
1988 // initBuiltinValues();
1992 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
1993 // return new ReadGraphSupportImpl(impl.processor);
1997 final public Session getSession() {
2001 final public ResourceSupport getResourceSupport() {
2002 return resourceSupport;
2006 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2010 for(Resource predicate : getPredicates(impl, subject))
2011 procedure.execute(impl, predicate);
2013 procedure.finished(impl);
2015 } catch (Throwable e) {
2016 procedure.exception(impl, e);
2022 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2024 throw new UnsupportedOperationException();
2026 // assert(subject != null);
2027 // assert(procedure != null);
2029 // final ListenerBase listener = getListenerBase(procedure);
2032 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2035 // public void execute(ReadGraphImpl graph, int i) {
2037 // procedure.execute(querySupport.getResource(i));
2038 // } catch (Throwable t2) {
2039 // Logger.defaultLogError(t2);
2044 // public void finished(ReadGraphImpl graph) {
2046 // procedure.finished();
2047 // } catch (Throwable t2) {
2048 // Logger.defaultLogError(t2);
2050 //// impl.state.barrier.dec();
2054 // public void exception(ReadGraphImpl graph, Throwable t) {
2056 // procedure.exception(t);
2057 // } catch (Throwable t2) {
2058 // Logger.defaultLogError(t2);
2060 //// impl.state.barrier.dec();
2064 // } catch (DatabaseException e) {
2065 // Logger.defaultLogError(e);
2071 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2072 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2076 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2077 final Resource predicate, final MultiProcedure<Statement> procedure) {
2079 assert(subject != null);
2080 assert(predicate != null);
2081 assert(procedure != null);
2083 final ListenerBase listener = getListenerBase(procedure);
2085 // impl.state.barrier.inc();
2088 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2091 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2093 procedure.execute(querySupport.getStatement(s, p, o));
2094 } catch (Throwable t2) {
2095 Logger.defaultLogError(t2);
2100 public void finished(ReadGraphImpl graph) {
2102 procedure.finished();
2103 } catch (Throwable t2) {
2104 Logger.defaultLogError(t2);
2106 // impl.state.barrier.dec();
2110 public void exception(ReadGraphImpl graph, Throwable t) {
2112 procedure.exception(t);
2113 } catch (Throwable t2) {
2114 Logger.defaultLogError(t2);
2116 // impl.state.barrier.dec();
2120 } catch (DatabaseException e) {
2121 Logger.defaultLogError(e);
2127 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2128 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2130 assert(subject != null);
2131 assert(predicate != null);
2132 assert(procedure != null);
2134 final ListenerBase listener = getListenerBase(procedure);
2136 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2138 boolean first = true;
2141 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2144 procedure.execute(graph, querySupport.getStatement(s, p, o));
2146 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2148 } catch (Throwable t2) {
2149 Logger.defaultLogError(t2);
2154 public void finished(ReadGraphImpl graph) {
2159 procedure.finished(graph);
2160 // impl.state.barrier.dec(this);
2162 procedure.finished(impl.newRestart(graph));
2164 } catch (Throwable t2) {
2165 Logger.defaultLogError(t2);
2171 public void exception(ReadGraphImpl graph, Throwable t) {
2176 procedure.exception(graph, t);
2177 // impl.state.barrier.dec(this);
2179 procedure.exception(impl.newRestart(graph), t);
2181 } catch (Throwable t2) {
2182 Logger.defaultLogError(t2);
2189 int sId = querySupport.getId(subject);
2190 int pId = querySupport.getId(predicate);
2192 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2193 // else impl.state.barrier.inc(null, null);
2196 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2197 } catch (DatabaseException e) {
2198 Logger.defaultLogError(e);
2204 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2205 final Resource predicate, final StatementProcedure procedure) {
2207 assert(subject != null);
2208 assert(predicate != null);
2209 assert(procedure != null);
2211 final ListenerBase listener = getListenerBase(procedure);
2213 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2215 boolean first = true;
2218 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2221 procedure.execute(graph, s, p, o);
2223 procedure.execute(impl.newRestart(graph), s, p, o);
2225 } catch (Throwable t2) {
2226 Logger.defaultLogError(t2);
2231 public void finished(ReadGraphImpl graph) {
2236 procedure.finished(graph);
2237 // impl.state.barrier.dec(this);
2239 procedure.finished(impl.newRestart(graph));
2241 } catch (Throwable t2) {
2242 Logger.defaultLogError(t2);
2248 public void exception(ReadGraphImpl graph, Throwable t) {
2253 procedure.exception(graph, t);
2254 // impl.state.barrier.dec(this);
2256 procedure.exception(impl.newRestart(graph), t);
2258 } catch (Throwable t2) {
2259 Logger.defaultLogError(t2);
2266 int sId = querySupport.getId(subject);
2267 int pId = querySupport.getId(predicate);
2269 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2270 // else impl.state.barrier.inc(null, null);
2273 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2274 } catch (DatabaseException e) {
2275 Logger.defaultLogError(e);
2281 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2283 assert(subject != null);
2284 assert(predicate != null);
2285 assert(procedure != null);
2287 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2289 private Set<Statement> current = null;
2290 private Set<Statement> run = new HashSet<Statement>();
2293 public void execute(AsyncReadGraph graph, Statement result) {
2295 boolean found = false;
2297 if(current != null) {
2299 found = current.remove(result);
2303 if(!found) procedure.add(graph, result);
2310 public void finished(AsyncReadGraph graph) {
2312 if(current != null) {
2313 for(Statement r : current) procedure.remove(graph, r);
2318 run = new HashSet<Statement>();
2323 public void exception(AsyncReadGraph graph, Throwable t) {
2324 procedure.exception(graph, t);
2328 public boolean isDisposed() {
2329 return procedure.isDisposed();
2337 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2338 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2340 assert(subject != null);
2341 assert(predicate != null);
2342 assert(procedure != null);
2344 final ListenerBase listener = getListenerBase(procedure);
2346 // impl.state.barrier.inc();
2349 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2352 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2354 procedure.execute(graph, querySupport.getStatement(s, p, o));
2355 } catch (Throwable t2) {
2356 Logger.defaultLogError(t2);
2361 public void finished(ReadGraphImpl graph) {
2363 procedure.finished(graph);
2364 } catch (Throwable t2) {
2365 Logger.defaultLogError(t2);
2367 // impl.state.barrier.dec();
2371 public void exception(ReadGraphImpl graph, Throwable t) {
2373 procedure.exception(graph, t);
2374 } catch (Throwable t2) {
2375 Logger.defaultLogError(t2);
2377 // impl.state.barrier.dec();
2381 } catch (DatabaseException e) {
2382 Logger.defaultLogError(e);
2387 private static ListenerBase getListenerBase(Object procedure) {
2388 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2393 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2395 assert(subject != null);
2396 assert(predicate != null);
2397 assert(procedure != null);
2399 final ListenerBase listener = getListenerBase(procedure);
2401 // impl.state.barrier.inc();
2404 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2407 public void execute(ReadGraphImpl graph, int i) {
2409 procedure.execute(querySupport.getResource(i));
2410 } catch (Throwable t2) {
2411 Logger.defaultLogError(t2);
2416 public void finished(ReadGraphImpl graph) {
2418 procedure.finished();
2419 } catch (Throwable t2) {
2420 Logger.defaultLogError(t2);
2422 // impl.state.barrier.dec();
2426 public void exception(ReadGraphImpl graph, Throwable t) {
2427 System.out.println("forEachObject exception " + t);
2429 procedure.exception(t);
2430 } catch (Throwable t2) {
2431 Logger.defaultLogError(t2);
2433 // impl.state.barrier.dec();
2437 } catch (DatabaseException e) {
2438 Logger.defaultLogError(e);
2444 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2446 assert(subject != null);
2447 assert(procedure != null);
2449 final ListenerBase listener = getListenerBase(procedure);
2451 int sId = querySupport.getId(subject);
2454 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2457 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2458 procedure.execute(graph, result);
2462 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2463 procedure.exception(graph, throwable);
2467 } catch (DatabaseException e) {
2468 Logger.defaultLogError(e);
2473 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2475 // assert(subject != null);
2476 // assert(procedure != null);
2478 // final ListenerBase listener = getListenerBase(procedure);
2480 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2482 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2487 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2489 // assert(subject != null);
2490 // assert(procedure != null);
2492 // final ListenerBase listener = getListenerBase(procedure);
2494 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2498 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2501 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2503 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2505 private Resource single = null;
2508 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2509 if(single == null) {
2512 single = INVALID_RESOURCE;
2517 public synchronized void finished(AsyncReadGraph graph) {
2518 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2519 else procedure.execute(graph, single);
2523 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2524 procedure.exception(graph, throwable);
2531 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2533 final int sId = querySupport.getId(subject);
2534 final int pId = querySupport.getId(predicate);
2537 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2538 } catch (DatabaseException e) {
2539 Logger.defaultLogError(e);
2544 static class Runner2Procedure implements IntProcedure {
2546 public int single = 0;
2547 public Throwable t = null;
2549 public void clear() {
2555 public void execute(ReadGraphImpl graph, int i) {
2556 if(single == 0) single = i;
2561 public void finished(ReadGraphImpl graph) {
2562 if(single == -1) single = 0;
2566 public void exception(ReadGraphImpl graph, Throwable throwable) {
2571 public int get() throws DatabaseException {
2573 if(t instanceof DatabaseException) throw (DatabaseException)t;
2574 else throw new DatabaseException(t);
2581 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2583 final int sId = querySupport.getId(subject);
2584 final int pId = querySupport.getId(predicate);
2586 Runner2Procedure proc = new Runner2Procedure();
2587 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2592 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2594 assert(subject != null);
2595 assert(predicate != null);
2597 final ListenerBase listener = getListenerBase(procedure);
2599 if(impl.parent != null || listener != null) {
2601 IntProcedure ip = new IntProcedure() {
2603 AtomicBoolean first = new AtomicBoolean(true);
2606 public void execute(ReadGraphImpl graph, int i) {
2609 procedure.execute(impl, querySupport.getResource(i));
2611 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2613 } catch (Throwable t2) {
2614 Logger.defaultLogError(t2);
2620 public void finished(ReadGraphImpl graph) {
2622 if(first.compareAndSet(true, false)) {
2623 procedure.finished(impl);
2624 // impl.state.barrier.dec(this);
2626 procedure.finished(impl.newRestart(graph));
2628 } catch (Throwable t2) {
2629 Logger.defaultLogError(t2);
2634 public void exception(ReadGraphImpl graph, Throwable t) {
2636 procedure.exception(graph, t);
2637 } catch (Throwable t2) {
2638 Logger.defaultLogError(t2);
2640 // impl.state.barrier.dec(this);
2644 public String toString() {
2645 return "forEachObject with " + procedure;
2650 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2651 // else impl.state.barrier.inc(null, null);
2653 forEachObject(impl, subject, predicate, listener, ip);
2657 IntProcedure ip = new IntProcedure() {
2660 public void execute(ReadGraphImpl graph, int i) {
2661 procedure.execute(graph, querySupport.getResource(i));
2665 public void finished(ReadGraphImpl graph) {
2666 procedure.finished(graph);
2670 public void exception(ReadGraphImpl graph, Throwable t) {
2671 procedure.exception(graph, t);
2675 public String toString() {
2676 return "forEachObject with " + procedure;
2681 forEachObject(impl, subject, predicate, listener, ip);
2688 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2690 assert(subject != null);
2691 assert(predicate != null);
2692 assert(procedure != null);
2694 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2696 private Set<Resource> current = null;
2697 private Set<Resource> run = new HashSet<Resource>();
2700 public void execute(AsyncReadGraph graph, Resource result) {
2702 boolean found = false;
2704 if(current != null) {
2706 found = current.remove(result);
2710 if(!found) procedure.add(graph, result);
2717 public void finished(AsyncReadGraph graph) {
2719 if(current != null) {
2720 for(Resource r : current) procedure.remove(graph, r);
2725 run = new HashSet<Resource>();
2730 public boolean isDisposed() {
2731 return procedure.isDisposed();
2735 public void exception(AsyncReadGraph graph, Throwable t) {
2736 procedure.exception(graph, t);
2740 public String toString() {
2741 return "forObjectSet " + procedure;
2749 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2751 assert(subject != null);
2752 assert(procedure != null);
2754 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
2756 private Set<Resource> current = null;
2757 private Set<Resource> run = new HashSet<Resource>();
2760 public void execute(AsyncReadGraph graph, Resource result) {
2762 boolean found = false;
2764 if(current != null) {
2766 found = current.remove(result);
2770 if(!found) procedure.add(graph, result);
2777 public void finished(AsyncReadGraph graph) {
2779 if(current != null) {
2780 for(Resource r : current) procedure.remove(graph, r);
2785 run = new HashSet<Resource>();
2790 public boolean isDisposed() {
2791 return procedure.isDisposed();
2795 public void exception(AsyncReadGraph graph, Throwable t) {
2796 procedure.exception(graph, t);
2800 public String toString() {
2801 return "forPredicateSet " + procedure;
2809 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2811 assert(subject != null);
2812 assert(procedure != null);
2814 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
2816 private Set<Resource> current = null;
2817 private Set<Resource> run = new HashSet<Resource>();
2820 public void execute(AsyncReadGraph graph, Resource result) {
2822 boolean found = false;
2824 if(current != null) {
2826 found = current.remove(result);
2830 if(!found) procedure.add(graph, result);
2837 public void finished(AsyncReadGraph graph) {
2839 if(current != null) {
2840 for(Resource r : current) procedure.remove(graph, r);
2845 run = new HashSet<Resource>();
2850 public boolean isDisposed() {
2851 return procedure.isDisposed();
2855 public void exception(AsyncReadGraph graph, Throwable t) {
2856 procedure.exception(graph, t);
2860 public String toString() {
2861 return "forPrincipalTypeSet " + procedure;
2869 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2871 assert(subject != null);
2872 assert(predicate != null);
2873 assert(procedure != null);
2875 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2877 private Set<Resource> current = null;
2878 private Set<Resource> run = new HashSet<Resource>();
2881 public void execute(AsyncReadGraph graph, Resource result) {
2883 boolean found = false;
2885 if(current != null) {
2887 found = current.remove(result);
2891 if(!found) procedure.add(graph, result);
2898 public void finished(AsyncReadGraph graph) {
2900 if(current != null) {
2901 for(Resource r : current) procedure.remove(graph, r);
2906 run = new HashSet<Resource>();
2911 public boolean isDisposed() {
2912 return procedure.isDisposed();
2916 public void exception(AsyncReadGraph graph, Throwable t) {
2917 procedure.exception(graph, t);
2921 public String toString() {
2922 return "forObjectSet " + procedure;
2930 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2932 assert(subject != null);
2933 assert(predicate != null);
2934 assert(procedure != null);
2936 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2938 private Set<Statement> current = null;
2939 private Set<Statement> run = new HashSet<Statement>();
2942 public void execute(AsyncReadGraph graph, Statement result) {
2944 boolean found = false;
2946 if(current != null) {
2948 found = current.remove(result);
2952 if(!found) procedure.add(graph, result);
2959 public void finished(AsyncReadGraph graph) {
2961 if(current != null) {
2962 for(Statement s : current) procedure.remove(graph, s);
2967 run = new HashSet<Statement>();
2972 public boolean isDisposed() {
2973 return procedure.isDisposed();
2977 public void exception(AsyncReadGraph graph, Throwable t) {
2978 procedure.exception(graph, t);
2982 public String toString() {
2983 return "forStatementSet " + procedure;
2991 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
2992 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2994 assert(subject != null);
2995 assert(predicate != null);
2996 assert(procedure != null);
2998 final ListenerBase listener = getListenerBase(procedure);
3001 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3004 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3006 procedure.execute(graph, querySupport.getResource(o));
3007 } catch (Throwable t2) {
3008 Logger.defaultLogError(t2);
3013 public void finished(ReadGraphImpl graph) {
3015 procedure.finished(graph);
3016 } catch (Throwable t2) {
3017 Logger.defaultLogError(t2);
3019 // impl.state.barrier.dec();
3023 public void exception(ReadGraphImpl graph, Throwable t) {
3025 procedure.exception(graph, t);
3026 } catch (Throwable t2) {
3027 Logger.defaultLogError(t2);
3029 // impl.state.barrier.dec();
3033 } catch (DatabaseException e) {
3034 Logger.defaultLogError(e);
3040 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3042 assert(subject != null);
3043 assert(procedure != null);
3045 final ListenerBase listener = getListenerBase(procedure);
3047 IntProcedure ip = new IntProcedure() {
3050 public void execute(ReadGraphImpl graph, int i) {
3052 procedure.execute(graph, querySupport.getResource(i));
3053 } catch (Throwable t2) {
3054 Logger.defaultLogError(t2);
3059 public void finished(ReadGraphImpl graph) {
3061 procedure.finished(graph);
3062 } catch (Throwable t2) {
3063 Logger.defaultLogError(t2);
3065 // impl.state.barrier.dec(this);
3069 public void exception(ReadGraphImpl graph, Throwable t) {
3071 procedure.exception(graph, t);
3072 } catch (Throwable t2) {
3073 Logger.defaultLogError(t2);
3075 // impl.state.barrier.dec(this);
3080 int sId = querySupport.getId(subject);
3082 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3083 // else impl.state.barrier.inc(null, null);
3086 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3087 } catch (DatabaseException e) {
3088 Logger.defaultLogError(e);
3094 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3096 assert(subject != null);
3097 assert(procedure != null);
3099 final ListenerBase listener = getListenerBase(procedure);
3101 // impl.state.barrier.inc();
3104 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3107 public void execute(ReadGraphImpl graph, int i) {
3109 procedure.execute(querySupport.getResource(i));
3110 } catch (Throwable t2) {
3111 Logger.defaultLogError(t2);
3116 public void finished(ReadGraphImpl graph) {
3118 procedure.finished();
3119 } catch (Throwable t2) {
3120 Logger.defaultLogError(t2);
3122 // impl.state.barrier.dec();
3126 public void exception(ReadGraphImpl graph, Throwable t) {
3128 procedure.exception(t);
3129 } catch (Throwable t2) {
3130 Logger.defaultLogError(t2);
3132 // impl.state.barrier.dec();
3136 } catch (DatabaseException e) {
3137 Logger.defaultLogError(e);
3141 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3143 assert(subject != null);
3144 assert(procedure != null);
3146 final ListenerBase listener = getListenerBase(procedure);
3147 assert(listener == null);
3149 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3152 public void execute(final ReadGraphImpl graph, IntSet set) {
3153 procedure.execute(graph, set);
3157 public void exception(ReadGraphImpl graph, Throwable t) {
3158 procedure.exception(graph, t);
3163 int sId = querySupport.getId(subject);
3166 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3167 } catch (DatabaseException e) {
3168 Logger.defaultLogError(e);
3174 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3176 assert(subject != null);
3178 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3183 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3185 assert(subject != null);
3187 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
3192 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3194 assert(subject != null);
3195 assert(procedure != null);
3197 final ListenerBase listener = getListenerBase(procedure);
3200 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3202 AtomicBoolean first = new AtomicBoolean(true);
3205 public void execute(final ReadGraphImpl graph, IntSet set) {
3206 // final HashSet<Resource> result = new HashSet<Resource>();
3207 // set.forEach(new TIntProcedure() {
3210 // public boolean execute(int type) {
3211 // result.add(querySupport.getResource(type));
3217 if(first.compareAndSet(true, false)) {
3218 procedure.execute(graph, set);
3219 // impl.state.barrier.dec();
3221 procedure.execute(impl.newRestart(graph), set);
3223 } catch (Throwable t2) {
3224 Logger.defaultLogError(t2);
3229 public void exception(ReadGraphImpl graph, Throwable t) {
3231 if(first.compareAndSet(true, false)) {
3232 procedure.exception(graph, t);
3233 // impl.state.barrier.dec();
3235 procedure.exception(impl.newRestart(graph), t);
3237 } catch (Throwable t2) {
3238 Logger.defaultLogError(t2);
3243 } catch (DatabaseException e) {
3244 Logger.defaultLogError(e);
3250 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3252 assert(subject != null);
3253 assert(procedure != null);
3255 final ListenerBase listener = getListenerBase(procedure);
3257 IntProcedure ip = new IntProcedureAdapter() {
3260 public void execute(final ReadGraphImpl graph, int superRelation) {
3262 procedure.execute(graph, querySupport.getResource(superRelation));
3263 } catch (Throwable t2) {
3264 Logger.defaultLogError(t2);
3269 public void finished(final ReadGraphImpl graph) {
3271 procedure.finished(graph);
3272 } catch (Throwable t2) {
3273 Logger.defaultLogError(t2);
3275 // impl.state.barrier.dec(this);
3280 public void exception(ReadGraphImpl graph, Throwable t) {
3282 procedure.exception(graph, t);
3283 } catch (Throwable t2) {
3284 Logger.defaultLogError(t2);
3286 // impl.state.barrier.dec(this);
3291 int sId = querySupport.getId(subject);
3293 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3294 // else impl.state.barrier.inc(null, null);
3297 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3298 } catch (DatabaseException e) {
3299 Logger.defaultLogError(e);
3302 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3307 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3309 assert(subject != null);
3310 assert(procedure != null);
3312 final ListenerBase listener = getListenerBase(procedure);
3314 // impl.state.barrier.inc();
3316 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3321 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3323 assert(subject != null);
3324 assert(procedure != null);
3326 final ListenerBase listener = getListenerBase(procedure);
3328 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3331 public void execute(final ReadGraphImpl graph, IntSet set) {
3332 // final HashSet<Resource> result = new HashSet<Resource>();
3333 // set.forEach(new TIntProcedure() {
3336 // public boolean execute(int type) {
3337 // result.add(querySupport.getResource(type));
3343 procedure.execute(graph, set);
3344 } catch (Throwable t2) {
3345 Logger.defaultLogError(t2);
3347 // impl.state.barrier.dec(this);
3351 public void exception(ReadGraphImpl graph, Throwable t) {
3353 procedure.exception(graph, t);
3354 } catch (Throwable t2) {
3355 Logger.defaultLogError(t2);
3357 // impl.state.barrier.dec(this);
3362 int sId = querySupport.getId(subject);
3364 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3365 // else impl.state.barrier.inc(null, null);
3368 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3369 } catch (DatabaseException e) {
3370 Logger.defaultLogError(e);
3375 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3376 return getValue(impl, querySupport.getId(subject));
3379 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3380 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3384 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3386 assert(subject != null);
3387 assert(procedure != null);
3389 int sId = querySupport.getId(subject);
3391 // if(procedure != null) {
3393 final ListenerBase listener = getListenerBase(procedure);
3395 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3397 AtomicBoolean first = new AtomicBoolean(true);
3400 public void execute(ReadGraphImpl graph, byte[] result) {
3402 if(first.compareAndSet(true, false)) {
3403 procedure.execute(graph, result);
3404 // impl.state.barrier.dec(this);
3406 procedure.execute(impl.newRestart(graph), result);
3408 } catch (Throwable t2) {
3409 Logger.defaultLogError(t2);
3414 public void exception(ReadGraphImpl graph, Throwable t) {
3416 if(first.compareAndSet(true, false)) {
3417 procedure.exception(graph, t);
3418 // impl.state.barrier.dec(this);
3420 procedure.exception(impl.newRestart(graph), t);
3422 } catch (Throwable t2) {
3423 Logger.defaultLogError(t2);
3429 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3430 // else impl.state.barrier.inc(null, null);
3433 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3434 } catch (DatabaseException e) {
3435 throw new IllegalStateException("Internal error");
3440 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3444 // throw new IllegalStateException("Internal error");
3449 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3451 assert(subject != null);
3452 assert(procedure != null);
3454 final ListenerBase listener = getListenerBase(procedure);
3456 if(impl.parent != null || listener != null) {
3458 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3460 AtomicBoolean first = new AtomicBoolean(true);
3463 public void execute(ReadGraphImpl graph, byte[] result) {
3465 if(first.compareAndSet(true, false)) {
3466 procedure.execute(graph, result);
3467 // impl.state.barrier.dec(this);
3469 procedure.execute(impl.newRestart(graph), result);
3471 } catch (Throwable t2) {
3472 Logger.defaultLogError(t2);
3477 public void exception(ReadGraphImpl graph, Throwable t) {
3479 if(first.compareAndSet(true, false)) {
3480 procedure.exception(graph, t);
3481 // impl.state.barrier.dec(this);
3483 procedure.exception(impl.newRestart(graph), t);
3485 } catch (Throwable t2) {
3486 Logger.defaultLogError(t2);
3492 int sId = querySupport.getId(subject);
3494 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3495 // else impl.state.barrier.inc(null, null);
3498 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3499 } catch (DatabaseException e) {
3500 Logger.defaultLogError(e);
3505 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3508 public void execute(ReadGraphImpl graph, byte[] result) {
3510 procedure.execute(graph, result);
3515 public void exception(ReadGraphImpl graph, Throwable t) {
3517 procedure.exception(graph, t);
3523 int sId = querySupport.getId(subject);
3526 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3527 } catch (DatabaseException e) {
3528 Logger.defaultLogError(e);
3536 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3538 assert(relation != null);
3539 assert(procedure != null);
3541 final ListenerBase listener = getListenerBase(procedure);
3543 IntProcedure ip = new IntProcedure() {
3545 private int result = 0;
3547 final AtomicBoolean found = new AtomicBoolean(false);
3548 final AtomicBoolean done = new AtomicBoolean(false);
3551 public void finished(ReadGraphImpl graph) {
3553 // Shall fire exactly once!
3554 if(done.compareAndSet(false, true)) {
3557 procedure.exception(graph, new NoInverseException(""));
3558 // impl.state.barrier.dec(this);
3560 procedure.execute(graph, querySupport.getResource(result));
3561 // impl.state.barrier.dec(this);
3563 } catch (Throwable t) {
3564 Logger.defaultLogError(t);
3571 public void execute(ReadGraphImpl graph, int i) {
3573 if(found.compareAndSet(false, true)) {
3576 // Shall fire exactly once!
3577 if(done.compareAndSet(false, true)) {
3579 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3580 // impl.state.barrier.dec(this);
3581 } catch (Throwable t) {
3582 Logger.defaultLogError(t);
3590 public void exception(ReadGraphImpl graph, Throwable t) {
3591 // Shall fire exactly once!
3592 if(done.compareAndSet(false, true)) {
3594 procedure.exception(graph, t);
3595 // impl.state.barrier.dec(this);
3596 } catch (Throwable t2) {
3597 Logger.defaultLogError(t2);
3604 int sId = querySupport.getId(relation);
3606 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3607 // else impl.state.barrier.inc(null, null);
3610 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3611 } catch (DatabaseException e) {
3612 Logger.defaultLogError(e);
3618 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3621 assert(procedure != null);
3623 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3626 public void execute(ReadGraphImpl graph, Integer result) {
3628 procedure.execute(graph, querySupport.getResource(result));
3629 } catch (Throwable t2) {
3630 Logger.defaultLogError(t2);
3632 // impl.state.barrier.dec(this);
3636 public void exception(ReadGraphImpl graph, Throwable t) {
3639 procedure.exception(graph, t);
3640 } catch (Throwable t2) {
3641 Logger.defaultLogError(t2);
3643 // impl.state.barrier.dec(this);
3648 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3649 // else impl.state.barrier.inc(null, null);
3651 forResource(impl, id, impl.parent, ip);
3656 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3659 assert(procedure != null);
3661 // impl.state.barrier.inc();
3664 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3667 public void execute(ReadGraphImpl graph, Integer result) {
3669 procedure.execute(graph, querySupport.getResource(result));
3670 } catch (Throwable t2) {
3671 Logger.defaultLogError(t2);
3673 // impl.state.barrier.dec();
3677 public void exception(ReadGraphImpl graph, Throwable t) {
3679 procedure.exception(graph, t);
3680 } catch (Throwable t2) {
3681 Logger.defaultLogError(t2);
3683 // impl.state.barrier.dec();
3687 } catch (DatabaseException e) {
3688 Logger.defaultLogError(e);
3694 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3696 assert(subject != null);
3697 assert(procedure != null);
3699 final ListenerBase listener = getListenerBase(procedure);
3702 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3703 procedure.execute(impl, !result.isEmpty());
3704 } catch (DatabaseException e) {
3705 procedure.exception(impl, e);
3711 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
3713 assert(subject != null);
3714 assert(predicate != null);
3715 assert(procedure != null);
3717 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
3719 boolean found = false;
3722 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3727 synchronized public void finished(AsyncReadGraph graph) {
3729 procedure.execute(graph, found);
3730 } catch (Throwable t2) {
3731 Logger.defaultLogError(t2);
3733 // impl.state.barrier.dec(this);
3737 public void exception(AsyncReadGraph graph, Throwable t) {
3739 procedure.exception(graph, t);
3740 } catch (Throwable t2) {
3741 Logger.defaultLogError(t2);
3743 // impl.state.barrier.dec(this);
3748 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
3749 // else impl.state.barrier.inc(null, null);
3751 forEachObject(impl, subject, predicate, ip);
3756 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
3758 assert(subject != null);
3759 assert(predicate != null);
3760 assert(procedure != null);
3762 // impl.state.barrier.inc();
3764 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
3766 boolean found = false;
3769 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3770 if(resource.equals(object)) found = true;
3774 synchronized public void finished(AsyncReadGraph graph) {
3776 procedure.execute(graph, found);
3777 } catch (Throwable t2) {
3778 Logger.defaultLogError(t2);
3780 // impl.state.barrier.dec();
3784 public void exception(AsyncReadGraph graph, Throwable t) {
3786 procedure.exception(graph, t);
3787 } catch (Throwable t2) {
3788 Logger.defaultLogError(t2);
3790 // impl.state.barrier.dec();
3798 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3800 assert(subject != null);
3801 assert(procedure != null);
3803 final ListenerBase listener = getListenerBase(procedure);
3805 // impl.state.barrier.inc();
3808 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
3811 public void execute(ReadGraphImpl graph, byte[] object) {
3812 boolean result = object != null;
3814 procedure.execute(graph, result);
3815 } catch (Throwable t2) {
3816 Logger.defaultLogError(t2);
3818 // impl.state.barrier.dec();
3822 public void exception(ReadGraphImpl graph, Throwable t) {
3824 procedure.exception(graph, t);
3825 } catch (Throwable t2) {
3826 Logger.defaultLogError(t2);
3828 // impl.state.barrier.dec();
3832 } catch (DatabaseException e) {
3833 Logger.defaultLogError(e);
3839 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3841 assert(subject != null);
3842 assert(procedure != null);
3844 final ListenerBase listener = getListenerBase(procedure);
3848 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3851 public void exception(ReadGraphImpl graph, Throwable t) {
3853 procedure.exception(graph, t);
3854 } catch (Throwable t2) {
3855 Logger.defaultLogError(t2);
3857 // impl.state.barrier.dec();
3861 public void execute(ReadGraphImpl graph, int i) {
3863 procedure.execute(graph, querySupport.getResource(i));
3864 } catch (Throwable t2) {
3865 Logger.defaultLogError(t2);
3870 public void finished(ReadGraphImpl graph) {
3872 procedure.finished(graph);
3873 } catch (Throwable t2) {
3874 Logger.defaultLogError(t2);
3876 // impl.state.barrier.dec();
3880 } catch (DatabaseException e) {
3881 Logger.defaultLogError(e);
3887 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
3889 // assert(request != null);
3890 // assert(procedure != null);
3892 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
3897 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
3899 // assert(graph != null);
3900 // assert(request != null);
3902 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
3903 // if(entry != null && entry.isReady()) {
3904 // return (T)entry.get(graph, this, null);
3906 // return request.perform(graph);
3911 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
3913 // assert(graph != null);
3914 // assert(request != null);
3916 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
3917 // if(entry != null && entry.isReady()) {
3918 // if(entry.isExcepted()) {
3919 // Throwable t = (Throwable)entry.getResult();
3920 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3921 // else throw new DatabaseException(t);
3923 // return (T)entry.getResult();
3927 // final DataContainer<T> result = new DataContainer<T>();
3928 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3930 // request.register(graph, new Listener<T>() {
3933 // public void exception(Throwable t) {
3934 // exception.set(t);
3938 // public void execute(T t) {
3943 // public boolean isDisposed() {
3949 // Throwable t = exception.get();
3951 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3952 // else throw new DatabaseException(t);
3955 // return result.get();
3962 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
3964 // assert(graph != null);
3965 // assert(request != null);
3967 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
3968 // if(entry != null && entry.isReady()) {
3969 // if(entry.isExcepted()) {
3970 // procedure.exception(graph, (Throwable)entry.getResult());
3972 // procedure.execute(graph, (T)entry.getResult());
3975 // request.perform(graph, procedure);
3981 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
3983 assert(request != null);
3984 assert(procedure != null);
3988 queryMultiRead(impl, request, parent, listener, procedure);
3990 } catch (DatabaseException e) {
3992 throw new IllegalStateException(e);
3999 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4001 assert(request != null);
4002 assert(procedure != null);
4004 // impl.state.barrier.inc();
4006 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4008 public void execute(AsyncReadGraph graph, T result) {
4011 procedure.execute(graph, result);
4012 } catch (Throwable t2) {
4013 Logger.defaultLogError(t2);
4018 public void finished(AsyncReadGraph graph) {
4021 procedure.finished(graph);
4022 } catch (Throwable t2) {
4023 Logger.defaultLogError(t2);
4026 // impl.state.barrier.dec();
4031 public String toString() {
4032 return procedure.toString();
4036 public void exception(AsyncReadGraph graph, Throwable t) {
4039 procedure.exception(graph, t);
4040 } catch (Throwable t2) {
4041 Logger.defaultLogError(t2);
4044 // impl.state.barrier.dec();
4053 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4055 // assert(request != null);
4056 // assert(procedure != null);
4060 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4063 // public String toString() {
4064 // return procedure.toString();
4068 // public void execute(AsyncReadGraph graph, T result) {
4070 // procedure.execute(result);
4071 // } catch (Throwable t2) {
4072 // Logger.defaultLogError(t2);
4077 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4079 // procedure.exception(throwable);
4080 // } catch (Throwable t2) {
4081 // Logger.defaultLogError(t2);
4087 // } catch (DatabaseException e) {
4089 // throw new IllegalStateException(e);
4096 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4098 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4103 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4105 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4110 public VirtualGraph getValueProvider(Resource subject) {
4112 return querySupport.getValueProvider(querySupport.getId(subject));
4116 public boolean resumeTasks(ReadGraphImpl graph) {
4118 return querySupport.resume(graph);
4122 public boolean isImmutable(int resourceId) {
4123 return querySupport.isImmutable(resourceId);
4126 public boolean isImmutable(Resource resource) {
4127 ResourceImpl impl = (ResourceImpl)resource;
4128 return isImmutable(impl.id);
4133 public Layer0 getL0(ReadGraph graph) {
4135 L0 = Layer0.getInstance(graph);
4140 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4141 protected Integer initialValue() {