1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.exception.DatabaseException;
48 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
49 import org.simantics.db.exception.NoInverseException;
50 import org.simantics.db.exception.ResourceNotFoundException;
51 import org.simantics.db.impl.ResourceImpl;
52 import org.simantics.db.impl.graph.BarrierTracing;
53 import org.simantics.db.impl.graph.ReadGraphImpl;
54 import org.simantics.db.impl.graph.ReadGraphSupport;
55 import org.simantics.db.impl.procedure.IntProcedureAdapter;
56 import org.simantics.db.impl.procedure.InternalProcedure;
57 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
58 import org.simantics.db.impl.support.ResourceSupport;
59 import org.simantics.db.procedure.AsyncMultiListener;
60 import org.simantics.db.procedure.AsyncMultiProcedure;
61 import org.simantics.db.procedure.AsyncProcedure;
62 import org.simantics.db.procedure.AsyncSetListener;
63 import org.simantics.db.procedure.ListenerBase;
64 import org.simantics.db.procedure.MultiProcedure;
65 import org.simantics.db.procedure.StatementProcedure;
66 import org.simantics.db.procedure.SyncMultiProcedure;
67 import org.simantics.db.request.AsyncMultiRead;
68 import org.simantics.db.request.ExternalRead;
69 import org.simantics.db.request.MultiRead;
70 import org.simantics.db.request.RequestFlags;
71 import org.simantics.layer0.Layer0;
72 import org.simantics.utils.DataContainer;
73 import org.simantics.utils.Development;
74 import org.simantics.utils.datastructures.Pair;
75 import org.simantics.utils.datastructures.collections.CollectionUtils;
76 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
78 import gnu.trove.procedure.TIntProcedure;
79 import gnu.trove.procedure.TLongProcedure;
80 import gnu.trove.procedure.TObjectProcedure;
81 import gnu.trove.set.hash.THashSet;
82 import gnu.trove.set.hash.TIntHashSet;
84 @SuppressWarnings({"rawtypes", "unchecked"})
85 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
87 public static int indent = 0;
92 public int boundQueries = 0;
95 final private int functionalRelation;
97 final private int superrelationOf;
99 final private int instanceOf;
101 final private int inverseOf;
103 final private int asserts;
105 final private int hasPredicate;
107 final private int hasPredicateInverse;
109 final private int hasObject;
111 final private int inherits;
113 final private int subrelationOf;
115 final private int rootLibrary;
118 * A cache for the root library resource. Initialized in
119 * {@link #getRootLibraryResource()}.
121 private volatile ResourceImpl rootLibraryResource;
123 final private int library;
125 final private int consistsOf;
127 final private int hasName;
129 AtomicInteger sleepers = new AtomicInteger(0);
131 boolean updating = false;
134 final public QueryCache cache;
135 final public QuerySupport querySupport;
136 final public Session session;
137 final public ResourceSupport resourceSupport;
139 final public Semaphore requests = new Semaphore(1);
141 final public QueryListening listening = new QueryListening(this);
143 QueryThread[] executors;
145 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
149 INIT, RUN, SLEEP, DISPOSED
153 public ThreadState[] threadStates;
155 final Object querySupportLock;
157 public Long modificationCounter = 0L;
159 public void close() {
162 public SessionTask getSubTask(ReadGraphImpl impl) {
163 synchronized(querySupportLock) {
165 while(index < freeScheduling.size()) {
166 SessionTask task = freeScheduling.get(index);
167 if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
168 queueLength.decrementAndGet();
169 return freeScheduling.remove(index);
178 * We are running errands while waiting for requests to complete.
179 * We can only run work that is part of the current root request to avoid any deadlocks
/**
 * Fetches a task via {@link #getSubTask(ReadGraphImpl)} for the given graph
 * and runs it with the value of {@code QueryProcessor.thread.get()}.
 *
 * NOTE(review): the lines between the lookup and the run, and the return
 * statements, are not visible here; presumably the task is null-checked and
 * the result tells whether a task was run - confirm.
 */
181 public boolean performPending(ReadGraphImpl graph) {
182 SessionTask task = getSubTask(graph);
184 task.run(QueryProcessor.thread.get());
/**
 * Schedules the task by delegating to {@link #schedule(SessionTask, boolean)}
 * with {@code late == false}.
 */
190 final public void scheduleNow(SessionTask request) {
191 schedule(request, false);
/**
 * Schedules the task by delegating to {@link #schedule(SessionTask, boolean)}
 * with {@code late == true}.
 */
194 final public void scheduleLater(SessionTask request) {
195 schedule(request, true);
198 AtomicInteger queueLength = new AtomicInteger(0);
/**
 * Queues the task in {@code freeScheduling} or runs it directly on the
 * calling thread.
 * <p>
 * A task that is not {@code late} is run inline when the {@code queueLength}
 * estimate exceeds 80. Otherwise, under {@code querySupportLock}, the task is
 * either inserted into the queue (bumping {@code queueLength}) or run inline
 * when the queue has reached a size limit.
 *
 * NOTE(review): several lines (branch headers, closing braces and possibly
 * statements after each enqueue) are missing from this view; presumably the
 * position-based insertion is the {@code late} branch and the level-based
 * insertion the non-late branch - confirm against the full file.
 */
200 final public void schedule(SessionTask request, boolean late) {
// Unsynchronized read: only an estimate of the queue length.
202 int queueLengthEstimate = queueLength.get();
203 if(!late && queueLengthEstimate > 80) {
204 request.run(thread.get());
// Note: this assertion comes after request has already been dereferenced above.
208 assert(request != null);
210 synchronized(querySupportLock) {
// Bookkeeping mode: remember where each task was scheduled and print both
// stack traces if the same task had already been registered.
212 if(BarrierTracing.BOOKKEEPING) {
213 Exception current = new Exception();
214 Exception previous = BarrierTracing.tasks.put(request, current);
215 if(previous != null) {
216 previous.printStackTrace();
217 current.printStackTrace();
// Insert at the task's requested position when it lies inside the queue,
// otherwise append to the end.
222 int pos = request.position - 1;
223 if(pos < freeScheduling.size()) {
224 freeScheduling.add(pos, request);
225 queueLength.incrementAndGet();
228 freeScheduling.addLast(request);
229 queueLength.incrementAndGet();
// Tasks with level below 4 are put at the head while the queue holds fewer
// than 100 entries; the other tasks only while it holds fewer than 20.
234 if(request.getLevel() < 4) {
235 if(freeScheduling.size() < 100) {
236 freeScheduling.addFirst(request);
237 queueLength.incrementAndGet();
240 request.run(thread.get());
243 if(freeScheduling.size() < 20) {
244 freeScheduling.addFirst(request);
245 queueLength.incrementAndGet();
248 request.run(thread.get());
260 final public int THREAD_MASK;
262 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
/**
 * A unit of work tied to an optional {@link ReadGraphImpl}. Subclasses
 * implement {@link #run0(int)}; callers invoke the final {@link #run(int)}.
 * The graph's {@code asyncBarrier} is incremented on construction and
 * decremented in {@code run} when the graph is non-null.
 */
264 public static abstract class SessionTask {
// Graph this task belongs to; null checks below show it may be absent.
266 public final ReadGraphImpl graph;
267 private int counter = 0;
// Read by QueryProcessor.schedule as (position - 1) to pick an insertion index.
268 protected int position = 1;
// Stack trace captured in run() when BarrierTracing.BOOKKEEPING is enabled.
269 private Exception trace;
271 public SessionTask(ReadGraphImpl graph) {
273 if(graph != null) graph.asyncBarrier.inc();
/**
 * @return true when both graphs are non-null and have the identical
 *         top-level graph instance
 */
276 public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
277 if(r1 == null || r2 == null) return false;
278 return r1.getTopLevelGraph() == r2.getTopLevelGraph();
/** The actual work of the task. */
281 public abstract void run0(int thread);
/**
 * Entry point for executing the task. Throws IllegalStateException
 * ("Multiple invocations of SessionTask!") on a repeated invocation; in
 * bookkeeping mode the earlier and the current stack traces are printed first.
 *
 * NOTE(review): the guard condition and the call to run0 are on lines not
 * visible here; presumably the guard uses {@code counter} - confirm.
 */
283 public final void run(int thread) {
285 if(BarrierTracing.BOOKKEEPING) {
286 trace.printStackTrace();
287 new Exception().printStackTrace();
289 throw new IllegalStateException("Multiple invocations of SessionTask!");
291 if(BarrierTracing.BOOKKEEPING) {
292 trace = new Exception();
295 if(graph != null) graph.asyncBarrier.dec();
// Consulted by QueryProcessor.getSubTask before a task is dequeued; body not visible here.
298 public boolean maybeReady() {
303 public String toString() {
305 return "SessionTask[no graph]";
307 return "SessionTask[" + graph.parent + "]";
/** @return 0 when there is no graph, otherwise the graph's level */
310 public int getLevel() {
311 if(graph == null) return 0;
312 else return graph.getLevel();
/**
 * A {@link SessionTask} that additionally carries a semaphore and a
 * container for a {@link Throwable}, both supplied by the creator.
 *
 * NOTE(review): the super-constructor call is on a line not visible here.
 */
317 public static abstract class SessionRead extends SessionTask {
// Semaphore handed in by the creator; how it is signalled is not visible in this class.
319 final public Semaphore notify;
// Container handed in by the creator for a Throwable.
320 final public DataContainer<Throwable> throwable;
322 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
324 this.throwable = throwable;
325 this.notify = notify;
330 long waitingTime = 0;
333 static int koss2 = 0;
/**
 * Delegates to {@code runSynchronized()} of the first query thread and
 * returns its result. The {@code graph} argument is not used by the
 * visible code.
 */
335 public boolean resume(ReadGraphImpl graph) {
336 return executors[0].runSynchronized();
339 //private WeakReference<GarbageTracker> garbageTracker;
341 private class GarbageTracker {
344 protected void finalize() throws Throwable {
346 // System.err.println("GarbageTracker");
348 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
/**
 * Creates the processor: sets up the query cache and support objects,
 * creates and starts the query threads, waits until all of them are
 * counted in {@code sleepers}, and finally resolves the builtin resource
 * ids used by this class from {@code core}.
 *
 * @param threads   used for {@code THREAD_MASK} (threads - 1) and passed to the QueryCache
 * @param core      the query support used for the cache, the lock and the builtin lookups
 * @param threadSet receives every created query thread
 * @throws DatabaseException declared by the constructor; the throwing call is not identifiable from the visible lines
 *
 * NOTE(review): THREADS, the assignment of querySupport and most else
 * branches are on lines not visible here.
 */
356 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
357 throws DatabaseException {
359 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
// NOTE(review): a mask of (threads - 1) presumably expects a power-of-two thread count - confirm.
362 THREAD_MASK = threads - 1;
365 cache = new QueryCache(core, threads);
366 session = querySupport.getSession();
367 resourceSupport = querySupport.getSupport();
368 querySupportLock = core.getLock();
370 executors = new QueryThread[THREADS];
371 // queues = new ArrayList[THREADS];
372 // threadLocks = new ReentrantLock[THREADS];
373 // threadConditions = new Condition[THREADS];
374 threadStates = new ThreadState[THREADS];
375 // ownTasks = new ArrayList[THREADS];
376 // ownSyncTasks = new ArrayList[THREADS];
377 // delayQueues = new ArrayList[THREADS * THREADS];
379 // freeSchedule = new AtomicInteger(0);
381 // for (int i = 0; i < THREADS * THREADS; i++) {
382 // delayQueues[i] = new ArrayList<SessionTask>();
// Every thread slot starts in the INIT state.
385 for (int i = 0; i < THREADS; i++) {
387 // tasks[i] = new ArrayList<Runnable>();
388 // ownTasks[i] = new ArrayList<SessionTask>();
389 // ownSyncTasks[i] = new ArrayList<SessionTask>();
390 // queues[i] = new ArrayList<SessionTask>();
391 // threadLocks[i] = new ReentrantLock();
392 // threadConditions[i] = threadLocks[i].newCondition();
393 // limits[i] = false;
394 threadStates[i] = ThreadState.INIT;
// Create the query threads and publish them to the caller's set.
398 for (int i = 0; i < THREADS; i++) {
402 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
404 threadSet.add(executors[i]);
409 for (int i = 0; i < THREADS; i++) {
410 executors[i].start();
413 // Make sure that query threads are up and running
414 while(sleepers.get() != THREADS) {
417 } catch (InterruptedException e) {
// A root library id of 0 means the builtins are not installed; the visible
// else branches then set the corresponding id to 0.
422 rootLibrary = core.getBuiltin("http:/");
423 boolean builtinsInstalled = rootLibrary != 0;
425 if (builtinsInstalled) {
426 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
427 assert (functionalRelation != 0);
429 functionalRelation = 0;
431 if (builtinsInstalled) {
432 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
433 assert (instanceOf != 0);
437 if (builtinsInstalled) {
438 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
439 assert (inverseOf != 0);
444 if (builtinsInstalled) {
445 inherits = core.getBuiltin(Layer0.URIs.Inherits);
446 assert (inherits != 0);
450 if (builtinsInstalled) {
451 asserts = core.getBuiltin(Layer0.URIs.Asserts);
452 assert (asserts != 0);
456 if (builtinsInstalled) {
457 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
458 assert (hasPredicate != 0);
462 if (builtinsInstalled) {
463 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
464 assert (hasPredicateInverse != 0);
466 hasPredicateInverse = 0;
468 if (builtinsInstalled) {
469 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
470 assert (hasObject != 0);
474 if (builtinsInstalled) {
475 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
476 assert (subrelationOf != 0);
480 if (builtinsInstalled) {
481 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
482 assert (superrelationOf != 0);
486 if (builtinsInstalled) {
487 library = core.getBuiltin(Layer0.URIs.Library);
488 assert (library != 0);
492 if (builtinsInstalled) {
493 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
494 assert (consistsOf != 0);
498 if (builtinsInstalled) {
499 hasName = core.getBuiltin(Layer0.URIs.HasName);
500 assert (hasName != 0);
/**
 * Calls {@code propagateChangesInQueryCache(graph)} and then increments
 * {@code modificationCounter}.
 *
 * NOTE(review): the counter is a boxed {@code Long} field, so the increment
 * is a non-atomic read-modify-write - confirm callers are serialized.
 */
506 final public void releaseWrite(ReadGraphImpl graph) {
507 propagateChangesInQueryCache(graph);
508 modificationCounter++;
/** Returns the id that {@code querySupport.getId} reports for the resource. */
511 final public int getId(final Resource r) {
512 return querySupport.getId(r);
515 public QuerySupport getCore() {
/**
 * Returns the id resolved in the constructor for
 * {@code Layer0.URIs.FunctionalRelation}; the constructor stores 0 when the
 * builtins are not installed.
 */
519 public int getFunctionalRelation() {
520 return functionalRelation;
523 public int getInherits() {
527 public int getInstanceOf() {
531 public int getInverseOf() {
/** Returns the id resolved in the constructor for {@code Layer0.URIs.SubrelationOf}. */
535 public int getSubrelationOf() {
536 return subrelationOf;
/** Returns the id resolved in the constructor for {@code Layer0.URIs.SuperrelationOf}. */
539 public int getSuperrelationOf() {
540 return superrelationOf;
543 public int getAsserts() {
547 public int getHasPredicate() {
/**
 * Returns the id resolved in the constructor for
 * {@code Layer0.URIs.HasPredicateInverse}; the constructor stores 0 when the
 * builtins are not installed.
 */
551 public int getHasPredicateInverse() {
552 return hasPredicateInverse;
555 public int getHasObject() {
559 public int getRootLibrary() {
/**
 * Returns the root library as a {@link Resource}, creating and caching a
 * {@link ResourceImpl} for {@link #getRootLibrary()} on first use.
 *
 * @throws UnsupportedOperationException "database is not initialized, cannot
 *         get root library resource"; the guarding condition is on a line not
 *         visible here, presumably a root id of 0 - confirm
 */
563 public Resource getRootLibraryResource() {
564 if (rootLibraryResource == null) {
565 // Synchronization is not needed here, it doesn't matter if multiple
566 // threads simultaneously set rootLibraryResource once.
567 int root = getRootLibrary();
569 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
570 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
572 return rootLibraryResource;
575 public int getLibrary() {
579 public int getConsistsOf() {
583 public int getHasName() {
/**
 * Resolves the identifier {@code id} through
 * {@code QueryCache.runnerURIToResource} and reports the outcome to
 * {@code procedure}: a non-null, non-zero result is passed to
 * {@code execute}, failures go to {@code exception} (a
 * {@link ResourceNotFoundException} carrying {@code id} is among them).
 * <p>
 * A {@link DatabaseException} thrown by the query is forwarded to
 * {@code procedure.exception}; if that call itself throws, the second
 * exception is logged via {@code Logger.defaultLogError}.
 *
 * NOTE(review): the branch structure around the second execute/exception
 * pair is on lines not visible here.
 */
587 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
591 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
594 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
596 if (result != null && result != 0) {
597 procedure.execute(graph, result);
601 // Fall back to using the fixed builtins.
602 // result = querySupport.getBuiltin(id);
603 // if (result != 0) {
604 // procedure.execute(graph, result);
609 // result = querySupport.getRandomAccessReference(id);
610 // } catch (ResourceNotFoundException e) {
611 // procedure.exception(graph, e);
616 procedure.execute(graph, result);
618 procedure.exception(graph, new ResourceNotFoundException(id));
// Errors from the underlying query are passed through unchanged.
624 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
625 procedure.exception(graph, t);
629 } catch (DatabaseException e) {
633 procedure.exception(graph, e);
635 } catch (DatabaseException e1) {
637 Logger.defaultLogError(e1);
/**
 * Looks up {@code id} with {@code querySupport.getBuiltin} and reports
 * either the id via {@code procedure.execute} or a
 * {@link ResourceNotFoundException} for {@code id} via
 * {@code procedure.exception}. The {@code parent} argument is not used by
 * the visible code.
 *
 * NOTE(review): the condition selecting between the two calls is on a line
 * not visible here; presumably a non-zero result means found - confirm.
 */
645 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
647 Integer result = querySupport.getBuiltin(id);
649 procedure.execute(graph, result);
651 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Runs the multi-read through {@code QueryCache.runnerMultiReadEntry}.
 * A {@link DatabaseException} is rethrown wrapped in an
 * {@link IllegalStateException}. The {@code cached} and {@code provider}
 * arguments are not used by the visible code.
 */
656 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
659 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
660 } catch (DatabaseException e) {
661 throw new IllegalStateException(e);
/**
 * Runs the asynchronous multi-read through
 * {@code QueryCache.runnerAsyncMultiReadEntry}. A {@link DatabaseException}
 * is rethrown wrapped in an {@link IllegalStateException}.
 */
666 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
670 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
671 } catch (DatabaseException e) {
672 throw new IllegalStateException(e);
/**
 * Runs the external read through {@code QueryCache.runnerExternalReadEntry}.
 * The {@code cached} and {@code provider} arguments are not used by the
 * visible code.
 */
677 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
678 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
682 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
684 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs the multi-read through {@code QueryCache.runnerMultiReadEntry};
 * a {@link DatabaseException} propagates to the caller.
 */
688 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
690 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs the external read through {@code QueryCache.runnerExternalReadEntry};
 * a {@link DatabaseException} propagates to the caller.
 */
694 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
696 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
/**
 * Reports an entry as bound when it has parents or when
 * {@code listening.hasListener} is true for it.
 *
 * NOTE(review): the final branch is on a line not visible here; presumably
 * it returns false - confirm.
 */
700 boolean isBound(ExternalReadEntry<?> entry) {
701 if(entry.hasParents()) return true;
702 else if(listening.hasListener(entry)) return true;
706 static class Dummy implements InternalProcedure<Object>, IntProcedure {
709 public void execute(ReadGraphImpl graph, int i) {
713 public void finished(ReadGraphImpl graph) {
717 public void execute(ReadGraphImpl graph, Object result) {
721 public void exception(ReadGraphImpl graph, Throwable throwable) {
726 private static final Dummy dummy = new Dummy();
/**
 * Registers the dependencies of {@code query} and returns the value of
 * {@code query.get(...)}. When the query is not ready it is first computed
 * with the shared no-op {@code dummy} procedure and the result is then read
 * with a {@code null} procedure; the other visible return passes the
 * caller's {@code procedure} to {@code get}.
 *
 * NOTE(review): the lines between the readiness check and the returns are
 * not visible here, so the exact branch layout is unconfirmed.
 */
729 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
// Debug trace of the query together with its hash masked by THREAD_MASK.
731 if (DebugPolicy.PERFORM)
732 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
735 assert (!collecting);
737 assert(query.assertNotDiscarded());
739 registerDependencies(graph, query, parent, listener, procedure, false);
741 // FRESH, REFUTED, EXCEPTED go here
742 if (!query.isReady()) {
747 query.computeForEach(graph, this, (Procedure)dummy, true);
748 return query.get(graph, this, null);
754 return query.get(graph, this, procedure);
762 interface QueryCollectorSupport {
763 public CacheCollectionResult allCaches();
764 public Collection<CacheEntry> getRootList();
765 public int getCurrentSize();
766 public int calculateCurrentSize();
767 public CacheEntryBase iterate(int level);
768 public void remove();
769 public void setLevel(CacheEntryBase entry, int level);
770 public boolean start(boolean flush);
773 interface QueryCollector {
775 public void collect(int youngTarget, int allowedTimeInMs);
/**
 * {@link QueryCollectorSupport} implementation backed by the enclosing
 * processor's query cache. It keeps a {@link CacheCollectionResult}
 * snapshot ({@code iteration}) that is rebuilt from
 * {@code QueryProcessor.this.allCaches} in {@code restart}.
 *
 * NOTE(review): many interior lines (conditions, returns, closing braces)
 * are not visible here; comments describe only the visible statements.
 */
779 class QueryCollectorSupportImpl implements QueryCollectorSupport {
781 private static final boolean DEBUG = false;
// Ratio passed to restart() by start() (unless flushing) and by iterate().
782 private static final double ITERATION_RATIO = 0.2;
784 private CacheCollectionResult iteration = new CacheCollectionResult();
785 private boolean fresh = true;
786 private boolean needDataInStart = true;
788 QueryCollectorSupportImpl() {
/** Fills a fresh CacheCollectionResult through QueryProcessor.this.allCaches. */
792 public CacheCollectionResult allCaches() {
793 CacheCollectionResult result = new CacheCollectionResult();
794 QueryProcessor.this.allCaches(result);
/**
 * Calls restart when new data is needed or a flush is requested (a flush
 * uses a target ratio of 0.0) and returns iteration.isAtStart().
 */
799 public boolean start(boolean flush) {
800 // We need new data from query maps
802 if(needDataInStart || flush) {
803 // Last run ended after processing all queries => refresh data
804 restart(flush ? 0.0 : ITERATION_RATIO);
806 // continue with previous big data
808 // Notify caller about iteration situation
809 return iteration.isAtStart();
/**
 * Compares the relative difference between the current cache size and the
 * snapshot size against targetRatio; the snapshot is replaced by
 * allCaches() on a visible line below.
 */
812 private void restart(double targetRatio) {
814 needDataInStart = true;
816 long start = System.nanoTime();
819 // We need new data from query maps
// +1 keeps the division below away from a zero divisor.
821 int iterationSize = iteration.size()+1;
822 int diff = calculateCurrentSize()-iterationSize;
824 double ratio = (double)diff / (double)iterationSize;
825 boolean dirty = Math.abs(ratio) >= targetRatio;
828 iteration = allCaches();
830 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
831 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
832 System.err.print(" " + iteration.levels[i].size());
833 System.err.println("");
840 needDataInStart = false;
842 // We are returning here within the same GC round - reuse the cache table
/**
 * Takes the next entry of the given level from the snapshot, skipping
 * entries that are discarded; restart(ITERATION_RATIO) is called on a
 * condition that is not visible here.
 */
851 public CacheEntryBase iterate(int level) {
853 CacheEntryBase entry = iteration.next(level);
855 restart(ITERATION_RATIO);
859 while(entry != null && entry.isDiscarded()) {
860 entry = iteration.next(level);
868 public void remove() {
873 public void setLevel(CacheEntryBase entry, int level) {
874 iteration.setLevel(entry, level);
877 public Collection<CacheEntry> getRootList() {
878 return cache.getRootList();
882 public int calculateCurrentSize() {
883 return cache.calculateCurrentSize();
887 public int getCurrentSize() {
892 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
894 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
895 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
897 public int querySize() {
901 public void gc(int youngTarget, int allowedTimeInMs) {
903 collector.collect(youngTarget, allowedTimeInMs);
/**
 * Recursively records {@code entry} and its ancestors in {@code workarea},
 * mapping each visited entry to a set of parents. Discarded entries and
 * entries already present in {@code workarea} are skipped.
 *
 * NOTE(review): the line that fills {@code ps} is not visible here;
 * presumably each non-discarded parent is added - confirm.
 */
908 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
910 if(entry.isDiscarded()) return;
911 if(workarea.containsKey(entry)) return;
913 Iterable<CacheEntry> parents = entry.getParents(this);
914 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
915 for(CacheEntry e : parents) {
916 if(e.isDiscarded()) continue;
918 processParentReport(e, workarea);
// The entry is registered only after its parents have been visited.
920 workarea.put(entry, ps);
/**
 * Writes the entries of {@code Development.histogram} to {@code file}, one
 * {@code name: value} line per entry, in the reverse of the order returned
 * by {@code CollectionUtils.valueSortedEntries}, and then clears the
 * histogram.
 *
 * @throws IOException declared; the file is opened with a FileOutputStream
 *
 * NOTE(review): the return statement and any close of the stream are on
 * lines not visible here.
 */
924 public synchronized String reportQueryActivity(File file) throws IOException {
926 System.err.println("reportQueries " + file.getAbsolutePath());
931 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
933 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
934 Collections.reverse(entries);
936 for(Pair<String,Integer> entry : entries) {
937 b.println(entry.first + ": " + entry.second);
942 Development.histogram.clear();
/**
 * Writes a textual report of the cached queries to {@code file}: instance
 * counts per query class, a per-entry "retained set" figure, the parents of
 * every entry with a bind status (the report header defines 0=bound,
 * 1=free, 2=unknown, L=has listener) and the children of every entry.
 *
 * @return the string "Dumped N queries." where N is the number of entries
 *         collected in the work area
 * @throws IOException declared; the file is opened with a FileOutputStream
 *
 * NOTE(review): numerous lines (loop headers, initial values, the closing
 * of the stream, the declaration of {@code listeners}) are not visible
 * here; inline comments describe only the visible statements.
 */
948 public synchronized String reportQueries(File file) throws IOException {
950 System.err.println("reportQueries " + file.getAbsolutePath());
955 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
957 long start = System.nanoTime();
959 // ArrayList<CacheEntry> all = ;
// Work area: every reachable, non-discarded entry mapped to its parents.
961 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
962 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
963 for(CacheEntryBase entry : caches) {
964 processParentReport(entry, workarea);
967 // for(CacheEntry e : all) System.err.println("entry: " + e);
969 long duration = System.nanoTime() - start;
970 System.err.println("Query root set in " + 1e-9*duration + "s.");
972 start = System.nanoTime();
// Initial bind status per entry: one of 0, 1 or 2 depending on listener and parents.
974 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
978 for(CacheEntry entry : workarea.keySet()) {
979 boolean listener = listening.hasListenerAfterDisposing(entry);
980 boolean hasParents = entry.getParents(this).iterator().hasNext();
983 flagMap.put(entry, 0);
984 } else if (!hasParents) {
986 flagMap.put(entry, 1);
989 flagMap.put(entry, 2);
// Analysis passes: entries with status 2 are re-evaluated from their parents' statuses.
1002 long start2 = System.nanoTime();
1004 int boundCounter = 0;
1005 int unboundCounter = 0;
1006 int unknownCounter = 0;
1008 for(CacheEntry<?> entry : workarea.keySet()) {
1010 //System.err.println("process " + entry);
1012 int flags = flagMap.get(entry);
1013 int bindStatus = flags & 3;
1015 if(bindStatus == 0) boundCounter++;
1016 else if(bindStatus == 1) unboundCounter++;
1017 else if(bindStatus == 2) unknownCounter++;
1019 if(bindStatus < 2) continue;
1022 for(CacheEntry parent : entry.getParents(this)) {
1024 if(parent.isDiscarded()) flagMap.put(parent, 1);
1026 int flags2 = flagMap.get(parent);
1027 int bindStatus2 = flags2 & 3;
1028 // Parent is bound => child is bound
1029 if(bindStatus2 == 0) {
1033 // Parent is unknown => child is unknown
1034 else if (bindStatus2 == 2) {
1041 flagMap.put(entry, newStatus);
1045 duration = System.nanoTime() - start2;
1046 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1047 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
// The pass is repeated until done, bounded by the loop counter.
1049 } while(!done && loops++ < 20);
1053 for(CacheEntry entry : workarea.keySet()) {
1055 int bindStatus = flagMap.get(entry);
1056 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1062 duration = System.nanoTime() - start;
1063 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Per-class instance counts, stored as negative numbers and negated again when printed.
1065 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1067 for(CacheEntry entry : workarea.keySet()) {
1068 Class<?> clazz = entry.getClass();
// For request-backed entries the class of the request object (id) is counted instead.
1069 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
1070 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
1071 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
1072 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
1073 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
1074 Integer c = counts.get(clazz);
1075 if(c == null) counts.put(clazz, -1);
1076 else counts.put(clazz, c-1);
1079 b.print("// Simantics DB client query report file\n");
1080 b.print("// This file contains the following information\n");
1081 b.print("// -The amount of cached query instances per query class\n");
1082 b.print("// -The sizes of retained child sets\n");
1083 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1084 b.print("// -Followed by status, where\n");
1085 b.print("// -0=bound\n");
1086 b.print("// -1=free\n");
1087 b.print("// -2=unknown\n");
1088 b.print("// -L=has listener\n");
1089 b.print("// -List of children for each query (search for 'C <query name>')\n");
1091 b.print("----------------------------------------\n");
1093 b.print("// Queries by class\n");
1094 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1095 b.print(-p.second + " " + p.first.getName() + "\n");
// Retained-set histogram: each round adds an entry's value to its parents,
// repeated while values change, for a bounded number of rounds.
1098 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1099 for(CacheEntry e : workarea.keySet())
1102 boolean changed = true;
1104 while(changed && iter++<50) {
1108 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1109 for(CacheEntry e : workarea.keySet())
1112 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1113 Integer c = hist.get(e.getKey());
1114 for(CacheEntry p : e.getValue()) {
1115 Integer i = newHist.get(p);
1116 newHist.put(p, i+c);
1119 for(CacheEntry e : workarea.keySet()) {
1120 Integer value = newHist.get(e);
1121 Integer old = hist.get(e);
1122 if(!value.equals(old)) {
1124 // System.err.println("hist " + e + ": " + old + " => " + value);
1129 System.err.println("Retained set iteration " + iter);
1133 b.print("// Queries by retained set\n");
1134 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1135 b.print("" + -p.second + " " + p.first + "\n");
// Parent listing; the parent -> children map (inverse) is built on the way.
1138 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1140 b.print("// Entry parent listing\n");
1141 for(CacheEntry entry : workarea.keySet()) {
1142 int status = flagMap.get(entry);
1143 boolean hasListener = listening.hasListenerAfterDisposing(entry);
1144 b.print("Q " + entry.toString());
1146 b.print(" (L" + status + ")");
1149 b.print(" (" + status + ")");
1152 for(CacheEntry parent : workarea.get(entry)) {
1153 Collection<CacheEntry> inv = inverse.get(parent);
1155 inv = new ArrayList<CacheEntry>();
1156 inverse.put(parent, inv);
1159 b.print(" " + parent.toString());
1164 b.print("// Entry child listing\n");
1165 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1166 b.print("C " + entry.getKey().toString());
1168 for(CacheEntry child : entry.getValue()) {
1169 Integer h = hist.get(child);
1173 b.print(" <no children>");
1175 b.print(" " + child.toString());
1180 b.print("#queries: " + workarea.keySet().size() + "\n");
1181 b.print("#listeners: " + listeners + "\n");
1185 return "Dumped " + workarea.keySet().size() + " queries.";
/**
 * Removes the entry's query from this processor via
 * {@code query.removeEntry(this)}. Returns {@code false} without doing
 * anything when the entry is already discarded.
 *
 * NOTE(review): the statement guarded by the HAS_BEEN_BOUND check and the
 * final return are on lines not visible here.
 */
1189 boolean removeQuery(CacheEntry entry) {
1191 // This entry has been removed before. No need to do anything here.
1192 if(entry.isDiscarded()) return false;
// Always holds here because of the early return above.
1194 assert (!entry.isDiscarded());
1196 Query query = entry.getQuery();
1198 query.removeEntry(this);
1203 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1214 * @return true if this entry is being listened
/**
 * Processes one {@link UpdateEntry} during change propagation. Depending on
 * the entry state and the query's request flags the visible code:
 * records entries of IMMEDIATE_UPDATE type in {@code immediates},
 * schedules every registered listener of the entry through
 * {@code listening.scheduleListener}, and, for queries of type
 * {@code RequestFlags.INVALIDATE}, calls {@code listening.updateParents}
 * with the {@code todo} list.
 * <p>
 * With {@code Development.DEVELOPMENT} and the QUERYPROCESSOR_UPDATE
 * property enabled, a trace line is printed to stderr, prefixed
 * D (discarded), E (excepted), P (pending) or U (update).
 *
 * NOTE(review): returns, several branch bodies and closing braces are on
 * lines not visible here, so the return value cannot be confirmed from
 * this view.
 */
1216 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1220 CacheEntry entry = e.entry;
1223 * If the dependency graph forms a DAG, some entries are inserted in the
1224 * todo list many times. They only need to be processed once though.
1226 if (entry.isDiscarded()) {
1227 if (Development.DEVELOPMENT) {
1228 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1229 System.err.print("D");
1230 for (int i = 0; i < e.indent; i++)
1231 System.err.print(" ");
1232 System.err.println(entry.getQuery());
1235 // System.err.println(" => DISCARDED");
1239 // if (entry.isRefuted()) {
1240 // if (Development.DEVELOPMENT) {
1241 // if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1242 // System.err.print("R");
1243 // for (int i = 0; i < e.indent; i++)
1244 // System.err.print(" ");
1245 // System.err.println(entry.getQuery());
1251 if (entry.isExcepted()) {
1252 if (Development.DEVELOPMENT) {
1253 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1254 System.err.print("E");
1259 if (entry.isPending()) {
1260 if (Development.DEVELOPMENT) {
1261 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1262 System.err.print("P");
1269 if (Development.DEVELOPMENT) {
1270 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1271 System.err.print("U ");
1272 for (int i = 0; i < e.indent; i++)
1273 System.err.print(" ");
1274 System.err.print(entry.getQuery());
1278 Query query = entry.getQuery();
1279 int type = query.type();
1281 boolean hasListener = listening.hasListener(entry);
1283 if (Development.DEVELOPMENT) {
1284 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1285 if(listening.hasListener(entry)) {
1286 System.err.println(" (L)");
1288 System.err.println("");
1293 if(entry.isPending() || entry.isExcepted()) {
// Queries flagged for immediate update are collected in the immediates map.
1296 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1298 immediates.put(entry, entry);
1313 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1315 immediates.put(entry, entry);
1329 // System.err.println(" => FOO " + type);
// Schedule every listener registered for this entry.
1332 ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1333 if(entries != null) {
1334 for (ListenerEntry le : entries) {
1335 listening.scheduleListener(le);
1340 // If invalid, update parents
1341 if (type == RequestFlags.INVALIDATE) {
1342 listening.updateParents(e.indent, entry, todo);
1350 * @param av1 an array (guaranteed)
1351 * @param av2 any object
1352 * @return <code>true</code> if the two arrays are equal
// Compares two values as arrays, dispatching on the array component type.
// av1 is expected to be an array; av2 may be any object.
1354 private final boolean arrayEquals(Object av1, Object av2) {
1357 Class<?> c1 = av1.getClass().getComponentType();
1358 Class<?> c2 = av2.getClass().getComponentType();
// getComponentType() is null for non-array classes; component types must also match.
1359 if (c2 == null || !c1.equals(c2))
1361 boolean p1 = c1.isPrimitive();
1362 boolean p2 = c2.isPrimitive();
// Reference-typed arrays go through the Object[] overload.
1366 return Arrays.equals((Object[]) av1, (Object[]) av2);
// Primitive arrays: select the Arrays.equals overload for the component type.
1367 if (boolean.class.equals(c1))
1368 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1369 else if (byte.class.equals(c1))
1370 return Arrays.equals((byte[]) av1, (byte[]) av2);
1371 else if (int.class.equals(c1))
1372 return Arrays.equals((int[]) av1, (int[]) av2);
1373 else if (long.class.equals(c1))
1374 return Arrays.equals((long[]) av1, (long[]) av2);
1375 else if (float.class.equals(c1))
1376 return Arrays.equals((float[]) av1, (float[]) av2);
1377 else if (double.class.equals(c1))
1378 return Arrays.equals((double[]) av1, (double[]) av2);
// NOTE(review): there is no branch for char[] or short[]; such arrays end up here.
1379 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query behind the given cache entry and compares the fresh result
// with oldValue. Returns the new value when it differs, ListenerEntry.NOT_CHANGED
// when it does not, and ListenerEntry.NO_VALUE when the entry ends up excepted or
// a Throwable is caught.
1384 final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1388 Query query = entry.getQuery();
// Development-only trace of recomputed queries.
1390 if (Development.DEVELOPMENT) {
1391 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1392 System.err.println("R " + query);
1396 entry.prepareRecompute(querySupport);
1398 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1400 query.recompute(parentGraph);
1402 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1404 Object newValue = entry.getResult();
// No previous value to compare against.
1406 if (ListenerEntry.NO_VALUE == oldValue) {
1407 if (Development.DEVELOPMENT) {
1408 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
// NOTE(review): this diagnostics block prints to System.out while the others in this method use System.err.
1409 System.out.println("C " + query);
1410 System.out.println("- " + oldValue);
1411 System.out.println("- " + newValue);
1417 boolean changed = false;
// Arrays are compared by content via arrayEquals, other objects via equals();
// a null new value counts as a change only if the old value was non-null.
1419 if (newValue != null) {
1420 if (newValue.getClass().isArray()) {
1421 changed = !arrayEquals(newValue, oldValue);
1423 changed = !newValue.equals(oldValue);
1426 changed = (oldValue != null);
1428 if (Development.DEVELOPMENT) {
1429 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1430 System.err.println("C " + query);
1431 System.err.println("- " + oldValue);
1432 System.err.println("- " + newValue);
1436 return changed ? newValue : ListenerEntry.NOT_CHANGED;
// Any failure is logged and reported to the caller as NO_VALUE.
1438 } catch (Throwable t) {
1440 Logger.defaultLogError(t);
1442 return ListenerEntry.NO_VALUE;
1451 * @return true if this entry still has listeners
// Propagates an update starting from the given cache entry. Entries are processed
// from a work list by updateQuery, which also collects entries that need immediate
// re-evaluation; those are recomputed with compareTo and, when their value changed,
// their parents are added to the work list.
// Returns hadListeners | listenersUnknown.
1453 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1455 assert (!cache.collecting);
1459 boolean hadListeners = false;
1460 boolean listenersUnknown = false;
1464 assert(entry != null);
1465 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Identity-based map: entries are tracked by reference, not by equals().
1466 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1467 todo.add(new UpdateEntry(null, entry, 0));
1471 // Walk the tree and collect immediate updates
1472 while (!todo.isEmpty()) {
1473 UpdateEntry e = todo.pop();
1474 hadListeners |= updateQuery(e, todo, immediates);
1477 if(immediates.isEmpty()) break;
1479 // Evaluate all immediate updates and collect parents to update
1480 for(CacheEntry immediate : immediates.values()) {
1482 if(immediate.isDiscarded()) {
// Excepted entries are recomputed with no old value to compare against.
1486 if(immediate.isExcepted()) {
1488 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1489 if (newValue != ListenerEntry.NOT_CHANGED)
1490 listening.updateParents(0, immediate, todo);
1494 Object oldValue = immediate.getResult();
1495 Object newValue = compareTo(graph, immediate, oldValue);
1497 if (newValue != ListenerEntry.NOT_CHANGED) {
1498 listening.updateParents(0, immediate, todo);
1500 // If not changed, keep the old value
1501 immediate.setResult(oldValue);
1502 immediate.setReady();
1503 listenersUnknown = true;
1513 } catch (Throwable t) {
1514 Logger.defaultLogError(t);
1520 return hadListeners | listenersUnknown;
// Pending change notifications consumed by propagateChangesInQueryCache.
// scheduledObjectUpdates stores (subject, predicate) pairs packed into a long;
// scheduledValueUpdates and scheduledInvalidates store int resource keys.
1524 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1525 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1526 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1527 // Maybe use a mutex from util.concurrent?
// Guards scheduledPrimitiveUpdates, which updatePrimitive fills from arbitrary threads.
1528 private Object primitiveUpdateLock = new Object();
1529 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Cache entries collected for updateRefutations.
1531 private ArrayList<CacheEntry> refutations = new ArrayList<>();
1533 private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
// Iterates over the collected refutation entries and then empties the list.
1538 private void updateRefutations(ReadGraphImpl graph) {
1540 for(CacheEntry e : refutations)
1543 refutations.clear();
// Drains the scheduled object/value/invalidate/primitive updates and marks the
// affected cache entries for update. Two fast paths handle the common cases of
// exactly one scheduled statement change or exactly one scheduled value change;
// otherwise all scheduled sets are processed in bulk.
1547 public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1549 // Make sure that listening has performed its work
1552 cache.dirty = false;
1555 if (Development.DEVELOPMENT) {
1556 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1557 System.err.println("== Query update ==");
1561 // Special case - one statement
// NOTE(review): scheduledPrimitiveUpdates.size() is read here without holding
// primitiveUpdateLock, although updatePrimitive mutates the set under that lock.
1562 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1564 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the pair: subject in the high 32 bits, predicate in the low 32 bits.
1566 final int subject = (int)(arg0 >>> 32);
1567 final int predicate = (int)(arg0 & 0xffffffff);
1569 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1570 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1571 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
// Type-related predicates also affect the cached type queries of the subject.
1573 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1574 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1575 if(principalTypes != null) markForUpdate(graph, principalTypes);
1576 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1577 if(types != null) markForUpdate(graph, types);
1580 if(predicate == subrelationOf) {
1581 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1582 if(superRelations != null) markForUpdate(graph, superRelations);
1585 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1586 if(dp != null) markForUpdate(graph, dp);
// The OrderedSet entry is looked up by the predicate key, not the subject.
1587 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1588 if(os != null) markForUpdate(graph, os);
1590 updateRefutations(graph);
1592 scheduledObjectUpdates.clear();
1594 if (Development.DEVELOPMENT) {
1595 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1596 System.err.println("== Query update ends ==");
1604 // Special case - one value
1605 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1607 int arg0 = scheduledValueUpdates.getFirst();
1609 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1610 if(valueQuery != null) markForUpdate(graph, valueQuery);
1612 updateRefutations(graph);
1614 scheduledValueUpdates.clear();
1616 if (Development.DEVELOPMENT) {
1617 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1618 System.err.println("== Query update ends ==");
// General case: collect subjects and ordered-set keys touched by any scheduled change.
1626 final TIntHashSet predicates = new TIntHashSet();
1627 final TIntHashSet orderedSets = new TIntHashSet();
// Swap out the primitive update set under the lock so it can be processed without holding it.
1629 THashSet primitiveUpdates;
1630 synchronized (primitiveUpdateLock) {
1631 primitiveUpdates = scheduledPrimitiveUpdates;
1632 scheduledPrimitiveUpdates = new THashSet();
1635 scheduledValueUpdates.forEach(new TIntProcedure() {
1638 public boolean execute(int arg0) {
1639 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1640 if(valueQuery != null) markForUpdate(graph, valueQuery);
// Invalidated resources: mark value, type and super-relation entries, and queue
// the resource for the per-subject pass below.
1646 scheduledInvalidates.forEach(new TIntProcedure() {
1649 public boolean execute(int resource) {
1651 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1652 if(valueQuery != null) markForUpdate(graph, valueQuery);
1654 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1655 if(principalTypes != null) markForUpdate(graph, principalTypes);
1656 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1657 if(types != null) markForUpdate(graph, types);
1659 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1660 if(superRelations != null) markForUpdate(graph, superRelations);
1662 predicates.add(resource);
1669 scheduledObjectUpdates.forEach(new TLongProcedure() {
1672 public boolean execute(long arg0) {
1674 final int subject = (int)(arg0 >>> 32);
1675 final int predicate = (int)(arg0 & 0xffffffff);
1677 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1678 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1679 if(principalTypes != null) markForUpdate(graph, principalTypes);
1680 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1681 if(types != null) markForUpdate(graph, types);
1684 if(predicate == subrelationOf) {
1685 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1686 if(superRelations != null) markForUpdate(graph, superRelations);
// Despite its name, the "predicates" set collects subject keys here.
1689 predicates.add(subject);
1690 orderedSets.add(predicate);
// Per-subject pass: mark Objects, DirectObjects, Statements and DirectPredicates entries.
1698 predicates.forEach(new TIntProcedure() {
1701 public boolean execute(final int subject) {
1703 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1704 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1705 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1707 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1708 if(entry != null) markForUpdate(graph, entry);
1716 orderedSets.forEach(new TIntProcedure() {
1719 public boolean execute(int orderedSet) {
1721 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1722 if(entry != null) markForUpdate(graph, entry);
1730 updateRefutations(graph);
// External reads: update each entry and drop it from the cache map once it has
// neither listeners nor parents.
1732 primitiveUpdates.forEach(new TObjectProcedure() {
1735 public boolean execute(Object arg0) {
1737 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1738 if (query != null) {
1739 boolean listening = update(graph, query);
1740 if (!listening && !query.hasParents()) {
1741 cache.externalReadEntryMap.remove(arg0);
1750 scheduledValueUpdates.clear();
1751 scheduledObjectUpdates.clear();
1752 scheduledInvalidates.clear();
1754 if (Development.DEVELOPMENT) {
1755 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1756 System.err.println("== Query update ends ==");
// Schedules a value change for the given resource key; it is processed by
// propagateChangesInQueryCache.
1762 public void updateValue(final int resource) {
1763 scheduledValueUpdates.add(resource);
// Schedules a statement change for (resource, predicate). The pair is packed into
// one long with the subject in the high 32 bits and the predicate in the low 32
// bits, matching the decoding in propagateChangesInQueryCache.
// The predicate is masked to its low 32 bits before combining: an int operand is
// sign-extended to long, so adding a negative predicate would borrow from the
// high word and the subject would decode as resource - 1. For non-negative
// predicates the packed value is unchanged.
1767 public void updateStatements(final int resource, final int predicate) {
1768 scheduledObjectUpdates.add((((long)resource) << 32) | (predicate & 0xFFFFFFFFL));
1772 private int lastInvalidate = 0;
// Schedules the given resource for update, skipping the call when it repeats the
// most recently scheduled resource.
// NOTE(review): this adds to scheduledValueUpdates rather than scheduledInvalidates,
// which propagateChangesInQueryCache processes separately - confirm this is intended.
// NOTE(review): lastInvalidate starts at 0, so a first call with resource 0 is ignored.
1774 public void invalidateResource(final int resource) {
1775 if(lastInvalidate == resource) return;
1776 scheduledValueUpdates.add(resource);
1777 lastInvalidate = resource;
// Schedules an external read for update and notifies querySupport that primitives
// are dirty. The set is modified only while holding primitiveUpdateLock.
1781 public void updatePrimitive(final ExternalRead primitive) {
1783 // External reads may be updated from arbitrary threads.
1784 // Synchronize to prevent race-conditions.
1785 synchronized (primitiveUpdateLock) {
1786 scheduledPrimitiveUpdates.add(primitive);
1788 querySupport.dirtyPrimitives();
// Returns a one-line summary of cache statistics: size, hits, misses and updates.
1793 public synchronized String toString() {
1794 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the query executor threads in stages: release permits and ask each
// executor to dispose, poll for liveness, then interrupt, and finally join each
// thread with a timeout before clearing its slot.
1798 protected void doDispose() {
// Release a large number of permits, presumably to wake any executors blocked on `requests`.
1800 requests.release(Integer.MAX_VALUE / 2);
1802 for(int index = 0; index < THREADS; index++) {
1803 executors[index].dispose();
// First phase: poll up to 100 times for the executors to finish on their own.
1807 for(int i=0;i<100;i++) {
1809 boolean alive = false;
1810 for(int index = 0; index < THREADS; index++) {
1811 alive |= executors[index].isAlive();
1816 } catch (InterruptedException e) {
1817 Logger.defaultLogError(e);
1822 // Then start interrupting
1823 for(int i=0;i<100;i++) {
1825 boolean alive = false;
1826 for(int index = 0; index < THREADS; index++) {
1827 alive |= executors[index].isAlive();
1830 for(int index = 0; index < THREADS; index++) {
1831 executors[index].interrupt();
1835 // // Then just destroy
1836 // for(int index = 0; index < THREADS; index++) {
1837 // executors[index].destroy();
// Final phase: wait up to 5 seconds per thread, then drop the reference regardless.
1840 for(int index = 0; index < THREADS; index++) {
1842 executors[index].join(5000);
1843 } catch (InterruptedException e) {
1844 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1846 executors[index] = null;
1851 public int getHits() {
// Returns the cache miss counter.
1855 public int getMisses() {
1856 return cache.misses;
1859 public int getSize() {
// Collects the cluster ids of the resources referenced by cached Objects,
// DirectPredicates and ValueQuery entries.
1863 public Set<Long> getReferencedClusters() {
1864 HashSet<Long> result = new HashSet<Long>();
1865 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1866 Objects query = (Objects) entry.getQuery();
1867 result.add(querySupport.getClusterId(query.r1()));
1869 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1870 DirectPredicates query = (DirectPredicates) entry.getQuery();
1871 result.add(querySupport.getClusterId(query.id));
1873 for (CacheEntry entry : cache.valueQueryMap.values()) {
1874 ValueQuery query = (ValueQuery) entry.getQuery();
1875 result.add(querySupport.getClusterId(query.id));
1880 public void assertDone() {
// Delegates to the query cache, passing along the given result object.
1883 CacheCollectionResult allCaches(CacheCollectionResult result) {
1885 return cache.allCaches(result);
1889 public void printDiagnostics() {
// Forwards the cluster request, including the runnable, to querySupport.
1892 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1893 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the query collector with arguments (0, Integer.MAX_VALUE).
1896 public int clean() {
1897 collector.collect(0, Integer.MAX_VALUE);
// Runs a one-off collection restricted to the cache entries of the given external
// reads, using an ad-hoc QueryCollectorSupport that only exposes those entries.
1901 public void clean(final Collection<ExternalRead<?>> requests) {
1902 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1903 Iterator<ExternalRead<?>> iterator = requests.iterator();
1905 public CacheCollectionResult allCaches() {
1906 throw new UnsupportedOperationException();
// Returns the next request that has a cache entry, skipping requests without one.
1909 public CacheEntryBase iterate(int level) {
1910 if(iterator.hasNext()) {
1911 ExternalRead<?> request = iterator.next();
1912 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1913 if (entry != null) return entry;
1914 else return iterate(level);
// Iterator exhausted: rewind it for the next pass.
1916 iterator = requests.iterator();
1921 public void remove() {
1922 throw new UnsupportedOperationException();
1925 public void setLevel(CacheEntryBase entry, int level) {
1926 throw new UnsupportedOperationException();
1929 public Collection<CacheEntry> getRootList() {
1930 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1931 for (ExternalRead<?> request : requests) {
1932 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1939 public int getCurrentSize() {
1943 public int calculateCurrentSize() {
1944 // This tells the collector to attempt collecting everything.
1945 return Integer.MAX_VALUE;
1948 public boolean start(boolean flush) {
1952 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to the query cache's scanPending().
1955 public void scanPending() {
1957 cache.scanPending();
// Creates a new ReadGraphImpl via ReadGraphImpl.createAsync for this processor.
1961 public ReadGraphImpl graphForVirtualRequest() {
1962 return ReadGraphImpl.createAsync(this);
1966 private HashMap<Resource, Class<?>> builtinValues;
// Returns the Java class registered for the given built-in value type resource,
// or null when none is registered. The lookup map is populated lazily on first use.
// initBuiltinValues() returns without creating the map when the Layer0 service is
// unavailable, so the map is null-checked here; dereferencing it unconditionally
// threw a NullPointerException in that situation.
1968 public Class<?> getBuiltinValue(Resource r) {
1969 if(builtinValues == null) initBuiltinValues();
1970 return builtinValues == null ? null : builtinValues.get(r);
1973 Exception callerException = null;
1975 public interface AsyncBarrier {
1978 // public void inc(String debug);
1979 // public void dec(String debug);
1982 // final public QueryProcessor processor;
1983 // final public QuerySupport support;
1985 // boolean disposed = false;
// Builds the map from Layer0 literal type resources to their Java classes.
// Leaves builtinValues untouched when the Layer0 service is not available.
1987 private void initBuiltinValues() {
1989 Layer0 b = getSession().peekService(Layer0.class);
1990 if(b == null) return;
1992 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types map to the boxed/reference classes.
1994 builtinValues.put(b.String, String.class);
1995 builtinValues.put(b.Double, Double.class);
1996 builtinValues.put(b.Float, Float.class);
1997 builtinValues.put(b.Long, Long.class);
1998 builtinValues.put(b.Integer, Integer.class);
1999 builtinValues.put(b.Byte, Byte.class);
2000 builtinValues.put(b.Boolean, Boolean.class);
// Array types map to primitive arrays, except StringArray which maps to String[].
2002 builtinValues.put(b.StringArray, String[].class);
2003 builtinValues.put(b.DoubleArray, double[].class);
2004 builtinValues.put(b.FloatArray, float[].class);
2005 builtinValues.put(b.LongArray, long[].class);
2006 builtinValues.put(b.IntegerArray, int[].class);
2007 builtinValues.put(b.ByteArray, byte[].class);
2008 builtinValues.put(b.BooleanArray, boolean[].class);
2012 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2014 // if (null == provider2) {
2015 // this.processor = null;
2019 // this.processor = provider2;
2020 // support = provider2.getCore();
2021 // initBuiltinValues();
2025 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2026 // return new ReadGraphSupportImpl(impl.processor);
2030 final public Session getSession() {
// Returns the resourceSupport field.
2034 final public ResourceSupport getResourceSupport() {
2035 return resourceSupport;
// Feeds every predicate returned by getPredicates(impl, subject) to the procedure,
// then signals completion. Any Throwable is passed to procedure.exception.
2039 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2043 for(Resource predicate : getPredicates(impl, subject))
2044 procedure.execute(impl, predicate);
2046 procedure.finished(impl);
2048 } catch (Throwable e) {
2049 procedure.exception(impl, e);
// Not supported for synchronous MultiProcedure: always throws
// UnsupportedOperationException. The previous implementation is kept below as
// commented-out code.
2055 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2057 throw new UnsupportedOperationException();
2059 // assert(subject != null);
2060 // assert(procedure != null);
2062 // final ListenerBase listener = getListenerBase(procedure);
2065 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2068 // public void execute(ReadGraphImpl graph, int i) {
2070 // procedure.execute(querySupport.getResource(i));
2071 // } catch (Throwable t2) {
2072 // Logger.defaultLogError(t2);
2077 // public void finished(ReadGraphImpl graph) {
2079 // procedure.finished();
2080 // } catch (Throwable t2) {
2081 // Logger.defaultLogError(t2);
2083 //// impl.state.barrier.dec();
2087 // public void exception(ReadGraphImpl graph, Throwable t) {
2089 // procedure.exception(t);
2090 // } catch (Throwable t2) {
2091 // Logger.defaultLogError(t2);
2093 //// impl.state.barrier.dec();
2097 // } catch (DatabaseException e) {
2098 // Logger.defaultLogError(e);
// Returns the predicate set of the subject via QueryCacheBase.resultPredicates,
// with impl.parent as the parent entry and no listener.
2104 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2105 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
// Queries the statements of (subject, predicate) and forwards each one to a
// synchronous MultiProcedure as a Statement object. Throwables raised by the
// procedure callbacks are logged and not propagated.
2109 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2110 final Resource predicate, final MultiProcedure<Statement> procedure) {
2112 assert(subject != null);
2113 assert(predicate != null);
2114 assert(procedure != null);
// The procedure doubles as the listener when it implements ListenerBase.
2116 final ListenerBase listener = getListenerBase(procedure);
2118 // impl.state.barrier.inc();
2121 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2124 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2126 procedure.execute(querySupport.getStatement(s, p, o));
2127 } catch (Throwable t2) {
2128 Logger.defaultLogError(t2);
2133 public void finished(ReadGraphImpl graph) {
2135 procedure.finished();
2136 } catch (Throwable t2) {
2137 Logger.defaultLogError(t2);
2139 // impl.state.barrier.dec();
2143 public void exception(ReadGraphImpl graph, Throwable t) {
2145 procedure.exception(t);
2146 } catch (Throwable t2) {
2147 Logger.defaultLogError(t2);
2149 // impl.state.barrier.dec();
2153 } catch (DatabaseException e) {
2154 Logger.defaultLogError(e);
// Queries the statements of (subject, predicate) and forwards each one to an
// AsyncMultiProcedure as a Statement object. Each callback passes either the
// delivered graph or impl.newRestart(graph) to the procedure; the `first` flag
// presumably selects between the two (TODO confirm). Throwables raised by the
// procedure callbacks are logged and not propagated.
2160 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2161 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2163 assert(subject != null);
2164 assert(predicate != null);
2165 assert(procedure != null);
2167 final ListenerBase listener = getListenerBase(procedure);
2169 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2171 boolean first = true;
2174 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2177 procedure.execute(graph, querySupport.getStatement(s, p, o));
2179 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2181 } catch (Throwable t2) {
2182 Logger.defaultLogError(t2);
2187 public void finished(ReadGraphImpl graph) {
2192 procedure.finished(graph);
2193 // impl.state.barrier.dec(this);
2195 procedure.finished(impl.newRestart(graph));
2197 } catch (Throwable t2) {
2198 Logger.defaultLogError(t2);
2204 public void exception(ReadGraphImpl graph, Throwable t) {
2209 procedure.exception(graph, t);
2210 // impl.state.barrier.dec(this);
2212 procedure.exception(impl.newRestart(graph), t);
2214 } catch (Throwable t2) {
2215 Logger.defaultLogError(t2);
// Resolve resource keys once before issuing the query.
2222 int sId = querySupport.getId(subject);
2223 int pId = querySupport.getId(predicate);
2225 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2226 // else impl.state.barrier.inc(null, null);
2229 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2230 } catch (DatabaseException e) {
2231 Logger.defaultLogError(e);
// Queries the statements of (subject, predicate) and forwards the raw (s, p, o)
// int keys to a StatementProcedure, without creating Statement objects. Each
// callback passes either the delivered graph or impl.newRestart(graph); the
// `first` flag presumably selects between the two (TODO confirm). Throwables
// raised by the procedure callbacks are logged and not propagated.
2237 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2238 final Resource predicate, final StatementProcedure procedure) {
2240 assert(subject != null);
2241 assert(predicate != null);
2242 assert(procedure != null);
2244 final ListenerBase listener = getListenerBase(procedure);
2246 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2248 boolean first = true;
2251 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2254 procedure.execute(graph, s, p, o);
2256 procedure.execute(impl.newRestart(graph), s, p, o);
2258 } catch (Throwable t2) {
2259 Logger.defaultLogError(t2);
2264 public void finished(ReadGraphImpl graph) {
2269 procedure.finished(graph);
2270 // impl.state.barrier.dec(this);
2272 procedure.finished(impl.newRestart(graph));
2274 } catch (Throwable t2) {
2275 Logger.defaultLogError(t2);
2281 public void exception(ReadGraphImpl graph, Throwable t) {
2286 procedure.exception(graph, t);
2287 // impl.state.barrier.dec(this);
2289 procedure.exception(impl.newRestart(graph), t);
2291 } catch (Throwable t2) {
2292 Logger.defaultLogError(t2);
2299 int sId = querySupport.getId(subject);
2300 int pId = querySupport.getId(predicate);
2302 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2303 // else impl.state.barrier.inc(null, null);
2306 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2307 } catch (DatabaseException e) {
2308 Logger.defaultLogError(e);
// Adapts an AsyncSetListener to forEachStatement: statements not present in the
// previous result are reported through procedure.add, and statements left over in
// the previous result after a run are reported through procedure.remove.
2314 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2316 assert(subject != null);
2317 assert(predicate != null);
2318 assert(procedure != null);
2320 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// `current` holds the previous result (null before the first run); `run` is the set for the ongoing run.
2322 private Set<Statement> current = null;
2323 private Set<Statement> run = new HashSet<Statement>();
2326 public void execute(AsyncReadGraph graph, Statement result) {
2328 boolean found = false;
2330 if(current != null) {
// Removing a still-present statement leaves only vanished ones in `current`.
2332 found = current.remove(result);
2336 if(!found) procedure.add(graph, result);
2343 public void finished(AsyncReadGraph graph) {
2345 if(current != null) {
2346 for(Statement r : current) procedure.remove(graph, r);
2351 run = new HashSet<Statement>();
2356 public void exception(AsyncReadGraph graph, Throwable t) {
2357 procedure.exception(graph, t);
2361 public boolean isDisposed() {
2362 return procedure.isDisposed();
// Runs the asserted-statements query for (subject, predicate) and forwards each
// result to the AsyncMultiProcedure as a Statement object. Throwables raised by
// the procedure callbacks are logged and not propagated.
2370 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2371 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2373 assert(subject != null);
2374 assert(predicate != null);
2375 assert(procedure != null);
2377 final ListenerBase listener = getListenerBase(procedure);
2379 // impl.state.barrier.inc();
2382 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2385 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2387 procedure.execute(graph, querySupport.getStatement(s, p, o));
2388 } catch (Throwable t2) {
2389 Logger.defaultLogError(t2);
2394 public void finished(ReadGraphImpl graph) {
2396 procedure.finished(graph);
2397 } catch (Throwable t2) {
2398 Logger.defaultLogError(t2);
2400 // impl.state.barrier.dec();
2404 public void exception(ReadGraphImpl graph, Throwable t) {
2406 procedure.exception(graph, t);
2407 } catch (Throwable t2) {
2408 Logger.defaultLogError(t2);
2410 // impl.state.barrier.dec();
2414 } catch (DatabaseException e) {
2415 Logger.defaultLogError(e);
// Returns the procedure itself, cast to ListenerBase, when it implements that interface.
2420 private static ListenerBase getListenerBase(Object procedure) {
2421 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
// Runs the Objects query for (subject, predicate) and forwards each object to a
// synchronous MultiProcedure as a Resource. Throwables raised by the procedure
// callbacks are logged and not propagated.
2426 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2428 assert(subject != null);
2429 assert(predicate != null);
2430 assert(procedure != null);
2432 final ListenerBase listener = getListenerBase(procedure);
2434 // impl.state.barrier.inc();
2437 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2440 public void execute(ReadGraphImpl graph, int i) {
2442 procedure.execute(querySupport.getResource(i));
2443 } catch (Throwable t2) {
2444 Logger.defaultLogError(t2);
2449 public void finished(ReadGraphImpl graph) {
2451 procedure.finished();
2452 } catch (Throwable t2) {
2453 Logger.defaultLogError(t2);
2455 // impl.state.barrier.dec();
2459 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): prints directly to stdout; other handlers in this class go through Logger.
2460 System.out.println("forEachObject exception " + t);
2462 procedure.exception(t);
2463 } catch (Throwable t2) {
2464 Logger.defaultLogError(t2);
2466 // impl.state.barrier.dec();
2470 } catch (DatabaseException e) {
2471 Logger.defaultLogError(e);
// Runs the DirectPredicates query for the subject and hands the resulting IntSet
// to the procedure in a single call. A DatabaseException from the query is logged.
2477 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2479 assert(subject != null);
2480 assert(procedure != null);
2482 final ListenerBase listener = getListenerBase(procedure);
2484 int sId = querySupport.getId(subject);
2487 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2490 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2491 procedure.execute(graph, result);
2495 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2496 procedure.exception(graph, throwable);
2500 } catch (DatabaseException e) {
2501 Logger.defaultLogError(e);
// Returns the direct statements of the subject straight from querySupport,
// forwarding the ignoreVirtual flag. The query-based variant is commented out.
2506 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2508 // assert(subject != null);
2509 // assert(procedure != null);
2511 // final ListenerBase listener = getListenerBase(procedure);
2513 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2515 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2520 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2522 // assert(subject != null);
2523 // assert(procedure != null);
2525 // final ListenerBase listener = getListenerBase(procedure);
2527 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2531 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
// Resolves the possible single object of (subject, predicate): the procedure
// receives the collected object, or null when none was collected or the
// INVALID_RESOURCE sentinel was set.
2534 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2536 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2538 private Resource single = null;
2541 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2542 if(single == null) {
// INVALID_RESOURCE is the sentinel; presumably set when more than one object arrives (TODO confirm).
2545 single = INVALID_RESOURCE;
2550 public synchronized void finished(AsyncReadGraph graph) {
2551 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2552 else procedure.execute(graph, single);
2556 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2557 procedure.exception(graph, throwable);
// Low-level variant: resolves the int keys of subject and predicate and runs the
// Objects query with the given listener and IntProcedure. A DatabaseException
// from the query is logged.
2564 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2566 final int sId = querySupport.getId(subject);
2567 final int pId = querySupport.getId(predicate);
2570 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2571 } catch (DatabaseException e) {
2572 Logger.defaultLogError(e);
// IntProcedure that collects a single object key from an Objects query.
// `single` is 0 while no object has been seen; finished() maps -1 back to 0.
// A stored Throwable is rethrown from get() as a DatabaseException.
2577 static class Runner2Procedure implements IntProcedure {
2579 public int single = 0;
2580 public Throwable t = null;
2582 public void clear() {
2588 public void execute(ReadGraphImpl graph, int i) {
// The first object seen is recorded.
2589 if(single == 0) single = i;
2594 public void finished(ReadGraphImpl graph) {
2595 if(single == -1) single = 0;
2599 public void exception(ReadGraphImpl graph, Throwable throwable) {
2604 public int get() throws DatabaseException {
// DatabaseExceptions are rethrown as-is; anything else is wrapped.
2606 if(t instanceof DatabaseException) throw (DatabaseException)t;
2607 else throw new DatabaseException(t);
// Runs the Objects query for (subject, predicate) with a Runner2Procedure
// collector and no listener.
2614 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2616 final int sId = querySupport.getId(subject);
2617 final int pId = querySupport.getId(predicate);
2619 Runner2Procedure proc = new Runner2Procedure();
2620 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
// Runs the Objects query for (subject, predicate) and forwards each object to an
// AsyncMultiProcedure as a Resource. Two adapters exist: one used when the
// request has a parent entry or a listener, which logs Throwables from the
// callbacks and may pass impl.newRestart(graph) to them, and a plain pass-through
// adapter otherwise.
2625 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2627 assert(subject != null);
2628 assert(predicate != null);
2630 final ListenerBase listener = getListenerBase(procedure);
2632 if(impl.parent != null || listener != null) {
2634 IntProcedure ip = new IntProcedure() {
// Flipped atomically by the first finished() call.
2636 AtomicBoolean first = new AtomicBoolean(true);
2639 public void execute(ReadGraphImpl graph, int i) {
2642 procedure.execute(impl, querySupport.getResource(i));
2644 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2646 } catch (Throwable t2) {
2647 Logger.defaultLogError(t2);
2653 public void finished(ReadGraphImpl graph) {
// The first completion is reported with the original graph.
2655 if(first.compareAndSet(true, false)) {
2656 procedure.finished(impl);
2657 // impl.state.barrier.dec(this);
2659 procedure.finished(impl.newRestart(graph));
2661 } catch (Throwable t2) {
2662 Logger.defaultLogError(t2);
2667 public void exception(ReadGraphImpl graph, Throwable t) {
2669 procedure.exception(graph, t);
2670 } catch (Throwable t2) {
2671 Logger.defaultLogError(t2);
2673 // impl.state.barrier.dec(this);
2677 public String toString() {
2678 return "forEachObject with " + procedure;
2683 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2684 // else impl.state.barrier.inc(null, null);
2686 forEachObject(impl, subject, predicate, listener, ip);
// Pass-through adapter: forwards callbacks with the delivered graph.
2690 IntProcedure ip = new IntProcedure() {
2693 public void execute(ReadGraphImpl graph, int i) {
2694 procedure.execute(graph, querySupport.getResource(i));
2698 public void finished(ReadGraphImpl graph) {
2699 procedure.finished(graph);
2703 public void exception(ReadGraphImpl graph, Throwable t) {
2704 procedure.exception(graph, t);
2708 public String toString() {
2709 return "forEachObject with " + procedure;
2714 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Adapts an {@code AsyncSetListener} to {@code forEachObject} by turning each result pass into
 * {@code add}/{@code remove} calls: results not found in {@code current} are reported with
 * {@code procedure.add}, and whatever is left in {@code current} at completion is reported with
 * {@code procedure.remove}.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving additions, removals and errors
 */
2721 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2723 assert(subject != null);
2724 assert(predicate != null);
2725 assert(procedure != null);
2727 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// NOTE(review): `current` presumably holds the previous pass and `run` the ongoing one;
// confirm where `run` is filled and promoted to `current`.
2729 private Set<Resource> current = null;
2730 private Set<Resource> run = new HashSet<Resource>();
2733 public void execute(AsyncReadGraph graph, Resource result) {
2735 boolean found = false;
2737 if(current != null) {
// Removing a match leaves only entries that were not seen again.
2739 found = current.remove(result);
2743 if(!found) procedure.add(graph, result);
2750 public void finished(AsyncReadGraph graph) {
2752 if(current != null) {
2753 for(Resource r : current) procedure.remove(graph, r);
// A fresh set is allocated for the next pass.
2758 run = new HashSet<Resource>();
2763 public boolean isDisposed() {
2764 return procedure.isDisposed();
2768 public void exception(AsyncReadGraph graph, Throwable t) {
2769 procedure.exception(graph, t);
2773 public String toString() {
2774 return "forObjectSet " + procedure;
/**
 * Adapts an {@code AsyncSetListener} to {@code forEachPredicate}: results not found in
 * {@code current} are reported with {@code procedure.add}, and entries left in {@code current}
 * at completion are reported with {@code procedure.remove}.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose predicates are tracked, asserted non-null
 * @param procedure set listener receiving additions, removals and errors
 */
2782 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2784 assert(subject != null);
2785 assert(procedure != null);
2787 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// NOTE(review): `current` presumably holds the previous pass and `run` the ongoing one;
// confirm where `run` is filled and promoted to `current`.
2789 private Set<Resource> current = null;
2790 private Set<Resource> run = new HashSet<Resource>();
2793 public void execute(AsyncReadGraph graph, Resource result) {
2795 boolean found = false;
2797 if(current != null) {
// Removing a match leaves only entries that were not seen again.
2799 found = current.remove(result);
2803 if(!found) procedure.add(graph, result);
2810 public void finished(AsyncReadGraph graph) {
2812 if(current != null) {
2813 for(Resource r : current) procedure.remove(graph, r);
// A fresh set is allocated for the next pass.
2818 run = new HashSet<Resource>();
2823 public boolean isDisposed() {
2824 return procedure.isDisposed();
2828 public void exception(AsyncReadGraph graph, Throwable t) {
2829 procedure.exception(graph, t);
2833 public String toString() {
2834 return "forPredicateSet " + procedure;
/**
 * Adapts an {@code AsyncSetListener} to {@code forEachPrincipalType}: results not found in
 * {@code current} are reported with {@code procedure.add}, and entries left in {@code current}
 * at completion are reported with {@code procedure.remove}.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose principal types are tracked, asserted non-null
 * @param procedure set listener receiving additions, removals and errors
 */
2842 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2844 assert(subject != null);
2845 assert(procedure != null);
2847 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// NOTE(review): `current` presumably holds the previous pass and `run` the ongoing one;
// confirm where `run` is filled and promoted to `current`.
2849 private Set<Resource> current = null;
2850 private Set<Resource> run = new HashSet<Resource>();
2853 public void execute(AsyncReadGraph graph, Resource result) {
2855 boolean found = false;
2857 if(current != null) {
// Removing a match leaves only entries that were not seen again.
2859 found = current.remove(result);
2863 if(!found) procedure.add(graph, result);
2870 public void finished(AsyncReadGraph graph) {
2872 if(current != null) {
2873 for(Resource r : current) procedure.remove(graph, r);
// A fresh set is allocated for the next pass.
2878 run = new HashSet<Resource>();
2883 public boolean isDisposed() {
2884 return procedure.isDisposed();
2888 public void exception(AsyncReadGraph graph, Throwable t) {
2889 procedure.exception(graph, t);
2893 public String toString() {
2894 return "forPrincipalTypeSet " + procedure;
/**
 * Adapts an {@code AsyncSetListener} to {@code forEachAssertedObject}: results not found in
 * {@code current} are reported with {@code procedure.add}, and entries left in {@code current}
 * at completion are reported with {@code procedure.remove}.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving additions, removals and errors
 */
2902 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2904 assert(subject != null);
2905 assert(predicate != null);
2906 assert(procedure != null);
2908 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// NOTE(review): `current` presumably holds the previous pass and `run` the ongoing one;
// confirm where `run` is filled and promoted to `current`.
2910 private Set<Resource> current = null;
2911 private Set<Resource> run = new HashSet<Resource>();
2914 public void execute(AsyncReadGraph graph, Resource result) {
2916 boolean found = false;
2918 if(current != null) {
// Removing a match leaves only entries that were not seen again.
2920 found = current.remove(result);
2924 if(!found) procedure.add(graph, result);
2931 public void finished(AsyncReadGraph graph) {
2933 if(current != null) {
2934 for(Resource r : current) procedure.remove(graph, r);
// A fresh set is allocated for the next pass.
2939 run = new HashSet<Resource>();
2944 public boolean isDisposed() {
2945 return procedure.isDisposed();
2949 public void exception(AsyncReadGraph graph, Throwable t) {
2950 procedure.exception(graph, t);
2954 public String toString() {
// NOTE(review): the label reads "forObjectSet" although this is forAssertedObjectSet - possibly copy-paste; confirm before changing.
2955 return "forObjectSet " + procedure;
/**
 * Adapts an {@code AsyncSetListener} of statements to {@code forEachAssertedStatement}: results
 * not found in {@code current} are reported with {@code procedure.add}, and entries left in
 * {@code current} at completion are reported with {@code procedure.remove}.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving additions, removals and errors
 */
2963 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2965 assert(subject != null);
2966 assert(predicate != null);
2967 assert(procedure != null);
2969 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// NOTE(review): `current` presumably holds the previous pass and `run` the ongoing one;
// confirm where `run` is filled and promoted to `current`.
2971 private Set<Statement> current = null;
2972 private Set<Statement> run = new HashSet<Statement>();
2975 public void execute(AsyncReadGraph graph, Statement result) {
2977 boolean found = false;
2979 if(current != null) {
// Removing a match leaves only entries that were not seen again.
2981 found = current.remove(result);
2985 if(!found) procedure.add(graph, result);
2992 public void finished(AsyncReadGraph graph) {
2994 if(current != null) {
2995 for(Statement s : current) procedure.remove(graph, s);
// A fresh set is allocated for the next pass.
3000 run = new HashSet<Statement>();
3005 public boolean isDisposed() {
3006 return procedure.isDisposed();
3010 public void exception(AsyncReadGraph graph, Throwable t) {
3011 procedure.exception(graph, t);
3015 public String toString() {
// NOTE(review): the label reads "forStatementSet" although this is forAssertedStatementSet - confirm whether intended.
3016 return "forStatementSet " + procedure;
/**
 * Runs the asserted-statements query for {@code (subject, predicate)} and forwards only the
 * object of each reported triple to {@code procedure}. Anything thrown by {@code procedure} is
 * logged, as is a {@link DatabaseException} raised while issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure receiver of the objects, completion and errors
 */
3024 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3025 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3027 assert(subject != null);
3028 assert(predicate != null);
3029 assert(procedure != null);
3031 final ListenerBase listener = getListenerBase(procedure);
3034 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3037 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// Subject and predicate ids of the triple are ignored; only the object is converted.
3039 procedure.execute(graph, querySupport.getResource(o));
3040 } catch (Throwable t2) {
3041 Logger.defaultLogError(t2);
3046 public void finished(ReadGraphImpl graph) {
3048 procedure.finished(graph);
3049 } catch (Throwable t2) {
3050 Logger.defaultLogError(t2);
3052 // impl.state.barrier.dec();
3056 public void exception(ReadGraphImpl graph, Throwable t) {
3058 procedure.exception(graph, t);
3059 } catch (Throwable t2) {
3060 Logger.defaultLogError(t2);
3062 // impl.state.barrier.dec();
// A failure to issue the query is only logged; the procedure is not notified here.
3066 } catch (DatabaseException e) {
3067 Logger.defaultLogError(e);
/**
 * Runs the principal-types query for {@code subject} and forwards each reported id to the
 * asynchronous {@code procedure} as a {@link Resource}. Anything thrown by {@code procedure}
 * is logged, as is a {@link DatabaseException} raised while issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose principal types are requested, asserted non-null
 * @param procedure receiver of the types, completion and errors
 */
3073 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3075 assert(subject != null);
3076 assert(procedure != null);
3078 final ListenerBase listener = getListenerBase(procedure);
// Wrapper isolating the query engine from exceptions thrown by user code.
3080 IntProcedure ip = new IntProcedure() {
3083 public void execute(ReadGraphImpl graph, int i) {
3085 procedure.execute(graph, querySupport.getResource(i));
3086 } catch (Throwable t2) {
3087 Logger.defaultLogError(t2);
3092 public void finished(ReadGraphImpl graph) {
3094 procedure.finished(graph);
3095 } catch (Throwable t2) {
3096 Logger.defaultLogError(t2);
3098 // impl.state.barrier.dec(this);
3102 public void exception(ReadGraphImpl graph, Throwable t) {
3104 procedure.exception(graph, t);
3105 } catch (Throwable t2) {
3106 Logger.defaultLogError(t2);
3108 // impl.state.barrier.dec(this);
3113 int sId = querySupport.getId(subject);
3115 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3116 // else impl.state.barrier.inc(null, null);
3119 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
// A failure to issue the query is only logged; the procedure is not notified here.
3120 } catch (DatabaseException e) {
3121 Logger.defaultLogError(e);
/**
 * Variant of the principal-types query for a graph-less {@code MultiProcedure}: each reported
 * id is converted to a {@link Resource} and passed on without the callback graph. Anything
 * thrown by {@code procedure} is logged, as is a {@link DatabaseException} raised while
 * issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose principal types are requested, asserted non-null
 * @param procedure receiver of the types, completion and errors
 */
3127 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3129 assert(subject != null);
3130 assert(procedure != null);
3132 final ListenerBase listener = getListenerBase(procedure);
3134 // impl.state.barrier.inc();
3137 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3140 public void execute(ReadGraphImpl graph, int i) {
// The callback graph is dropped: MultiProcedure callbacks take no graph argument.
3142 procedure.execute(querySupport.getResource(i));
3143 } catch (Throwable t2) {
3144 Logger.defaultLogError(t2);
3149 public void finished(ReadGraphImpl graph) {
3151 procedure.finished();
3152 } catch (Throwable t2) {
3153 Logger.defaultLogError(t2);
3155 // impl.state.barrier.dec();
3159 public void exception(ReadGraphImpl graph, Throwable t) {
3161 procedure.exception(t);
3162 } catch (Throwable t2) {
3163 Logger.defaultLogError(t2);
3165 // impl.state.barrier.dec();
// A failure to issue the query is only logged; the procedure is not notified here.
3169 } catch (DatabaseException e) {
3170 Logger.defaultLogError(e);
/**
 * Runs the types query for {@code subject} and hands the resulting {@code IntSet} to
 * {@code procedure} unchanged. Callbacks are forwarded without exception handling; a
 * {@link DatabaseException} raised while issuing the query is logged.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose types are requested, asserted non-null
 * @param procedure receiver of the type set or of an error
 */
3174 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3176 assert(subject != null);
3177 assert(procedure != null);
3179 final ListenerBase listener = getListenerBase(procedure);
// With assertions enabled, this method rejects procedures that yield a listener base.
3180 assert(listener == null);
3182 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3185 public void execute(final ReadGraphImpl graph, IntSet set) {
// The IntSet is passed where a Set<Resource> is expected; NOTE(review): presumably IntSet implements Set<Resource> - confirm.
3186 procedure.execute(graph, set);
3190 public void exception(ReadGraphImpl graph, Throwable t) {
3191 procedure.exception(graph, t);
3196 int sId = querySupport.getId(subject);
3199 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
// A failure to issue the query is only logged; the procedure is not notified here.
3200 } catch (DatabaseException e) {
3201 Logger.defaultLogError(e);
3207 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3209 assert(subject != null);
3211 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3216 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3218 assert(subject != null);
3220 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs the super-types query for {@code subject} and hands the resulting {@code IntSet} to
 * {@code procedure}. The callback that wins the {@code first} flag is delivered on the
 * callback graph as is; the other delivery variant uses a graph restarted from it. Anything
 * thrown by {@code procedure} is logged, as is a {@link DatabaseException} raised while
 * issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose supertypes are requested, asserted non-null
 * @param procedure receiver of the supertype set or of an error
 */
3225 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3227 assert(subject != null);
3228 assert(procedure != null);
3230 final ListenerBase listener = getListenerBase(procedure);
3233 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
// Shared by execute() and exception(): whichever fires first consumes the flag.
3235 AtomicBoolean first = new AtomicBoolean(true);
3238 public void execute(final ReadGraphImpl graph, IntSet set) {
3239 // final HashSet<Resource> result = new HashSet<Resource>();
3240 // set.forEach(new TIntProcedure() {
3243 // public boolean execute(int type) {
3244 // result.add(querySupport.getResource(type));
3250 if(first.compareAndSet(true, false)) {
3251 procedure.execute(graph, set);
3252 // impl.state.barrier.dec();
// Presumably the alternative branch for later callbacks - confirm.
3254 procedure.execute(impl.newRestart(graph), set);
3256 } catch (Throwable t2) {
3257 Logger.defaultLogError(t2);
3262 public void exception(ReadGraphImpl graph, Throwable t) {
3264 if(first.compareAndSet(true, false)) {
3265 procedure.exception(graph, t);
3266 // impl.state.barrier.dec();
// Presumably the alternative branch for later callbacks - confirm.
3268 procedure.exception(impl.newRestart(graph), t);
3270 } catch (Throwable t2) {
3271 Logger.defaultLogError(t2);
// A failure to issue the query is only logged; the procedure is not notified here.
3276 } catch (DatabaseException e) {
3277 Logger.defaultLogError(e);
/**
 * Runs the direct-super-relations query for {@code subject} and forwards each reported id to
 * {@code procedure} as a {@link Resource}. Anything thrown by {@code procedure} is logged, as
 * is a {@link DatabaseException} raised while issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject relation whose direct super-relations are requested, asserted non-null
 * @param procedure receiver of the super-relations, completion and errors
 */
3283 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3285 assert(subject != null);
3286 assert(procedure != null);
3288 final ListenerBase listener = getListenerBase(procedure);
// Wrapper isolating the query engine from exceptions thrown by user code.
3290 IntProcedure ip = new IntProcedureAdapter() {
3293 public void execute(final ReadGraphImpl graph, int superRelation) {
3295 procedure.execute(graph, querySupport.getResource(superRelation));
3296 } catch (Throwable t2) {
3297 Logger.defaultLogError(t2);
3302 public void finished(final ReadGraphImpl graph) {
3304 procedure.finished(graph);
3305 } catch (Throwable t2) {
3306 Logger.defaultLogError(t2);
3308 // impl.state.barrier.dec(this);
3313 public void exception(ReadGraphImpl graph, Throwable t) {
3315 procedure.exception(graph, t);
3316 } catch (Throwable t2) {
3317 Logger.defaultLogError(t2);
3319 // impl.state.barrier.dec(this);
3324 int sId = querySupport.getId(subject);
3326 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3327 // else impl.state.barrier.inc(null, null);
3330 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
// A failure to issue the query is only logged; the procedure is not notified here.
3331 } catch (DatabaseException e) {
3332 Logger.defaultLogError(e);
3335 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3340 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3342 assert(subject != null);
3343 assert(procedure != null);
3345 final ListenerBase listener = getListenerBase(procedure);
3347 // impl.state.barrier.inc();
3349 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs the super-relations query for {@code subject} and hands the resulting {@code IntSet}
 * to {@code procedure}. Anything thrown by {@code procedure} is logged, as is a
 * {@link DatabaseException} raised while issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject relation whose super-relations are requested, asserted non-null
 * @param procedure receiver of the super-relation set or of an error
 */
3354 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3356 assert(subject != null);
3357 assert(procedure != null);
3359 final ListenerBase listener = getListenerBase(procedure);
// Wrapper isolating the query engine from exceptions thrown by user code.
3361 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3364 public void execute(final ReadGraphImpl graph, IntSet set) {
3365 // final HashSet<Resource> result = new HashSet<Resource>();
3366 // set.forEach(new TIntProcedure() {
3369 // public boolean execute(int type) {
3370 // result.add(querySupport.getResource(type));
// The id set is passed on directly rather than being copied into a HashSet.
3376 procedure.execute(graph, set);
3377 } catch (Throwable t2) {
3378 Logger.defaultLogError(t2);
3380 // impl.state.barrier.dec(this);
3384 public void exception(ReadGraphImpl graph, Throwable t) {
3386 procedure.exception(graph, t);
3387 } catch (Throwable t2) {
3388 Logger.defaultLogError(t2);
3390 // impl.state.barrier.dec(this);
3395 int sId = querySupport.getId(subject);
3397 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3398 // else impl.state.barrier.inc(null, null);
3401 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
// A failure to issue the query is only logged; the procedure is not notified here.
3402 } catch (DatabaseException e) {
3403 Logger.defaultLogError(e);
3408 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3409 return getValue(impl, querySupport.getId(subject));
3412 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3413 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3417 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3419 assert(subject != null);
3420 assert(procedure != null);
3422 int sId = querySupport.getId(subject);
3424 // if(procedure != null) {
3426 final ListenerBase listener = getListenerBase(procedure);
3428 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3430 AtomicBoolean first = new AtomicBoolean(true);
3433 public void execute(ReadGraphImpl graph, byte[] result) {
3435 if(first.compareAndSet(true, false)) {
3436 procedure.execute(graph, result);
3437 // impl.state.barrier.dec(this);
3439 procedure.execute(impl.newRestart(graph), result);
3441 } catch (Throwable t2) {
3442 Logger.defaultLogError(t2);
3447 public void exception(ReadGraphImpl graph, Throwable t) {
3449 if(first.compareAndSet(true, false)) {
3450 procedure.exception(graph, t);
3451 // impl.state.barrier.dec(this);
3453 procedure.exception(impl.newRestart(graph), t);
3455 } catch (Throwable t2) {
3456 Logger.defaultLogError(t2);
3462 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3463 // else impl.state.barrier.inc(null, null);
3466 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3467 } catch (DatabaseException e) {
3468 throw new IllegalStateException("Internal error");
3473 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3477 // throw new IllegalStateException("Internal error");
/**
 * Runs the value query for {@code subject} and hands the resulting bytes to {@code procedure}.
 * <p>
 * If the graph has a parent entry or a listener base exists, callbacks pass through a guarded
 * wrapper: the callback that wins the {@code first} flag is delivered on the callback graph,
 * the other variant on a graph restarted from it, and anything thrown by {@code procedure} is
 * logged. Otherwise callbacks are forwarded directly. A {@link DatabaseException} raised while
 * issuing the query is logged in both paths.
 *
 * @param impl graph the query is issued on
 * @param subject resource whose value is requested, asserted non-null
 * @param procedure receiver of the value or of an error, asserted non-null
 */
3482 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3484 assert(subject != null);
3485 assert(procedure != null);
3487 final ListenerBase listener = getListenerBase(procedure);
// Guarded path: taken when a parent entry or a listener is involved.
3489 if(impl.parent != null || listener != null) {
3491 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Shared by execute() and exception(): whichever fires first consumes the flag.
3493 AtomicBoolean first = new AtomicBoolean(true);
3496 public void execute(ReadGraphImpl graph, byte[] result) {
3498 if(first.compareAndSet(true, false)) {
3499 procedure.execute(graph, result);
3500 // impl.state.barrier.dec(this);
// Presumably the alternative branch for later callbacks - confirm.
3502 procedure.execute(impl.newRestart(graph), result);
3504 } catch (Throwable t2) {
3505 Logger.defaultLogError(t2);
3510 public void exception(ReadGraphImpl graph, Throwable t) {
3512 if(first.compareAndSet(true, false)) {
3513 procedure.exception(graph, t);
3514 // impl.state.barrier.dec(this);
// Presumably the alternative branch for later callbacks - confirm.
3516 procedure.exception(impl.newRestart(graph), t);
3518 } catch (Throwable t2) {
3519 Logger.defaultLogError(t2);
3525 int sId = querySupport.getId(subject);
3527 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3528 // else impl.state.barrier.inc(null, null);
3531 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3532 } catch (DatabaseException e) {
3533 Logger.defaultLogError(e);
// Plain path: callbacks are forwarded on the callback graph.
3538 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3541 public void execute(ReadGraphImpl graph, byte[] result) {
3543 procedure.execute(graph, result);
3548 public void exception(ReadGraphImpl graph, Throwable t) {
3550 procedure.exception(graph, t);
3556 int sId = querySupport.getId(subject);
3559 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3560 } catch (DatabaseException e) {
3561 Logger.defaultLogError(e);
/**
 * Resolves the inverse of {@code relation} by running the objects query with the predicate
 * returned by {@code getInverseOf()}. The procedure is notified at most once, guarded by the
 * {@code done} flag: with the found resource, with a {@link NoInverseException}, with a
 * {@link ManyObjectsForFunctionalRelationException}, or with the error reported by the query.
 *
 * @param impl graph the query is issued on
 * @param relation relation whose inverse is requested, asserted non-null
 * @param procedure receiver of the inverse or of an error, asserted non-null
 */
3569 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3571 assert(relation != null);
3572 assert(procedure != null);
3574 final ListenerBase listener = getListenerBase(procedure);
3576 IntProcedure ip = new IntProcedure() {
// NOTE(review): presumably 0 means "nothing reported yet" - confirm.
3578 private int result = 0;
// `found` marks that a first object arrived; `done` marks that the procedure was notified.
3580 final AtomicBoolean found = new AtomicBoolean(false);
3581 final AtomicBoolean done = new AtomicBoolean(false);
3584 public void finished(ReadGraphImpl graph) {
3586 // Shall fire exactly once!
3587 if(done.compareAndSet(false, true)) {
// Two outcomes: no inverse found, or the stored result.
// NOTE(review): presumably selected by whether `result` was set - confirm.
3590 procedure.exception(graph, new NoInverseException(""));
3591 // impl.state.barrier.dec(this);
3593 procedure.execute(graph, querySupport.getResource(result));
3594 // impl.state.barrier.dec(this);
3596 } catch (Throwable t) {
3597 Logger.defaultLogError(t);
3604 public void execute(ReadGraphImpl graph, int i) {
// Only the first caller wins this compare-and-set; any further object counts as a duplicate.
3606 if(found.compareAndSet(false, true)) {
3609 // Shall fire exactly once!
3610 if(done.compareAndSet(false, true)) {
// NOTE(review): `this.result` and `result` name the same field, so the message repeats one id;
// the second operand was probably meant to be `i` - confirm.
3612 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3613 // impl.state.barrier.dec(this);
3614 } catch (Throwable t) {
3615 Logger.defaultLogError(t);
3623 public void exception(ReadGraphImpl graph, Throwable t) {
3624 // Shall fire exactly once!
3625 if(done.compareAndSet(false, true)) {
3627 procedure.exception(graph, t);
3628 // impl.state.barrier.dec(this);
3629 } catch (Throwable t2) {
3630 Logger.defaultLogError(t2);
3637 int sId = querySupport.getId(relation);
3639 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3640 // else impl.state.barrier.inc(null, null);
3643 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
// A failure to issue the query is only logged; the procedure is not notified here.
3644 } catch (DatabaseException e) {
3645 Logger.defaultLogError(e);
/**
 * Looks up a resource by the string {@code id} through the internal {@code forResource}
 * overload and hands the result, converted from an integer id to a {@link Resource}, to
 * {@code procedure}. Anything thrown by {@code procedure} is logged.
 *
 * @param impl graph the query is issued on; its parent entry becomes the query parent
 * @param id string identifier passed on to the lookup unchanged
 * @param procedure receiver of the resource or of an error, asserted non-null
 */
3651 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3654 assert(procedure != null);
// Wrapper isolating the query engine from exceptions thrown by user code.
3656 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3659 public void execute(ReadGraphImpl graph, Integer result) {
3661 procedure.execute(graph, querySupport.getResource(result));
3662 } catch (Throwable t2) {
3663 Logger.defaultLogError(t2);
3665 // impl.state.barrier.dec(this);
3669 public void exception(ReadGraphImpl graph, Throwable t) {
3672 procedure.exception(graph, t);
3673 } catch (Throwable t2) {
3674 Logger.defaultLogError(t2);
3676 // impl.state.barrier.dec(this);
3681 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3682 // else impl.state.barrier.inc(null, null);
3684 forResource(impl, id, impl.parent, ip);
/**
 * Looks up a builtin by the string {@code id} through the internal {@code forBuiltin}
 * overload and hands the result, converted from an integer id to a {@link Resource}, to
 * {@code procedure}. Anything thrown by {@code procedure} is logged, as is a
 * {@link DatabaseException} raised while issuing the lookup.
 *
 * @param impl graph the query is issued on; its parent entry becomes the query parent
 * @param id string identifier passed on to the lookup unchanged
 * @param procedure receiver of the resource or of an error, asserted non-null
 */
3689 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3692 assert(procedure != null);
3694 // impl.state.barrier.inc();
3697 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3700 public void execute(ReadGraphImpl graph, Integer result) {
3702 procedure.execute(graph, querySupport.getResource(result));
3703 } catch (Throwable t2) {
3704 Logger.defaultLogError(t2);
3706 // impl.state.barrier.dec();
3710 public void exception(ReadGraphImpl graph, Throwable t) {
3712 procedure.exception(graph, t);
3713 } catch (Throwable t2) {
3714 Logger.defaultLogError(t2);
3716 // impl.state.barrier.dec();
// A failure to issue the lookup is only logged; the procedure is not notified here.
3720 } catch (DatabaseException e) {
3721 Logger.defaultLogError(e);
3727 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3729 assert(subject != null);
3730 assert(procedure != null);
3732 final ListenerBase listener = getListenerBase(procedure);
3735 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3736 procedure.execute(impl, !result.isEmpty());
3737 } catch (DatabaseException e) {
3738 procedure.exception(impl, e);
/**
 * Reports whether {@code subject} has a statement with {@code predicate}, by iterating the
 * objects through {@code forEachObject} and delivering the {@code found} flag on completion.
 * Anything thrown by {@code procedure} is logged.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure receiver of the boolean answer or of an error, asserted non-null
 */
3744 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
3746 assert(subject != null);
3747 assert(predicate != null);
3748 assert(procedure != null);
3750 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
3752 boolean found = false;
// NOTE(review): presumably sets `found` for any reported object - confirm.
// execute() and finished() are synchronized on this adapter.
3755 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3760 synchronized public void finished(AsyncReadGraph graph) {
3762 procedure.execute(graph, found);
3763 } catch (Throwable t2) {
3764 Logger.defaultLogError(t2);
3766 // impl.state.barrier.dec(this);
3770 public void exception(AsyncReadGraph graph, Throwable t) {
3772 procedure.exception(graph, t);
3773 } catch (Throwable t2) {
3774 Logger.defaultLogError(t2);
3776 // impl.state.barrier.dec(this);
3781 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
3782 // else impl.state.barrier.inc(null, null);
3784 forEachObject(impl, subject, predicate, ip);
/**
 * Reports whether the statement {@code (subject, predicate, object)} exists, by iterating the
 * objects through {@code forEachObject} and comparing each one to {@code object} with
 * {@code equals}. The answer is delivered on completion; anything thrown by {@code procedure}
 * is logged.
 *
 * @param impl graph the query is issued on
 * @param subject statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param object object to look for; not asserted non-null by this method
 * @param procedure receiver of the boolean answer or of an error, asserted non-null
 */
3789 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
3791 assert(subject != null);
3792 assert(predicate != null);
3793 assert(procedure != null);
3795 // impl.state.barrier.inc();
3797 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
3799 boolean found = false;
// execute() and finished() are synchronized on this adapter.
3802 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3803 if(resource.equals(object)) found = true;
3807 synchronized public void finished(AsyncReadGraph graph) {
3809 procedure.execute(graph, found);
3810 } catch (Throwable t2) {
3811 Logger.defaultLogError(t2);
3813 // impl.state.barrier.dec();
3817 public void exception(AsyncReadGraph graph, Throwable t) {
3819 procedure.exception(graph, t);
3820 } catch (Throwable t2) {
3821 Logger.defaultLogError(t2);
3823 // impl.state.barrier.dec();
/**
 * Reports whether {@code subject} has a value: the value query is run and the answer is
 * {@code true} exactly when the reported byte array is non-null. Anything thrown by
 * {@code procedure} is logged, as is a {@link DatabaseException} raised while issuing the
 * query.
 *
 * @param impl graph the query is issued on
 * @param subject resource to inspect, asserted non-null
 * @param procedure receiver of the boolean answer or of an error, asserted non-null
 */
3831 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3833 assert(subject != null);
3834 assert(procedure != null);
3836 final ListenerBase listener = getListenerBase(procedure);
3838 // impl.state.barrier.inc();
3841 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
3844 public void execute(ReadGraphImpl graph, byte[] object) {
// Only presence matters; the bytes themselves are not inspected.
3845 boolean result = object != null;
3847 procedure.execute(graph, result);
3848 } catch (Throwable t2) {
3849 Logger.defaultLogError(t2);
3851 // impl.state.barrier.dec();
3855 public void exception(ReadGraphImpl graph, Throwable t) {
3857 procedure.exception(graph, t);
3858 } catch (Throwable t2) {
3859 Logger.defaultLogError(t2);
3861 // impl.state.barrier.dec();
// A failure to issue the query is only logged; the procedure is not notified here.
3865 } catch (DatabaseException e) {
3866 Logger.defaultLogError(e);
/**
 * Runs the ordered-set query for {@code subject} and forwards each reported id to
 * {@code procedure} as a {@link Resource}. Anything thrown by {@code procedure} is logged, as
 * is a {@link DatabaseException} raised while issuing the query.
 *
 * @param impl graph the query is issued on
 * @param subject ordered set to enumerate, asserted non-null
 * @param procedure receiver of the elements, completion and errors, asserted non-null
 */
3872 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3874 assert(subject != null);
3875 assert(procedure != null);
3877 final ListenerBase listener = getListenerBase(procedure);
3881 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3884 public void exception(ReadGraphImpl graph, Throwable t) {
3886 procedure.exception(graph, t);
3887 } catch (Throwable t2) {
3888 Logger.defaultLogError(t2);
3890 // impl.state.barrier.dec();
3894 public void execute(ReadGraphImpl graph, int i) {
3896 procedure.execute(graph, querySupport.getResource(i));
3897 } catch (Throwable t2) {
3898 Logger.defaultLogError(t2);
3903 public void finished(ReadGraphImpl graph) {
3905 procedure.finished(graph);
3906 } catch (Throwable t2) {
3907 Logger.defaultLogError(t2);
3909 // impl.state.barrier.dec();
// A failure to issue the query is only logged; the procedure is not notified here.
3913 } catch (DatabaseException e) {
3914 Logger.defaultLogError(e);
3920 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
3922 // assert(request != null);
3923 // assert(procedure != null);
3925 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
3930 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
3932 // assert(graph != null);
3933 // assert(request != null);
3935 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
3936 // if(entry != null && entry.isReady()) {
3937 // return (T)entry.get(graph, this, null);
3939 // return request.perform(graph);
3944 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
3946 // assert(graph != null);
3947 // assert(request != null);
3949 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
3950 // if(entry != null && entry.isReady()) {
3951 // if(entry.isExcepted()) {
3952 // Throwable t = (Throwable)entry.getResult();
3953 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3954 // else throw new DatabaseException(t);
3956 // return (T)entry.getResult();
3960 // final DataContainer<T> result = new DataContainer<T>();
3961 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3963 // request.register(graph, new Listener<T>() {
3966 // public void exception(Throwable t) {
3967 // exception.set(t);
3971 // public void execute(T t) {
3976 // public boolean isDisposed() {
3982 // Throwable t = exception.get();
3984 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3985 // else throw new DatabaseException(t);
3988 // return result.get();
3995 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
3997 // assert(graph != null);
3998 // assert(request != null);
4000 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4001 // if(entry != null && entry.isReady()) {
4002 // if(entry.isExcepted()) {
4003 // procedure.exception(graph, (Throwable)entry.getResult());
4005 // procedure.execute(graph, (T)entry.getResult());
4008 // request.perform(graph, procedure);
4014 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4016 assert(request != null);
4017 assert(procedure != null);
4021 queryMultiRead(impl, request, parent, listener, procedure);
4023 } catch (DatabaseException e) {
4025 throw new IllegalStateException(e);
/**
 * Runs an asynchronous multi-read request through {@code runAsyncMultiRead}, wrapping
 * {@code procedure} so that anything it throws is logged instead of propagating.
 *
 * @param impl graph the request is performed on
 * @param request multi-read to evaluate, asserted non-null
 * @param parent cache entry recorded as the parent of this query
 * @param procedure receiver of results, completion and errors, asserted non-null
 * @param listener listener base to attach; passed through unchanged
 */
4032 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4034 assert(request != null);
4035 assert(procedure != null);
4037 // impl.state.barrier.inc();
4039 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4041 public void execute(AsyncReadGraph graph, T result) {
4044 procedure.execute(graph, result);
4045 } catch (Throwable t2) {
4046 Logger.defaultLogError(t2);
4051 public void finished(AsyncReadGraph graph) {
4054 procedure.finished(graph);
4055 } catch (Throwable t2) {
4056 Logger.defaultLogError(t2);
4059 // impl.state.barrier.dec();
// The wrapper reports the wrapped procedure's own description.
4064 public String toString() {
4065 return procedure.toString();
4069 public void exception(AsyncReadGraph graph, Throwable t) {
4072 procedure.exception(graph, t);
4073 } catch (Throwable t2) {
4074 Logger.defaultLogError(t2);
4077 // impl.state.barrier.dec();
4086 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4088 // assert(request != null);
4089 // assert(procedure != null);
4093 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4096 // public String toString() {
4097 // return procedure.toString();
4101 // public void execute(AsyncReadGraph graph, T result) {
4103 // procedure.execute(result);
4104 // } catch (Throwable t2) {
4105 // Logger.defaultLogError(t2);
4110 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4112 // procedure.exception(throwable);
4113 // } catch (Throwable t2) {
4114 // Logger.defaultLogError(t2);
4120 // } catch (DatabaseException e) {
4122 // throw new IllegalStateException(e);
4129 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4131 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4136 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4138 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4143 public VirtualGraph getValueProvider(Resource subject) {
4145 return querySupport.getValueProvider(querySupport.getId(subject));
4149 public boolean resumeTasks(ReadGraphImpl graph) {
4151 return querySupport.resume(graph);
4155 public boolean isImmutable(int resourceId) {
4156 return querySupport.isImmutable(resourceId);
4159 public boolean isImmutable(Resource resource) {
4160 ResourceImpl impl = (ResourceImpl)resource;
4161 return isImmutable(impl.id);
/**
 * Accessor for the {@code Layer0} instance stored in the {@code L0} field.
 *
 * @param graph graph used to obtain the instance through {@code Layer0.getInstance}
 */
4166 public Layer0 getL0(ReadGraph graph) {
// NOTE(review): presumably a lazy initialisation guarded by a null check, followed by returning L0 - confirm.
4168 L0 = Layer0.getInstance(graph);
4173 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4174 protected Integer initialValue() {