1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.eclipse.core.runtime.Platform;
36 import org.simantics.databoard.Bindings;
37 import org.simantics.db.AsyncReadGraph;
38 import org.simantics.db.DevelopmentKeys;
39 import org.simantics.db.DirectStatements;
40 import org.simantics.db.ReadGraph;
41 import org.simantics.db.RelationInfo;
42 import org.simantics.db.Resource;
43 import org.simantics.db.Session;
44 import org.simantics.db.Statement;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.common.ByteFileReader;
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
48 import org.simantics.db.common.utils.Logger;
49 import org.simantics.db.exception.DatabaseException;
50 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
51 import org.simantics.db.exception.NoInverseException;
52 import org.simantics.db.exception.ResourceNotFoundException;
53 import org.simantics.db.impl.ResourceImpl;
54 import org.simantics.db.impl.graph.BarrierTracing;
55 import org.simantics.db.impl.graph.ReadGraphImpl;
56 import org.simantics.db.impl.graph.ReadGraphSupport;
57 import org.simantics.db.impl.procedure.IntProcedureAdapter;
58 import org.simantics.db.impl.procedure.InternalProcedure;
59 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
60 import org.simantics.db.impl.support.ResourceSupport;
61 import org.simantics.db.procedure.AsyncMultiListener;
62 import org.simantics.db.procedure.AsyncMultiProcedure;
63 import org.simantics.db.procedure.AsyncProcedure;
64 import org.simantics.db.procedure.AsyncSetListener;
65 import org.simantics.db.procedure.ListenerBase;
66 import org.simantics.db.procedure.MultiProcedure;
67 import org.simantics.db.procedure.StatementProcedure;
68 import org.simantics.db.procedure.SyncMultiProcedure;
69 import org.simantics.db.request.AsyncMultiRead;
70 import org.simantics.db.request.ExternalRead;
71 import org.simantics.db.request.MultiRead;
72 import org.simantics.db.request.RequestFlags;
73 import org.simantics.db.service.Bytes;
74 import org.simantics.layer0.Layer0;
75 import org.simantics.utils.DataContainer;
76 import org.simantics.utils.Development;
77 import org.simantics.utils.FileUtils;
78 import org.simantics.utils.datastructures.Pair;
79 import org.simantics.utils.datastructures.collections.CollectionUtils;
80 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
82 import gnu.trove.procedure.TIntProcedure;
83 import gnu.trove.procedure.TLongProcedure;
84 import gnu.trove.procedure.TObjectProcedure;
85 import gnu.trove.set.hash.THashSet;
86 import gnu.trove.set.hash.TIntHashSet;
88 @SuppressWarnings({"rawtypes", "unchecked"})
89 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
91 public static int indent = 0;
96 public int boundQueries = 0;
99 final private int functionalRelation;
101 final private int superrelationOf;
103 final private int instanceOf;
105 final private int inverseOf;
107 final private int asserts;
109 final private int hasPredicate;
111 final private int hasPredicateInverse;
113 final private int hasObject;
115 final private int inherits;
117 final private int subrelationOf;
119 final private int rootLibrary;
122 * A cache for the root library resource. Initialized in
123 * {@link #getRootLibraryResource()}.
125 private volatile ResourceImpl rootLibraryResource;
127 final private int library;
129 final private int consistsOf;
131 final private int hasName;
133 AtomicInteger sleepers = new AtomicInteger(0);
135 boolean updating = false;
138 final public QueryCache cache;
139 final public QuerySupport querySupport;
140 final public Session session;
141 final public ResourceSupport resourceSupport;
143 final public Semaphore requests = new Semaphore(1);
145 final public QueryListening listening = new QueryListening(this);
147 QueryThread[] executors;
151 INIT, RUN, SLEEP, DISPOSED
155 final Scheduling scheduling;
157 public ThreadState[] threadStates;
159 final Object querySupportLock;
161 public Long modificationCounter = 0L;
163 public void close() {
168 * We are running errands while waiting for requests to complete.
169 * We can only run work that is part of the current root request to avoid any deadlocks
/**
 * Asks {@code scheduling} for a sub task of the given graph and runs it,
 * passing {@code thread.get()} as the thread argument.
 *
 * @param under the graph handed to {@code scheduling.getSubTask}
 * @return NOTE(review): presumably whether a task was actually run - confirm
 */
171 public boolean performPending(ReadGraphImpl under) {
172 SessionTask task = scheduling.getSubTask(under);
// NOTE(review): presumably only reached when a task was returned - confirm the guard.
174 task.run(thread.get());
/**
 * Hands the request to {@link #scheduleOrReturnForExecution(SessionTask)} and,
 * when a task is handed back, runs that task directly in this call.
 *
 * @param request the task to schedule
 */
180 final public void scheduleNow(SessionTask request) {
181 SessionTask toExecute = scheduleOrReturnForExecution(request);
// A non-null return value is a task that this caller has to run itself.
182 if(toExecute != null)
183 toExecute.run(thread.get());
/**
 * Delegates to {@code scheduling.scheduleOrReturnForExecution(request)}.
 *
 * @param request the task to schedule
 * @return whatever the scheduler returns; {@link #scheduleNow(SessionTask)}
 *         runs a non-null return value itself
 */
186 final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
188 return scheduling.scheduleOrReturnForExecution(request);
194 final public int THREAD_MASK;
196 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
/**
 * Base class for the tasks that this processor runs through {@link #run(int)}.
 * Subclasses supply {@link #run0(int)}.
 */
198 public static abstract class SessionTask {
// Graph this task belongs to; read by isSubtask(...) and toString().
200 final protected ReadGraphImpl rootGraph;
// NOTE(review): presumably counts invocations of run(int) - confirm.
201 private int counter = 0;
202 protected int position = 1;
// Stack trace captured in run(int) when BarrierTracing.BOOKKEEPING is enabled.
203 private Exception trace;
// NOTE(review): rootGraph is final, so this constructor must assign it as well
// (presumably null, which toString() handles) - confirm.
205 public SessionTask() {
209 public SessionTask(ReadGraphImpl rootGraph) {
210 this.rootGraph = rootGraph;
/**
 * @param graph the graph to test
 * @return the result of {@code graph.isParent(rootGraph)}
 */
213 public boolean isSubtask(ReadGraphImpl graph) {
214 return graph.isParent(rootGraph);
/** The actual work of the task, implemented by subclasses. */
217 public abstract void run0(int thread);
/**
 * Final entry point for executing the task.
 *
 * @throws IllegalStateException with the message "Multiple invocations of SessionTask!";
 *         NOTE(review): presumably when the task has already been run once - confirm the guard
 */
219 public final void run(int thread) {
// With bookkeeping enabled, print the recorded trace and the current stack
// before failing.
221 if(BarrierTracing.BOOKKEEPING) {
222 trace.printStackTrace();
223 new Exception().printStackTrace();
225 throw new IllegalStateException("Multiple invocations of SessionTask!");
// Record the stack of this invocation for the diagnostics above.
227 if(BarrierTracing.BOOKKEEPING) {
228 trace = new Exception();
233 public boolean maybeReady() {
/** Returns "SessionTask[no graph]" without a root graph, otherwise "SessionTask[" + rootGraph.parent + "]". */
238 public String toString() {
239 if(rootGraph == null)
240 return "SessionTask[no graph]";
242 return "SessionTask[" + rootGraph.parent + "]";
/**
 * A {@link SessionTask} that additionally carries a semaphore and a container
 * for a {@link Throwable}, both supplied by the creator.
 */
247 public static abstract class SessionRead extends SessionTask {
// NOTE(review): presumably released to signal completion to a waiting caller - confirm.
249 final public Semaphore notify;
// NOTE(review): presumably receives a failure raised while running the task - confirm.
250 final public DataContainer<Throwable> throwable;
/**
 * @param throwable container stored in {@link #throwable}
 * @param notify semaphore stored in {@link #notify}
 */
252 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
254 this.throwable = throwable;
255 this.notify = notify;
/**
 * Delegates to {@code executors[0].runSynchronized()}.
 *
 * @param graph not used by this method
 * @return the result of {@code runSynchronized()}
 */
260 public boolean resume(ReadGraphImpl graph) {
261 return executors[0].runSynchronized();
/**
 * Creates the processor: sets up scheduling and the query cache, creates and
 * starts the query threads, and resolves the builtin resource ids used by the
 * accessor methods of this class.
 *
 * @param threads number of threads; also passed to the {@link QueryCache}
 * @param core source of the lock and of the builtin ids
 * @param threadSet receives every created query thread
 * @throws DatabaseException declared by the constructor; the visible statements do not show which call raises it
 */
264 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
265 throws DatabaseException {
// NOTE(review): used as "hashCode & THREAD_MASK" elsewhere in this class, which
// only works as a mask when threads is a power of two - confirm callers ensure this.
268 THREAD_MASK = threads - 1;
270 scheduling = new Scheduling(requests);
273 cache = new QueryCache(core, threads);
274 session = querySupport.getSession();
275 resourceSupport = querySupport.getSupport();
276 querySupportLock = core.getLock();
278 executors = new QueryThread[THREADS];
279 threadStates = new ThreadState[THREADS];
// Every thread starts in the INIT state.
281 for (int i = 0; i < THREADS; i++) {
282 threadStates[i] = ThreadState.INIT;
// Create the query threads and publish them to the caller through threadSet.
285 for (int i = 0; i < THREADS; i++) {
289 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
291 threadSet.add(executors[i]);
296 for (int i = 0; i < THREADS; i++) {
297 executors[i].start();
300 // Make sure that query threads are up and running
// Loops until the sleepers counter equals the number of threads.
301 while(sleepers.get() != THREADS) {
304 } catch (InterruptedException e) {
// A root library id of 0 is treated as "builtins not installed".
309 rootLibrary = core.getBuiltin("http:/");
310 boolean builtinsInstalled = rootLibrary != 0;
// Each builtin below is resolved only when builtins are installed; the
// assertions require a non-zero id in that case.
312 if (builtinsInstalled) {
313 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
314 assert (functionalRelation != 0);
316 functionalRelation = 0;
318 if (builtinsInstalled) {
319 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
320 assert (instanceOf != 0);
324 if (builtinsInstalled) {
325 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
326 assert (inverseOf != 0);
331 if (builtinsInstalled) {
332 inherits = core.getBuiltin(Layer0.URIs.Inherits);
333 assert (inherits != 0);
337 if (builtinsInstalled) {
338 asserts = core.getBuiltin(Layer0.URIs.Asserts);
339 assert (asserts != 0);
343 if (builtinsInstalled) {
344 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
345 assert (hasPredicate != 0);
349 if (builtinsInstalled) {
350 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
351 assert (hasPredicateInverse != 0);
353 hasPredicateInverse = 0;
355 if (builtinsInstalled) {
356 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
357 assert (hasObject != 0);
361 if (builtinsInstalled) {
362 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
363 assert (subrelationOf != 0);
367 if (builtinsInstalled) {
368 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
369 assert (superrelationOf != 0);
373 if (builtinsInstalled) {
374 library = core.getBuiltin(Layer0.URIs.Library);
375 assert (library != 0);
379 if (builtinsInstalled) {
380 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
381 assert (consistsOf != 0);
385 if (builtinsInstalled) {
386 hasName = core.getBuiltin(Layer0.URIs.HasName);
387 assert (hasName != 0);
/**
 * Propagates changes in the query cache for the given graph and then
 * increments {@link #modificationCounter}.
 *
 * @param graph the graph passed to {@code propagateChangesInQueryCache}
 */
393 final public void releaseWrite(ReadGraphImpl graph) {
394 propagateChangesInQueryCache(graph);
// modificationCounter is a boxed Long, so this unboxes, adds one and stores a
// new Long; the update is not atomic.
395 modificationCounter++;
/**
 * @param r the resource to look up
 * @return the id reported by {@code querySupport.getId(r)}
 */
398 final public int getId(final Resource r) {
399 return querySupport.getId(r);
402 public QuerySupport getCore() {
/**
 * @return the id assigned in the constructor: either
 *         {@code core.getBuiltin(Layer0.URIs.FunctionalRelation)} or 0
 */
406 public int getFunctionalRelation() {
407 return functionalRelation;
410 public int getInherits() {
414 public int getInstanceOf() {
418 public int getInverseOf() {
/**
 * @return the id that the constructor resolves with
 *         {@code core.getBuiltin(Layer0.URIs.SubrelationOf)} when builtins are installed
 */
422 public int getSubrelationOf() {
423 return subrelationOf;
/**
 * @return the id that the constructor resolves with
 *         {@code core.getBuiltin(Layer0.URIs.SuperrelationOf)} when builtins are installed
 */
426 public int getSuperrelationOf() {
427 return superrelationOf;
430 public int getAsserts() {
434 public int getHasPredicate() {
/**
 * @return the id assigned in the constructor: either
 *         {@code core.getBuiltin(Layer0.URIs.HasPredicateInverse)} or 0
 */
438 public int getHasPredicateInverse() {
439 return hasPredicateInverse;
442 public int getHasObject() {
446 public int getRootLibrary() {
/**
 * Returns the root library as a {@link Resource}. The object is created on
 * first use from {@link #getRootLibrary()} and kept in the volatile field
 * {@code rootLibraryResource}.
 *
 * @return the cached root library resource
 * @throws UnsupportedOperationException with the message "database is not initialized, cannot get
 *         root library resource"; NOTE(review): presumably when the root id is 0 - confirm the guard
 */
450 public Resource getRootLibraryResource() {
451 if (rootLibraryResource == null) {
452 // Synchronization is not needed here, it doesn't matter if multiple
453 // threads simultaneously set rootLibraryResource once.
454 int root = getRootLibrary();
456 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
457 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
459 return rootLibraryResource;
462 public int getLibrary() {
466 public int getConsistsOf() {
470 public int getHasName() {
474 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
478 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
481 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
483 if (result != null && result != 0) {
484 procedure.execute(graph, result);
488 // Fall back to using the fixed builtins.
489 // result = querySupport.getBuiltin(id);
490 // if (result != 0) {
491 // procedure.execute(graph, result);
496 // result = querySupport.getRandomAccessReference(id);
497 // } catch (ResourceNotFoundException e) {
498 // procedure.exception(graph, e);
503 procedure.execute(graph, result);
505 procedure.exception(graph, new ResourceNotFoundException(id));
511 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
512 procedure.exception(graph, t);
516 } catch (DatabaseException e) {
520 procedure.exception(graph, e);
522 } catch (DatabaseException e1) {
524 Logger.defaultLogError(e1);
/**
 * Looks up a builtin id with {@code querySupport.getBuiltin(id)} and reports
 * the outcome to the procedure: either the id through {@code execute} or a
 * {@link ResourceNotFoundException} through {@code exception}.
 *
 * @param graph passed on to the procedure
 * @param id identifier of the builtin
 * @param parent not used by the visible statements
 * @param procedure receives the result or the exception
 * @throws DatabaseException declared by the procedure callbacks
 */
532 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
534 Integer result = querySupport.getBuiltin(id);
// NOTE(review): presumably execute(...) is chosen for a non-zero result and
// exception(...) otherwise - confirm the condition.
536 procedure.execute(graph, result);
538 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Runs a multi read through {@code QueryCache.runnerMultiReadEntry}. The
 * {@code cached} and {@code provider} arguments are not passed on.
 *
 * @throws IllegalStateException wrapping any {@link DatabaseException} raised by the cache
 */
543 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
546 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
547 } catch (DatabaseException e) {
// The checked exception is converted because this method declares none.
548 throw new IllegalStateException(e);
/**
 * Runs an asynchronous multi read through
 * {@code QueryCache.runnerAsyncMultiReadEntry}.
 *
 * @throws IllegalStateException wrapping any {@link DatabaseException} raised by the cache
 */
553 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
557 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
558 } catch (DatabaseException e) {
// The checked exception is converted because this method declares none.
559 throw new IllegalStateException(e);
/**
 * Runs an external read through {@code QueryCache.runnerExternalReadEntry}.
 * The {@code cached} and {@code provider} arguments are not passed on.
 *
 * @throws DatabaseException propagated from the cache
 */
564 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
565 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
569 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
571 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs a multi read through {@code QueryCache.runnerMultiReadEntry}.
 *
 * @throws DatabaseException propagated from the cache
 */
575 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
577 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs an external read through {@code QueryCache.runnerExternalReadEntry}.
 *
 * @throws DatabaseException propagated from the cache
 */
581 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
583 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
/**
 * @param entry the external read entry to test
 * @return {@code true} when the entry has parents or {@code listening} reports a listener for it;
 *         NOTE(review): presumably {@code false} otherwise - confirm
 */
587 boolean isBound(ExternalReadEntry<?> entry) {
588 if(entry.hasParents()) return true;
589 else if(listening.hasListener(entry)) return true;
/**
 * Placeholder procedure; the shared instance is cast to the required procedure
 * type in {@code performForEach2}.
 * NOTE(review): presumably all callbacks are no-ops - confirm.
 */
593 static class Dummy implements InternalProcedure<Object>, IntProcedure {
596 public void execute(ReadGraphImpl graph, int i) {
600 public void finished(ReadGraphImpl graph) {
604 public void execute(ReadGraphImpl graph, Object result) {
608 public void exception(ReadGraphImpl graph, Throwable throwable) {
613 private static final Dummy dummy = new Dummy();
/**
 * Registers the dependencies of a unary query and returns its value through
 * {@code query.get}, computing the query first when it is not ready.
 *
 * @param graph graph used for the computation
 * @param query the query to evaluate
 * @param parent parent entry passed to {@code registerDependencies}
 * @param listener listener passed to {@code registerDependencies}
 * @param procedure procedure passed to {@code registerDependencies} and to one of the {@code query.get} calls
 * @return the value returned by {@code query.get}
 * @throws Throwable declared by the method
 */
616 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
618 if (DebugPolicy.PERFORM)
619 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
622 assert (!collecting);
624 assert(query.assertNotDiscarded());
626 registerDependencies(graph, query, parent, listener, procedure, false);
628 // FRESH, REFUTED, EXCEPTED go here
629 if (!query.isReady()) {
// Computes with the shared dummy procedure, then reads the value without a procedure.
// NOTE(review): confirm which conditions select this path and the return below.
634 query.computeForEach(graph, this, (Procedure)dummy, true);
635 return query.get(graph, this, null);
641 return query.get(graph, this, procedure);
/**
 * Access to the query cache as needed by a {@link QueryCollector}. The
 * implementation in this file is {@code QueryCollectorSupportImpl}.
 */
649 interface QueryCollectorSupport {
// Returns a collection result holding the cached entries.
650 public CacheCollectionResult allCaches();
651 public Collection<CacheEntry> getRootList();
652 public int getCurrentSize();
653 public int calculateCurrentSize();
// Returns the next entry of the given level.
654 public CacheEntryBase iterate(int level);
655 public void remove();
656 public void setLevel(CacheEntryBase entry, int level);
// Prepares an iteration; flush asks for the data to be refreshed.
657 public boolean start(boolean flush);
/** Collector invoked by {@code gc(int, int)} of the enclosing class. */
660 interface QueryCollector {
// NOTE(review): allowedTimeInMs is presumably a time budget in milliseconds;
// the meaning of youngTarget is not shown here - confirm.
662 public void collect(int youngTarget, int allowedTimeInMs);
/**
 * {@link QueryCollectorSupport} backed by the enclosing processor's cache. It
 * keeps a {@link CacheCollectionResult} snapshot in {@code iteration}, which
 * {@code restart(double)} may replace with a fresh one.
 */
666 class QueryCollectorSupportImpl implements QueryCollectorSupport {
668 private static final boolean DEBUG = false;
// Ratio passed to restart(double) unless a flush is requested.
669 private static final double ITERATION_RATIO = 0.2;
// Current snapshot of cache entries.
671 private CacheCollectionResult iteration = new CacheCollectionResult();
672 private boolean fresh = true;
// When true, start(boolean) calls restart(double) before reporting the position.
673 private boolean needDataInStart = true;
675 QueryCollectorSupportImpl() {
/** Creates a new result object and passes it to {@code QueryProcessor.this.allCaches}. */
679 public CacheCollectionResult allCaches() {
680 CacheCollectionResult result = new CacheCollectionResult();
681 QueryProcessor.this.allCaches(result);
/**
 * @param flush forces a restart with a target ratio of 0.0
 * @return {@code iteration.isAtStart()}
 */
686 public boolean start(boolean flush) {
687 // We need new data from query maps
689 if(needDataInStart || flush) {
690 // Last run ended after processing all queries => refresh data
691 restart(flush ? 0.0 : ITERATION_RATIO);
693 // continue with previous big data
695 // Notify caller about iteration situation
696 return iteration.isAtStart();
/**
 * Compares the current cache size with the size of the held snapshot.
 *
 * @param targetRatio relative size change at which the snapshot counts as dirty
 */
699 private void restart(double targetRatio) {
701 needDataInStart = true;
703 long start = System.nanoTime();
706 // We need new data from query maps
// The +1 keeps the divisor below non-zero for an empty snapshot.
708 int iterationSize = iteration.size()+1;
709 int diff = calculateCurrentSize()-iterationSize;
711 double ratio = (double)diff / (double)iterationSize;
// With targetRatio 0.0 (flush) this is always true.
712 boolean dirty = Math.abs(ratio) >= targetRatio;
// NOTE(review): presumably the snapshot is replaced only when dirty - confirm.
715 iteration = allCaches();
717 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
718 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
719 System.err.print(" " + iteration.levels[i].size());
720 System.err.println("");
727 needDataInStart = false;
729 // We are returning here within the same GC round - reuse the cache table
/**
 * @param level the level to iterate
 * @return the next entry of the level; discarded entries are skipped
 */
738 public CacheEntryBase iterate(int level) {
740 CacheEntryBase entry = iteration.next(level);
// NOTE(review): presumably reached when the level is exhausted - confirm.
742 restart(ITERATION_RATIO);
746 while(entry != null && entry.isDiscarded()) {
747 entry = iteration.next(level);
755 public void remove() {
/** Delegates to {@code iteration.setLevel(entry, level)}. */
760 public void setLevel(CacheEntryBase entry, int level) {
761 iteration.setLevel(entry, level);
/** Delegates to {@code cache.getRootList()}. */
764 public Collection<CacheEntry> getRootList() {
765 return cache.getRootList();
/** Delegates to {@code cache.calculateCurrentSize()}. */
769 public int calculateCurrentSize() {
770 return cache.calculateCurrentSize();
774 public int getCurrentSize() {
779 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
781 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
782 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
784 public int querySize() {
/**
 * Delegates to {@code collector.collect(youngTarget, allowedTimeInMs)}.
 *
 * @param youngTarget passed to the collector unchanged
 * @param allowedTimeInMs passed to the collector unchanged
 */
788 public void gc(int youngTarget, int allowedTimeInMs) {
790 collector.collect(youngTarget, allowedTimeInMs);
/**
 * Records an entry in {@code workarea} together with a set built from its
 * non-discarded parents, visiting those parents recursively.
 *
 * @param entry the entry to record; discarded or already recorded entries are ignored
 * @param workarea map from entry to its parent set, filled by this method
 */
795 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
797 if(entry.isDiscarded()) return;
798 if(workarea.containsKey(entry)) return;
800 Iterable<CacheEntry> parents = entry.getParents(this);
801 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
802 for(CacheEntry e : parents) {
803 if(e.isDiscarded()) continue;
// NOTE(review): presumably e is also added to ps here - confirm.
805 processParentReport(e, workarea);
// The entry is registered only after its parents have been visited.
807 workarea.put(entry, ps);
/**
 * Writes the contents of {@code Development.histogram} to the given file, one
 * "key: value" line per entry, and clears the histogram afterwards.
 *
 * @param file the file to write
 * @return NOTE(review): the returned text is not shown here - confirm
 * @throws IOException declared for the file access
 */
811 public synchronized String reportQueryActivity(File file) throws IOException {
// The message says "reportQueries" although this is reportQueryActivity.
813 System.err.println("reportQueries " + file.getAbsolutePath());
// NOTE(review): confirm that this stream is closed on every path.
818 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
// Entries are printed in the reverse of the order returned by valueSortedEntries.
820 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
821 Collections.reverse(entries);
823 for(Pair<String,Integer> entry : entries) {
824 b.println(entry.first + ": " + entry.second);
829 Development.histogram.clear();
835 public synchronized String reportQueries(File file) throws IOException {
837 System.err.println("reportQueries " + file.getAbsolutePath());
842 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
844 long start = System.nanoTime();
846 // ArrayList<CacheEntry> all = ;
848 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
849 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
850 for(CacheEntryBase entry : caches) {
851 processParentReport(entry, workarea);
854 // for(CacheEntry e : all) System.err.println("entry: " + e);
856 long duration = System.nanoTime() - start;
857 System.err.println("Query root set in " + 1e-9*duration + "s.");
859 start = System.nanoTime();
861 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
865 for(CacheEntry entry : workarea.keySet()) {
866 boolean listener = listening.hasListenerAfterDisposing(entry);
867 boolean hasParents = entry.getParents(this).iterator().hasNext();
870 flagMap.put(entry, 0);
871 } else if (!hasParents) {
873 flagMap.put(entry, 1);
876 flagMap.put(entry, 2);
889 long start2 = System.nanoTime();
891 int boundCounter = 0;
892 int unboundCounter = 0;
893 int unknownCounter = 0;
895 for(CacheEntry<?> entry : workarea.keySet()) {
897 //System.err.println("process " + entry);
899 int flags = flagMap.get(entry);
900 int bindStatus = flags & 3;
902 if(bindStatus == 0) boundCounter++;
903 else if(bindStatus == 1) unboundCounter++;
904 else if(bindStatus == 2) unknownCounter++;
906 if(bindStatus < 2) continue;
909 for(CacheEntry parent : entry.getParents(this)) {
911 if(parent.isDiscarded()) flagMap.put(parent, 1);
913 int flags2 = flagMap.get(parent);
914 int bindStatus2 = flags2 & 3;
915 // Parent is bound => child is bound
916 if(bindStatus2 == 0) {
920 // Parent is unknown => child is unknown
921 else if (bindStatus2 == 2) {
928 flagMap.put(entry, newStatus);
932 duration = System.nanoTime() - start2;
933 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
934 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
936 } while(!done && loops++ < 20);
940 for(CacheEntry entry : workarea.keySet()) {
942 int bindStatus = flagMap.get(entry);
943 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
949 duration = System.nanoTime() - start;
950 System.err.println("Query analysis in " + 1e-9*duration + "s.");
952 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
954 for(CacheEntry entry : workarea.keySet()) {
955 Class<?> clazz = entry.getClass();
956 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
957 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
958 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
959 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
960 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
961 Integer c = counts.get(clazz);
962 if(c == null) counts.put(clazz, -1);
963 else counts.put(clazz, c-1);
966 b.print("// Simantics DB client query report file\n");
967 b.print("// This file contains the following information\n");
968 b.print("// -The amount of cached query instances per query class\n");
969 b.print("// -The sizes of retained child sets\n");
970 b.print("// -List of parents for each query (search for 'P <query name>')\n");
971 b.print("// -Followed by status, where\n");
972 b.print("// -0=bound\n");
973 b.print("// -1=free\n");
974 b.print("// -2=unknown\n");
975 b.print("// -L=has listener\n");
976 b.print("// -List of children for each query (search for 'C <query name>')\n");
978 b.print("----------------------------------------\n");
980 b.print("// Queries by class\n");
981 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
982 b.print(-p.second + " " + p.first.getName() + "\n");
985 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
986 for(CacheEntry e : workarea.keySet())
989 boolean changed = true;
991 while(changed && iter++<50) {
995 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
996 for(CacheEntry e : workarea.keySet())
999 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1000 Integer c = hist.get(e.getKey());
1001 for(CacheEntry p : e.getValue()) {
1002 Integer i = newHist.get(p);
1003 newHist.put(p, i+c);
1006 for(CacheEntry e : workarea.keySet()) {
1007 Integer value = newHist.get(e);
1008 Integer old = hist.get(e);
1009 if(!value.equals(old)) {
1011 // System.err.println("hist " + e + ": " + old + " => " + value);
1016 System.err.println("Retained set iteration " + iter);
1020 b.print("// Queries by retained set\n");
1021 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1022 b.print("" + -p.second + " " + p.first + "\n");
1025 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1027 b.print("// Entry parent listing\n");
1028 for(CacheEntry entry : workarea.keySet()) {
1029 int status = flagMap.get(entry);
1030 boolean hasListener = listening.hasListenerAfterDisposing(entry);
1031 b.print("Q " + entry.toString());
1033 b.print(" (L" + status + ")");
1036 b.print(" (" + status + ")");
1039 for(CacheEntry parent : workarea.get(entry)) {
1040 Collection<CacheEntry> inv = inverse.get(parent);
1042 inv = new ArrayList<CacheEntry>();
1043 inverse.put(parent, inv);
1046 b.print(" " + parent.toString());
1051 b.print("// Entry child listing\n");
1052 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1053 b.print("C " + entry.getKey().toString());
1055 for(CacheEntry child : entry.getValue()) {
1056 Integer h = hist.get(child);
1060 b.print(" <no children>");
1062 b.print(" " + child.toString());
1067 b.print("#queries: " + workarea.keySet().size() + "\n");
1068 b.print("#listeners: " + listeners + "\n");
1072 return "Dumped " + workarea.keySet().size() + " queries.";
/**
 * Serializes cached query entries into the directory "queryData" under the
 * platform location, writing one "&lt;cluster&gt;.queryData" file per cluster. The
 * directory is deleted before writing.
 *
 * @throws IOException declared for the file access
 */
1076 public synchronized void save() throws IOException {
1078 long start = System.nanoTime();
// Group the cache entries by the cluster reported by each entry.
1080 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1081 Map<Long,List<CacheEntryBase>> cachesByCluster = new HashMap<>();
1082 for(CacheEntryBase entry : caches) {
// NOTE(review): presumably entries without a class id are skipped - confirm.
1083 String clazz = entry.classId();
1086 long cluster = entry.cluster(this);
1087 List<CacheEntryBase> queries = cachesByCluster.get(cluster);
1088 if(queries == null) {
1089 queries = new ArrayList<>();
1090 cachesByCluster.put(cluster, queries);
1095 File workspace = Platform.getLocation().toFile();
1096 File dir = new File(workspace, "queryData");
// Remove data left from an earlier save.
1097 FileUtils.deleteAll(dir);
1101 for(Long cluster : cachesByCluster.keySet()) {
1103 List<CacheEntryBase> queries = cachesByCluster.get(cluster);
1104 QuerySerializer serializer = new QuerySerializer(this);
// Reserve the size slot now; it is filled in by setUnknownSize below.
1106 int pos = serializer.writeUnknownSize();
1107 for(CacheEntryBase entry : queries) {
1108 String clazz = entry.classId();
1112 entry.serialize(serializer);
// An IllegalStateException from an entry is only printed to System.err.
1114 } catch (IllegalStateException e) {
1115 System.err.println(e.getMessage());
1118 serializer.setUnknownSize(pos, count);
1120 System.err.println(serializer.bytes().length + " bytes for cluster " + cluster);
1121 FileUtils.writeFile(new File(dir, "" + cluster + ".queryData"), serializer.bytes());
1125 long end = System.nanoTime();
// The elapsed nanoseconds are scaled by 1e-6 to milliseconds.
1127 System.err.println("saved queries in " + 1e-6*(end-start) + "ms.");
/**
 * Reads every file with the extension "queryData" from the directory
 * "queryData" under the platform location and creates a
 * {@code QueryDeserializer} for its bytes.
 * Unlike {@code save()}, this method is not declared synchronized.
 *
 * @throws IOException declared for the file access
 */
1131 public void restore() throws IOException {
1133 long start = System.nanoTime();
1135 File workspace = Platform.getLocation().toFile();
1136 File dir = new File(workspace, "queryData");
1139 for(File f : FileUtils.listFilesByExtension(dir, "queryData")) {
1140 byte[] bytes = FileUtils.readFile(f);
1141 QueryDeserializer qd = new QueryDeserializer(this, bytes);
1146 long end = System.nanoTime();
// The elapsed nanoseconds are scaled by 1e-6 to milliseconds.
1148 System.err.println("restored queries in " + 1e-6*(end-start) + "ms.");
/**
 * Removes the query of the given entry from this processor through
 * {@code Query.removeEntry}.
 *
 * @param entry the entry to remove
 * @return {@code false} when the entry was already discarded;
 *         NOTE(review): the remaining return values are not shown here - confirm
 */
1152 boolean removeQuery(CacheEntry entry) {
1154 // This entry has been removed before. No need to do anything here.
1155 if(entry.isDiscarded()) return false;
// Always holds here because of the early return above.
1157 assert (!entry.isDiscarded());
1159 Query query = entry.getQuery();
1161 query.removeEntry(this);
// Tests the HAS_BEEN_BOUND bit of the entry's GC status.
1166 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1177 * @return true if this entry is being listened
1179 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1183 CacheEntry entry = e.entry;
1186 * If the dependency graph forms a DAG, some entries are inserted in the
1187 * todo list many times. They only need to be processed once though.
1189 if (entry.isDiscarded()) {
1190 if (Development.DEVELOPMENT) {
1191 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1192 System.err.print("D");
1193 for (int i = 0; i < e.indent; i++)
1194 System.err.print(" ");
1195 System.err.println(entry.getQuery());
1198 // System.err.println(" => DISCARDED");
1202 // if (entry.isRefuted()) {
1203 // if (Development.DEVELOPMENT) {
1204 // if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1205 // System.err.print("R");
1206 // for (int i = 0; i < e.indent; i++)
1207 // System.err.print(" ");
1208 // System.err.println(entry.getQuery());
1214 if (entry.isExcepted()) {
1215 if (Development.DEVELOPMENT) {
1216 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1217 System.err.print("E");
1222 if (entry.isPending()) {
1223 if (Development.DEVELOPMENT) {
1224 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1225 System.err.print("P");
1232 if (Development.DEVELOPMENT) {
1233 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1234 System.err.print("U ");
1235 for (int i = 0; i < e.indent; i++)
1236 System.err.print(" ");
1237 System.err.print(entry.getQuery());
1241 Query query = entry.getQuery();
1242 int type = query.type();
1244 boolean hasListener = listening.hasListener(entry);
1246 if (Development.DEVELOPMENT) {
1247 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1248 if(listening.hasListener(entry)) {
1249 System.err.println(" (L)");
1251 System.err.println("");
1256 if(entry.isPending() || entry.isExcepted()) {
1259 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1261 immediates.put(entry, entry);
1276 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1278 immediates.put(entry, entry);
1292 // System.err.println(" => FOO " + type);
1295 ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1296 if(entries != null) {
1297 for (ListenerEntry le : entries) {
1298 listening.scheduleListener(le);
1303 // If invalid, update parents
1304 if (type == RequestFlags.INVALIDATE) {
1305 listening.updateParents(e.indent, entry, todo);
1313 * @param av1 an array (guaranteed)
1314 * @param av2 any object
1315 * @return <code>true</code> if the two arrays are equal
// Compares two arrays by component type and then by content through the
// matching Arrays.equals overload.
1317 private final boolean arrayEquals(Object av1, Object av2) {
1320 Class<?> c1 = av1.getClass().getComponentType();
// getComponentType() returns null when av2 is not an array.
1321 Class<?> c2 = av2.getClass().getComponentType();
1322 if (c2 == null || !c1.equals(c2))
// Past the check above c1 equals c2, so p1 and p2 always hold the same value.
1324 boolean p1 = c1.isPrimitive();
1325 boolean p2 = c2.isPrimitive();
1329 return Arrays.equals((Object[]) av1, (Object[]) av2);
// Dispatch on the primitive component type.
// NOTE(review): there is no branch for char[] or short[]; two arrays of those
// types fall through to the RuntimeException at the end - confirm whether
// that is intended.
1330 if (boolean.class.equals(c1))
1331 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1332 else if (byte.class.equals(c1))
1333 return Arrays.equals((byte[]) av1, (byte[]) av2);
1334 else if (int.class.equals(c1))
1335 return Arrays.equals((int[]) av1, (int[]) av2);
1336 else if (long.class.equals(c1))
1337 return Arrays.equals((long[]) av1, (long[]) av2);
1338 else if (float.class.equals(c1))
1339 return Arrays.equals((float[]) av1, (float[]) av2);
1340 else if (double.class.equals(c1))
1341 return Arrays.equals((double[]) av1, (double[]) av2);
1342 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query behind the given cache entry and compares the fresh result with oldValue.
// Visible return values: ListenerEntry.NO_VALUE when the entry is excepted after the
// recompute or when anything is thrown, ListenerEntry.NOT_CHANGED when the new result
// equals oldValue, and the new result itself when it differs.
1347 final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1351 Query query = entry.getQuery();
// Development-only trace of the query being recomputed.
1353 if (Development.DEVELOPMENT) {
1354 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1355 System.err.println("R " + query);
1359 entry.prepareRecompute(querySupport);
// The recompute runs on a graph derived from the caller's graph, bracketed by
// inc()/dec() on that graph's async barrier.
// NOTE(review): dec() is not in a finally block here; if recompute() throws, control
// goes to the catch below and the barrier stays incremented - confirm this is harmless.
1361 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1362 parentGraph.asyncBarrier.inc();
1363 query.recompute(parentGraph);
1364 parentGraph.asyncBarrier.dec();
1366 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1368 Object newValue = entry.getResult();
// No previous value to compare against; this branch only shows development logging here.
1370 if (ListenerEntry.NO_VALUE == oldValue) {
1371 if (Development.DEVELOPMENT) {
1372 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1373 System.out.println("C " + query);
1374 System.out.println("- " + oldValue);
1375 System.out.println("- " + newValue);
1381 boolean changed = false;
// Arrays do not override equals(), so they are compared element-wise via arrayEquals.
1383 if (newValue != null) {
1384 if (newValue.getClass().isArray()) {
1385 changed = !arrayEquals(newValue, oldValue);
1387 changed = !newValue.equals(oldValue);
// newValue is null: changed only if there used to be a value.
1390 changed = (oldValue != null);
1392 if (Development.DEVELOPMENT) {
1393 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1394 System.err.println("C " + query);
1395 System.err.println("- " + oldValue);
1396 System.err.println("- " + newValue);
1400 return changed ? newValue : ListenerEntry.NOT_CHANGED;
// Any failure is logged and reported to the caller as "no value".
1402 } catch (Throwable t) {
1404 Logger.defaultLogError(t);
1406 return ListenerEntry.NO_VALUE;
1415 * @return true if this entry still has listeners
// Propagates an update starting from the given cache entry: walks the pending
// UpdateEntry work list through updateQuery, then recomputes every entry collected
// as an immediate update and pushes its parents back onto the work list when its
// value changed.
1417 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1419 assert (!cache.collecting);
1423 boolean hadListeners = false;
1424 boolean listenersUnknown = false;
1428 assert(entry != null);
// Work list of entries still to visit, seeded with the starting entry.
1429 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Identity-keyed so each CacheEntry instance is collected at most once.
1430 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1431 todo.add(new UpdateEntry(null, entry, 0));
1435 // Walk the tree and collect immediate updates
1436 while (!todo.isEmpty()) {
1437 UpdateEntry e = todo.pop();
1438 hadListeners |= updateQuery(e, todo, immediates);
// Nothing left to recompute: the propagation is finished.
1441 if(immediates.isEmpty()) break;
1443 // Evaluate all immediate updates and collect parents to update
1444 for(CacheEntry immediate : immediates.values()) {
1446 if(immediate.isDiscarded()) {
// An excepted entry has no usable old result, so it is compared against NO_VALUE.
1450 if(immediate.isExcepted()) {
1452 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1453 if (newValue != ListenerEntry.NOT_CHANGED)
1454 listening.updateParents(0, immediate, todo);
1458 Object oldValue = immediate.getResult();
1459 Object newValue = compareTo(graph, immediate, oldValue);
1461 if (newValue != ListenerEntry.NOT_CHANGED) {
1462 listening.updateParents(0, immediate, todo);
1464 // If not changed, keep the old value
1465 immediate.setResult(oldValue);
1466 immediate.setReady();
1467 listenersUnknown = true;
1477 } catch (Throwable t) {
1478 Logger.defaultLogError(t);
// True if updateQuery reported listeners, or if an unchanged entry left that unknown.
1484 return hadListeners | listenersUnknown;
1488 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1489 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1490 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1491 // Maybe use a mutex from util.concurrent?
1492 private Object primitiveUpdateLock = new Object();
1493 private THashSet scheduledPrimitiveUpdates = new THashSet();
1495 private ArrayList<CacheEntry> refutations = new ArrayList<>();
1497 private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1502 private void updateRefutations(ReadGraphImpl graph) {
1504 for(CacheEntry e : refutations)
1507 refutations.clear();
// Applies all scheduled object, value, invalidate and primitive updates to the
// query cache: every affected cache entry is passed to markForUpdate, then
// updateRefutations runs and the scheduled sets are cleared. Two fast paths handle
// the case of exactly one scheduled statement or exactly one scheduled value.
1511 public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1513 // Make sure that listening has performed its work
1516 cache.dirty = false;
1519 if (Development.DEVELOPMENT) {
1520 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1521 System.err.println("== Query update ==");
1525 // Special case - one statement
1526 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1528 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the pair stored by updateStatements: subject in the high 32 bits, predicate in the low 32.
// Note: 0xffffffff is an int literal (-1), so after widening the mask keeps all 64 bits;
// the (int) cast is what actually extracts the low half.
1530 final int subject = (int)(arg0 >>> 32);
1531 final int predicate = (int)(arg0 & 0xffffffff);
1533 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1534 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1535 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
// Type-related predicates also affect the subject's type queries.
1537 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1538 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1539 if(principalTypes != null) markForUpdate(graph, principalTypes);
1540 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1541 if(types != null) markForUpdate(graph, types);
1544 if(predicate == subrelationOf) {
1545 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1546 if(superRelations != null) markForUpdate(graph, superRelations);
1549 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1550 if(dp != null) markForUpdate(graph, dp);
// The ordered-set entry is looked up with the predicate id as its key.
1551 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1552 if(os != null) markForUpdate(graph, os);
1554 updateRefutations(graph);
1556 scheduledObjectUpdates.clear();
1558 if (Development.DEVELOPMENT) {
1559 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1560 System.err.println("== Query update ends ==");
1568 // Special case - one value
1569 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1571 int arg0 = scheduledValueUpdates.getFirst();
1573 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1574 if(valueQuery != null) markForUpdate(graph, valueQuery);
1576 updateRefutations(graph);
1578 scheduledValueUpdates.clear();
1580 if (Development.DEVELOPMENT) {
1581 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1582 System.err.println("== Query update ends ==");
// General case. Despite its name, 'predicates' collects subject/resource ids whose
// Objects, DirectObjects, Statements and DirectPredicates entries are refreshed below;
// 'orderedSets' collects the predicate ids used as ordered-set keys.
1590 final TIntHashSet predicates = new TIntHashSet();
1591 final TIntHashSet orderedSets = new TIntHashSet();
// Swap out the primitive update set under its lock, since updatePrimitive adds to it
// under the same lock; the swapped-out set is processed further below.
1593 THashSet primitiveUpdates;
1594 synchronized (primitiveUpdateLock) {
1595 primitiveUpdates = scheduledPrimitiveUpdates;
1596 scheduledPrimitiveUpdates = new THashSet();
1599 scheduledValueUpdates.forEach(new TIntProcedure() {
1602 public boolean execute(int arg0) {
1603 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1604 if(valueQuery != null) markForUpdate(graph, valueQuery);
// An invalidated resource refreshes its value, type and super-relation entries and
// is also queued for the per-subject refresh.
1610 scheduledInvalidates.forEach(new TIntProcedure() {
1613 public boolean execute(int resource) {
1615 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1616 if(valueQuery != null) markForUpdate(graph, valueQuery);
1618 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1619 if(principalTypes != null) markForUpdate(graph, principalTypes);
1620 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1621 if(types != null) markForUpdate(graph, types);
1623 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1624 if(superRelations != null) markForUpdate(graph, superRelations);
1626 predicates.add(resource);
1633 scheduledObjectUpdates.forEach(new TLongProcedure() {
1636 public boolean execute(long arg0) {
// Same unpacking as in the single-statement fast path above.
1638 final int subject = (int)(arg0 >>> 32);
1639 final int predicate = (int)(arg0 & 0xffffffff);
1641 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1642 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1643 if(principalTypes != null) markForUpdate(graph, principalTypes);
1644 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1645 if(types != null) markForUpdate(graph, types);
1648 if(predicate == subrelationOf) {
1649 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1650 if(superRelations != null) markForUpdate(graph, superRelations);
1653 predicates.add(subject);
1654 orderedSets.add(predicate);
// Refresh the per-subject entries once per distinct id collected above.
1662 predicates.forEach(new TIntProcedure() {
1665 public boolean execute(final int subject) {
1667 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1668 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1669 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1671 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1672 if(entry != null) markForUpdate(graph, entry);
1680 orderedSets.forEach(new TIntProcedure() {
1683 public boolean execute(int orderedSet) {
1685 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1686 if(entry != null) markForUpdate(graph, entry);
1694 updateRefutations(graph);
// External reads are updated directly; an entry left with neither listeners nor
// parents is dropped from the external read map.
1696 primitiveUpdates.forEach(new TObjectProcedure() {
1699 public boolean execute(Object arg0) {
1701 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1702 if (query != null) {
1703 boolean listening = update(graph, query);
1704 if (!listening && !query.hasParents()) {
1705 cache.externalReadEntryMap.remove(arg0);
1714 scheduledValueUpdates.clear();
1715 scheduledObjectUpdates.clear();
1716 scheduledInvalidates.clear();
1718 if (Development.DEVELOPMENT) {
1719 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1720 System.err.println("== Query update ends ==");
// Schedules a value update for the given resource id; it is processed by
// propagateChangesInQueryCache.
1726 public void updateValue(final int resource) {
1727 scheduledValueUpdates.add(resource);
// Schedules a statement change for (resource, predicate). The pair is packed into a
// single long: subject id in the high 32 bits, predicate id in the low 32 bits.
// propagateChangesInQueryCache unpacks it with (arg0 >>> 32) and an (int) cast.
1731 public void updateStatements(final int resource, final int predicate) {
// Mask the predicate to its unsigned 32-bit pattern before combining. Adding a negative
// int sign-extends it to 64 bits and borrows from the subject half, so the subject
// would later decode as resource - 1. Non-negative predicates encode exactly as before.
1732 scheduledObjectUpdates.add((((long)resource) << 32) | (predicate & 0xffffffffL));
1736 private int lastInvalidate = 0;
// Schedules the given resource id for update, skipping the call when it repeats the
// most recently invalidated id.
// NOTE(review): lastInvalidate starts at 0 and is not visibly reset when
// propagateChangesInQueryCache clears scheduledValueUpdates, so invalidating the same
// resource again after a propagation (or invalidating id 0 first) is presumably
// dropped - verify.
1738 public void invalidateResource(final int resource) {
1739 if(lastInvalidate == resource) return;
// NOTE(review): this adds to scheduledValueUpdates, not scheduledInvalidates; confirm
// which set is intended, since propagateChangesInQueryCache handles the two differently.
1740 scheduledValueUpdates.add(resource);
1741 lastInvalidate = resource;
// Schedules an external read for update and tells querySupport that primitives are dirty.
1745 public void updatePrimitive(final ExternalRead primitive) {
1747 // External reads may be updated from arbitrary threads.
1748 // Synchronize to prevent race-conditions.
1749 synchronized (primitiveUpdateLock) {
1750 scheduledPrimitiveUpdates.add(primitive);
1752 querySupport.dirtyPrimitives();
// One-line summary of the cache counters: size, hits, misses and updates.
1757 public synchronized String toString() {
1758 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the query executor threads in stages: release permits, ask each executor
// to dispose, poll for liveness, interrupt, and finally join each thread with a timeout.
1762 protected void doDispose() {
// Releases a large number of permits on 'requests', presumably to wake threads blocked
// on the semaphore - confirm against the executor loop.
1764 requests.release(Integer.MAX_VALUE / 2);
1766 for(int index = 0; index < THREADS; index++) {
1767 executors[index].dispose();
// First stage: up to 100 polling rounds checking whether any executor is still alive.
1771 for(int i=0;i<100;i++) {
1773 boolean alive = false;
1774 for(int index = 0; index < THREADS; index++) {
1775 alive |= executors[index].isAlive();
// NOTE(review): the interruption is only logged here; whether the interrupt status is
// restored is not visible - confirm.
1780 } catch (InterruptedException e) {
1781 Logger.defaultLogError(e);
1786 // Then start interrupting
1787 for(int i=0;i<100;i++) {
1789 boolean alive = false;
1790 for(int index = 0; index < THREADS; index++) {
1791 alive |= executors[index].isAlive();
1794 for(int index = 0; index < THREADS; index++) {
1795 executors[index].interrupt();
1799 // // Then just destroy
1800 // for(int index = 0; index < THREADS; index++) {
1801 // executors[index].destroy();
// Final stage: wait up to 5 seconds per thread, then drop the reference either way.
1804 for(int index = 0; index < THREADS; index++) {
1806 executors[index].join(5000);
1807 } catch (InterruptedException e) {
1808 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1810 executors[index] = null;
1815 public int getHits() {
1819 public int getMisses() {
1820 return cache.misses;
1823 public int getSize() {
// Collects the ids of all clusters referenced by cached Objects, DirectPredicates and
// ValueQuery entries, resolving each query's resource id through querySupport.getClusterId.
1827 public Set<Long> getReferencedClusters() {
1828 HashSet<Long> result = new HashSet<Long>();
1829 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1830 Objects query = (Objects) entry.getQuery();
1831 result.add(querySupport.getClusterId(query.r1()));
1833 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1834 DirectPredicates query = (DirectPredicates) entry.getQuery();
1835 result.add(querySupport.getClusterId(query.id));
1837 for (CacheEntry entry : cache.valueQueryMap.values()) {
1838 ValueQuery query = (ValueQuery) entry.getQuery();
1839 result.add(querySupport.getClusterId(query.id));
1844 public long cluster(int resource) {
1847 return querySupport.getClusterId(resource);
1850 public void assertDone() {
1853 CacheCollectionResult allCaches(CacheCollectionResult result) {
1855 return cache.allCaches(result);
1859 public void printDiagnostics() {
1862 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1863 querySupport.requestCluster(graph, clusterId, runnable);
1866 public int clean() {
1867 collector.collect(0, Integer.MAX_VALUE);
// Runs a one-off collection restricted to the cache entries of the given external
// reads, using an ad-hoc QueryCollectorSupport that only knows about those entries.
1871 public void clean(final Collection<ExternalRead<?>> requests) {
1872 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1873 Iterator<ExternalRead<?>> iterator = requests.iterator();
1875 public CacheCollectionResult allCaches() {
1876 throw new UnsupportedOperationException();
// Returns the cache entry of the next request that has one; requests without an entry
// are skipped by recursing.
// NOTE(review): one recursion level per skipped request - a long run of requests
// without entries could grow the stack; confirm the expected collection sizes.
1879 public CacheEntryBase iterate(int level) {
1880 if(iterator.hasNext()) {
1881 ExternalRead<?> request = iterator.next();
1882 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1883 if (entry != null) return entry;
1884 else return iterate(level);
// Exhausted: rewind so a later pass starts from the beginning again.
1886 iterator = requests.iterator();
1891 public void remove() {
1892 throw new UnsupportedOperationException();
1895 public void setLevel(CacheEntryBase entry, int level) {
1896 throw new UnsupportedOperationException();
1899 public Collection<CacheEntry> getRootList() {
1900 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1901 for (ExternalRead<?> request : requests) {
1902 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1909 public int getCurrentSize() {
1913 public int calculateCurrentSize() {
1914 // This tells the collector to attempt collecting everything.
1915 return Integer.MAX_VALUE;
1918 public boolean start(boolean flush) {
1922 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
1925 public void scanPending() {
1927 cache.scanPending();
1931 public ReadGraphImpl graphForVirtualRequest() {
1932 return ReadGraphImpl.createAsync(this);
1936 private HashMap<Resource, Class<?>> builtinValues;
// Returns the Java class registered for the builtin value type r, or null when r is
// not a builtin type or the builtin table cannot be built yet. The table is created
// lazily on first use.
1938 public Class<?> getBuiltinValue(Resource r) {
1939 if(builtinValues == null) initBuiltinValues();
// initBuiltinValues() returns without creating the map when the Layer0 service is not
// available; answer null in that case instead of dereferencing a null map.
1940 return builtinValues == null ? null : builtinValues.get(r);
1943 Exception callerException = null;
1945 public interface AsyncBarrier {
1948 public void waitBarrier(Object request, ReadGraphImpl impl);
1949 public boolean isBlocking();
1952 // final public QueryProcessor processor;
1953 // final public QuerySupport support;
1955 // boolean disposed = false;
// Builds the table mapping Layer0 literal type resources to their Java classes.
// Leaves builtinValues untouched (possibly null) when the Layer0 service is not available.
1957 private void initBuiltinValues() {
1959 Layer0 b = getSession().peekService(Layer0.class);
1960 if(b == null) return;
1962 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types map to their boxed/reference classes.
1964 builtinValues.put(b.String, String.class);
1965 builtinValues.put(b.Double, Double.class);
1966 builtinValues.put(b.Float, Float.class);
1967 builtinValues.put(b.Long, Long.class);
1968 builtinValues.put(b.Integer, Integer.class);
1969 builtinValues.put(b.Byte, Byte.class);
1970 builtinValues.put(b.Boolean, Boolean.class);
// Array types map to primitive arrays, except StringArray which maps to String[].
1972 builtinValues.put(b.StringArray, String[].class);
1973 builtinValues.put(b.DoubleArray, double[].class);
1974 builtinValues.put(b.FloatArray, float[].class);
1975 builtinValues.put(b.LongArray, long[].class);
1976 builtinValues.put(b.IntegerArray, int[].class);
1977 builtinValues.put(b.ByteArray, byte[].class);
1978 builtinValues.put(b.BooleanArray, boolean[].class);
1982 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
1984 // if (null == provider2) {
1985 // this.processor = null;
1989 // this.processor = provider2;
1990 // support = provider2.getCore();
1991 // initBuiltinValues();
1995 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
1996 // return new ReadGraphSupportImpl(impl.processor);
2000 final public Session getSession() {
2004 final public ResourceSupport getResourceSupport() {
2005 return resourceSupport;
// Feeds every predicate returned by getPredicates(impl, subject) to the procedure,
// then signals finished; any Throwable is routed to procedure.exception.
2009 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2013 for(Resource predicate : getPredicates(impl, subject))
2014 procedure.execute(impl, predicate);
2016 procedure.finished(impl);
2018 } catch (Throwable e) {
2019 procedure.exception(impl, e);
// Synchronous MultiProcedure variant: not supported, always throws
// UnsupportedOperationException. The previous implementation is kept commented out below.
2025 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2027 throw new UnsupportedOperationException();
2029 // assert(subject != null);
2030 // assert(procedure != null);
2032 // final ListenerBase listener = getListenerBase(procedure);
2035 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2038 // public void execute(ReadGraphImpl graph, int i) {
2040 // procedure.execute(querySupport.getResource(i));
2041 // } catch (Throwable t2) {
2042 // Logger.defaultLogError(t2);
2047 // public void finished(ReadGraphImpl graph) {
2049 // procedure.finished();
2050 // } catch (Throwable t2) {
2051 // Logger.defaultLogError(t2);
2053 //// impl.state.barrier.dec();
2057 // public void exception(ReadGraphImpl graph, Throwable t) {
2059 // procedure.exception(t);
2060 // } catch (Throwable t2) {
2061 // Logger.defaultLogError(t2);
2063 //// impl.state.barrier.dec();
2067 // } catch (DatabaseException e) {
2068 // Logger.defaultLogError(e);
// Returns the predicate set of the subject via QueryCacheBase.resultPredicates, with
// impl.parent as the parent entry and no listener.
2074 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2075 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
// Queries the statements of (subject, predicate) and forwards each one, converted with
// querySupport.getStatement, to a synchronous MultiProcedure. Throwables raised by the
// procedure callbacks are logged rather than propagated.
2079 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2080 final Resource predicate, final MultiProcedure<Statement> procedure) {
2082 assert(subject != null);
2083 assert(predicate != null);
2084 assert(procedure != null);
// The procedure doubles as the listener when it implements ListenerBase.
2086 final ListenerBase listener = getListenerBase(procedure);
2088 // impl.state.barrier.inc();
2091 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2094 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2096 procedure.execute(querySupport.getStatement(s, p, o));
2097 } catch (Throwable t2) {
2098 Logger.defaultLogError(t2);
2103 public void finished(ReadGraphImpl graph) {
2105 procedure.finished();
2106 } catch (Throwable t2) {
2107 Logger.defaultLogError(t2);
2109 // impl.state.barrier.dec();
2113 public void exception(ReadGraphImpl graph, Throwable t) {
2115 procedure.exception(t);
2116 } catch (Throwable t2) {
2117 Logger.defaultLogError(t2);
2119 // impl.state.barrier.dec();
// A DatabaseException from starting the query is only logged; the procedure is not notified here.
2123 } catch (DatabaseException e) {
2124 Logger.defaultLogError(e);
// Asynchronous variant: queries the statements of (subject, predicate) and forwards
// them to an AsyncMultiProcedure. Each callback has two delivery paths, one passing
// the callback's graph and one passing impl.newRestart(graph); the 'first' flag
// presumably selects between them (the branching lines are not shown here - confirm).
2130 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2131 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2133 assert(subject != null);
2134 assert(predicate != null);
2135 assert(procedure != null);
2137 final ListenerBase listener = getListenerBase(procedure);
2139 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// NOTE(review): plain boolean here, whereas the analogous forEachObject adapter uses an
// AtomicBoolean - confirm whether these callbacks can run concurrently.
2141 boolean first = true;
2144 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2147 procedure.execute(graph, querySupport.getStatement(s, p, o));
2149 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2151 } catch (Throwable t2) {
2152 Logger.defaultLogError(t2);
2157 public void finished(ReadGraphImpl graph) {
2162 procedure.finished(graph);
2163 // impl.state.barrier.dec(this);
2165 procedure.finished(impl.newRestart(graph));
2167 } catch (Throwable t2) {
2168 Logger.defaultLogError(t2);
2174 public void exception(ReadGraphImpl graph, Throwable t) {
2179 procedure.exception(graph, t);
2180 // impl.state.barrier.dec(this);
2182 procedure.exception(impl.newRestart(graph), t);
2184 } catch (Throwable t2) {
2185 Logger.defaultLogError(t2);
2192 int sId = querySupport.getId(subject);
2193 int pId = querySupport.getId(predicate);
2195 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2196 // else impl.state.barrier.inc(null, null);
2199 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
// A failure to start the query is only logged.
2200 } catch (DatabaseException e) {
2201 Logger.defaultLogError(e);
// StatementProcedure variant: same structure as the AsyncMultiProcedure overload, but
// the raw (s, p, o) ids are handed to the procedure without building Statement objects.
2207 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2208 final Resource predicate, final StatementProcedure procedure) {
2210 assert(subject != null);
2211 assert(predicate != null);
2212 assert(procedure != null);
2214 final ListenerBase listener = getListenerBase(procedure);
2216 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// Presumably selects between the direct-graph and newRestart delivery paths below - confirm.
2218 boolean first = true;
2221 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2224 procedure.execute(graph, s, p, o);
2226 procedure.execute(impl.newRestart(graph), s, p, o);
2228 } catch (Throwable t2) {
2229 Logger.defaultLogError(t2);
2234 public void finished(ReadGraphImpl graph) {
2239 procedure.finished(graph);
2240 // impl.state.barrier.dec(this);
2242 procedure.finished(impl.newRestart(graph));
2244 } catch (Throwable t2) {
2245 Logger.defaultLogError(t2);
2251 public void exception(ReadGraphImpl graph, Throwable t) {
2256 procedure.exception(graph, t);
2257 // impl.state.barrier.dec(this);
2259 procedure.exception(impl.newRestart(graph), t);
2261 } catch (Throwable t2) {
2262 Logger.defaultLogError(t2);
2269 int sId = querySupport.getId(subject);
2270 int pId = querySupport.getId(predicate);
2272 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2273 // else impl.state.barrier.inc(null, null);
2276 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
// A failure to start the query is only logged.
2277 } catch (DatabaseException e) {
2278 Logger.defaultLogError(e);
// Adapts an AsyncSetListener to forEachStatement: each query run is turned into
// add/remove notifications relative to the statements seen in the previous run.
2284 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2286 assert(subject != null);
2287 assert(predicate != null);
2288 assert(procedure != null);
2290 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// 'current' holds what remains of the previous run's statements (null before the first
// run); 'run' is the set belonging to the run in progress.
2292 private Set<Statement> current = null;
2293 private Set<Statement> run = new HashSet<Statement>();
2296 public void execute(AsyncReadGraph graph, Statement result) {
2298 boolean found = false;
2300 if(current != null) {
// Removing a hit means only vanished statements are left in 'current' at finished().
2302 found = current.remove(result);
// Not present in the previous run: report as added.
2306 if(!found) procedure.add(graph, result);
2313 public void finished(AsyncReadGraph graph) {
2315 if(current != null) {
// Whatever was not seen again in this run has been removed.
2316 for(Statement r : current) procedure.remove(graph, r);
2321 run = new HashSet<Statement>();
2326 public void exception(AsyncReadGraph graph, Throwable t) {
2327 procedure.exception(graph, t);
2331 public boolean isDisposed() {
2332 return procedure.isDisposed();
// Runs the AssertedStatements query for (subject, predicate) and forwards each result,
// converted with querySupport.getStatement, to the AsyncMultiProcedure. Throwables
// raised by the procedure callbacks are logged rather than propagated.
2340 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2341 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2343 assert(subject != null);
2344 assert(predicate != null);
2345 assert(procedure != null);
2347 final ListenerBase listener = getListenerBase(procedure);
2349 // impl.state.barrier.inc();
2352 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2355 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2357 procedure.execute(graph, querySupport.getStatement(s, p, o));
2358 } catch (Throwable t2) {
2359 Logger.defaultLogError(t2);
2364 public void finished(ReadGraphImpl graph) {
2366 procedure.finished(graph);
2367 } catch (Throwable t2) {
2368 Logger.defaultLogError(t2);
2370 // impl.state.barrier.dec();
2374 public void exception(ReadGraphImpl graph, Throwable t) {
2376 procedure.exception(graph, t);
2377 } catch (Throwable t2) {
2378 Logger.defaultLogError(t2);
2380 // impl.state.barrier.dec();
// A failure to start the query is only logged.
2384 } catch (DatabaseException e) {
2385 Logger.defaultLogError(e);
// Returns the procedure itself when it also implements ListenerBase, so that it can be
// registered as the query listener.
2390 private static ListenerBase getListenerBase(Object procedure) {
2391 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
// Runs the Objects query for (subject, predicate) and forwards each object, converted
// with querySupport.getResource, to a synchronous MultiProcedure. Throwables raised by
// the procedure callbacks are logged rather than propagated.
2396 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2398 assert(subject != null);
2399 assert(predicate != null);
2400 assert(procedure != null);
2402 final ListenerBase listener = getListenerBase(procedure);
2404 // impl.state.barrier.inc();
2407 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2410 public void execute(ReadGraphImpl graph, int i) {
2412 procedure.execute(querySupport.getResource(i));
2413 } catch (Throwable t2) {
2414 Logger.defaultLogError(t2);
2419 public void finished(ReadGraphImpl graph) {
2421 procedure.finished();
2422 } catch (Throwable t2) {
2423 Logger.defaultLogError(t2);
2425 // impl.state.barrier.dec();
2429 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): prints straight to stdout, unlike the Logger calls used elsewhere in
// this method - looks like leftover debugging output.
2430 System.out.println("forEachObject exception " + t);
2432 procedure.exception(t);
2433 } catch (Throwable t2) {
2434 Logger.defaultLogError(t2);
2436 // impl.state.barrier.dec();
// A failure to start the query is only logged.
2440 } catch (DatabaseException e) {
2441 Logger.defaultLogError(e);
// Runs the DirectPredicates query for the subject and hands the resulting IntSet to
// the procedure in a single execute call.
2447 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2449 assert(subject != null);
2450 assert(procedure != null);
2452 final ListenerBase listener = getListenerBase(procedure);
2454 int sId = querySupport.getId(subject);
2457 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2460 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2461 procedure.execute(graph, result);
2465 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2466 procedure.exception(graph, throwable);
// A failure to start the query is only logged.
2470 } catch (DatabaseException e) {
2471 Logger.defaultLogError(e);
// Fetches the direct statements of the subject by calling querySupport.getStatements,
// passing ignoreVirtual through. The commented-out lines are the earlier
// listener/procedure-based implementation.
2476 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2478 // assert(subject != null);
2479 // assert(procedure != null);
2481 // final ListenerBase listener = getListenerBase(procedure);
2483 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2485 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2490 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2492 // assert(subject != null);
2493 // assert(procedure != null);
2495 // final ListenerBase listener = getListenerBase(procedure);
2497 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2501 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
// Reports the object of (subject, predicate) to the procedure, or null when the
// adapter ends up with no object or with the INVALID_RESOURCE marker.
2504 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2506 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
// null = nothing seen yet; INVALID_RESOURCE is a sentinel, presumably set when more
// than one object arrives (the branching lines are not shown here - confirm).
2508 private Resource single = null;
2511 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2512 if(single == null) {
2515 single = INVALID_RESOURCE;
2520 public synchronized void finished(AsyncReadGraph graph) {
2521 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2522 else procedure.execute(graph, single);
2526 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2527 procedure.exception(graph, throwable);
// Low-level variant: resolves subject and predicate to ids and runs the Objects query
// with the given listener and IntProcedure. A DatabaseException is only logged.
2534 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2536 final int sId = querySupport.getId(subject);
2537 final int pId = querySupport.getId(predicate);
2540 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2541 } catch (DatabaseException e) {
2542 Logger.defaultLogError(e);
// IntProcedure that captures a single object id (or a Throwable) from an Objects query
// so that the caller can read it back with get().
2547 static class Runner2Procedure implements IntProcedure {
// 0 means "no object seen"; -1 is treated as a marker that finished() resets to 0.
2549 public int single = 0;
2550 public Throwable t = null;
2552 public void clear() {
2558 public void execute(ReadGraphImpl graph, int i) {
// Remember the first id delivered.
2559 if(single == 0) single = i;
2564 public void finished(ReadGraphImpl graph) {
2565 if(single == -1) single = 0;
2569 public void exception(ReadGraphImpl graph, Throwable throwable) {
2574 public int get() throws DatabaseException {
// A captured failure is rethrown as-is when it already is a DatabaseException,
// otherwise wrapped in one.
2576 if(t instanceof DatabaseException) throw (DatabaseException)t;
2577 else throw new DatabaseException(t);
// Runs the Objects query for (subject, predicate) with a fresh Runner2Procedure and no listener.
2584 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2586 final int sId = querySupport.getId(subject);
2587 final int pId = querySupport.getId(predicate);
2589 Runner2Procedure proc = new Runner2Procedure();
2590 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
// Asynchronous variant: forwards the objects of (subject, predicate) to an
// AsyncMultiProcedure. Two adapters exist: when the graph has a parent entry or the
// procedure is a listener, callbacks are guarded against Throwables and use the
// 'first' flag; otherwise a thin pass-through adapter is used.
2595 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2597 assert(subject != null);
2598 assert(predicate != null);
2600 final ListenerBase listener = getListenerBase(procedure);
2602 if(impl.parent != null || listener != null) {
2604 IntProcedure ip = new IntProcedure() {
// finished() flips this flag once with compareAndSet; until then the original graph
// 'impl' is used, afterwards impl.newRestart(graph).
2606 AtomicBoolean first = new AtomicBoolean(true);
2609 public void execute(ReadGraphImpl graph, int i) {
2612 procedure.execute(impl, querySupport.getResource(i));
2614 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2616 } catch (Throwable t2) {
2617 Logger.defaultLogError(t2);
2623 public void finished(ReadGraphImpl graph) {
2625 if(first.compareAndSet(true, false)) {
2626 procedure.finished(impl);
2627 // impl.state.barrier.dec(this);
2629 procedure.finished(impl.newRestart(graph));
2631 } catch (Throwable t2) {
2632 Logger.defaultLogError(t2);
2637 public void exception(ReadGraphImpl graph, Throwable t) {
// Unlike execute/finished, the callback's own graph is passed here.
2639 procedure.exception(graph, t);
2640 } catch (Throwable t2) {
2641 Logger.defaultLogError(t2);
2643 // impl.state.barrier.dec(this);
2647 public String toString() {
2648 return "forEachObject with " + procedure;
2653 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2654 // else impl.state.barrier.inc(null, null);
2656 forEachObject(impl, subject, predicate, listener, ip);
// Pass-through adapter: no parent entry and no listener.
2660 IntProcedure ip = new IntProcedure() {
2663 public void execute(ReadGraphImpl graph, int i) {
2664 procedure.execute(graph, querySupport.getResource(i));
2668 public void finished(ReadGraphImpl graph) {
2669 procedure.finished(graph);
2673 public void exception(ReadGraphImpl graph, Throwable t) {
2674 procedure.exception(graph, t);
2678 public String toString() {
2679 return "forEachObject with " + procedure;
2684 forEachObject(impl, subject, predicate, listener, ip);
// Adapts an AsyncSetListener to forEachObject: each query run is turned into
// add/remove notifications relative to the objects seen in the previous run.
2691 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2693 assert(subject != null);
2694 assert(predicate != null);
2695 assert(procedure != null);
2697 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// 'current' holds what remains of the previous run's objects (null before the first
// run); 'run' is the set belonging to the run in progress.
2699 private Set<Resource> current = null;
2700 private Set<Resource> run = new HashSet<Resource>();
2703 public void execute(AsyncReadGraph graph, Resource result) {
2705 boolean found = false;
2707 if(current != null) {
// Removing a hit means only vanished objects are left in 'current' at finished().
2709 found = current.remove(result);
// Not present in the previous run: report as added.
2713 if(!found) procedure.add(graph, result);
2720 public void finished(AsyncReadGraph graph) {
2722 if(current != null) {
// Whatever was not seen again in this run has been removed.
2723 for(Resource r : current) procedure.remove(graph, r);
2728 run = new HashSet<Resource>();
2733 public boolean isDisposed() {
2734 return procedure.isDisposed();
2738 public void exception(AsyncReadGraph graph, Throwable t) {
2739 procedure.exception(graph, t);
2743 public String toString() {
2744 return "forObjectSet " + procedure;
/**
 * Adapts an {@link AsyncSetListener} to the multi-listener form expected by
 * {@code forEachPredicate}, turning each full result round into add/remove notifications.
 * <p>
 * For every reported resource the listener's {@code add} is called unless the resource
 * could be removed from {@code current}; when a round finishes, everything still left in
 * {@code current} is reported through {@code remove}.
 * NOTE(review): presumably {@code run} collects the round's results and becomes
 * {@code current} afterwards — confirm.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose predicates are followed, must not be null
 * @param procedure set listener receiving add/remove/exception callbacks, must not be null
 */
2752 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2754 assert(subject != null);
2755 assert(procedure != null);
2757 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// null until a previous round exists, so the first round only produces add() calls.
2759 private Set<Resource> current = null;
2760 private Set<Resource> run = new HashSet<Resource>();
2763 public void execute(AsyncReadGraph graph, Resource result) {
2765 boolean found = false;
2767 if(current != null) {
// remove() returns true when the resource was already known.
2769 found = current.remove(result);
2773 if(!found) procedure.add(graph, result);
2780 public void finished(AsyncReadGraph graph) {
2782 if(current != null) {
// Whatever was not seen again in this round has disappeared.
2783 for(Resource r : current) procedure.remove(graph, r);
2788 run = new HashSet<Resource>();
2793 public boolean isDisposed() {
2794 return procedure.isDisposed();
2798 public void exception(AsyncReadGraph graph, Throwable t) {
2799 procedure.exception(graph, t);
2803 public String toString() {
2804 return "forPredicateSet " + procedure;
/**
 * Adapts an {@link AsyncSetListener} to the multi-listener form expected by
 * {@code forEachPrincipalType}, turning each full result round into add/remove
 * notifications.
 * <p>
 * For every reported resource the listener's {@code add} is called unless the resource
 * could be removed from {@code current}; when a round finishes, everything still left in
 * {@code current} is reported through {@code remove}.
 * NOTE(review): presumably {@code run} collects the round's results and becomes
 * {@code current} afterwards — confirm.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose principal types are followed, must not be null
 * @param procedure set listener receiving add/remove/exception callbacks, must not be null
 */
2812 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2814 assert(subject != null);
2815 assert(procedure != null);
2817 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// null until a previous round exists, so the first round only produces add() calls.
2819 private Set<Resource> current = null;
2820 private Set<Resource> run = new HashSet<Resource>();
2823 public void execute(AsyncReadGraph graph, Resource result) {
2825 boolean found = false;
2827 if(current != null) {
// remove() returns true when the resource was already known.
2829 found = current.remove(result);
2833 if(!found) procedure.add(graph, result);
2840 public void finished(AsyncReadGraph graph) {
2842 if(current != null) {
// Whatever was not seen again in this round has disappeared.
2843 for(Resource r : current) procedure.remove(graph, r);
2848 run = new HashSet<Resource>();
2853 public boolean isDisposed() {
2854 return procedure.isDisposed();
2858 public void exception(AsyncReadGraph graph, Throwable t) {
2859 procedure.exception(graph, t);
2863 public String toString() {
2864 return "forPrincipalTypeSet " + procedure;
/**
 * Adapts an {@link AsyncSetListener} to the multi-listener form expected by
 * {@code forEachAssertedObject}, turning each full result round into add/remove
 * notifications.
 * <p>
 * For every reported resource the listener's {@code add} is called unless the resource
 * could be removed from {@code current}; when a round finishes, everything still left in
 * {@code current} is reported through {@code remove}.
 * NOTE(review): presumably {@code run} collects the round's results and becomes
 * {@code current} afterwards — confirm.
 *
 * @param impl      graph the request is issued from
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure set listener receiving add/remove/exception callbacks, must not be null
 */
2872 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2874 assert(subject != null);
2875 assert(predicate != null);
2876 assert(procedure != null);
2878 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// null until a previous round exists, so the first round only produces add() calls.
2880 private Set<Resource> current = null;
2881 private Set<Resource> run = new HashSet<Resource>();
2884 public void execute(AsyncReadGraph graph, Resource result) {
2886 boolean found = false;
2888 if(current != null) {
// remove() returns true when the resource was already known.
2890 found = current.remove(result);
2894 if(!found) procedure.add(graph, result);
2901 public void finished(AsyncReadGraph graph) {
2903 if(current != null) {
// Whatever was not seen again in this round has disappeared.
2904 for(Resource r : current) procedure.remove(graph, r);
2909 run = new HashSet<Resource>();
2914 public boolean isDisposed() {
2915 return procedure.isDisposed();
2919 public void exception(AsyncReadGraph graph, Throwable t) {
2920 procedure.exception(graph, t);
2924 public String toString() {
// NOTE(review): the label reads "forObjectSet" although this is forAssertedObjectSet;
// looks like a copy-paste leftover that makes debug output ambiguous.
2925 return "forObjectSet " + procedure;
/**
 * Adapts an {@link AsyncSetListener} of statements to the multi-listener form expected
 * by {@code forEachAssertedStatement}, turning each full result round into add/remove
 * notifications.
 * <p>
 * For every reported statement the listener's {@code add} is called unless the statement
 * could be removed from {@code current}; when a round finishes, everything still left in
 * {@code current} is reported through {@code remove}.
 * NOTE(review): presumably {@code run} collects the round's results and becomes
 * {@code current} afterwards — confirm.
 *
 * @param impl      graph the request is issued from
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure set listener receiving add/remove/exception callbacks, must not be null
 */
2933 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2935 assert(subject != null);
2936 assert(predicate != null);
2937 assert(procedure != null);
2939 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// null until a previous round exists, so the first round only produces add() calls.
2941 private Set<Statement> current = null;
2942 private Set<Statement> run = new HashSet<Statement>();
2945 public void execute(AsyncReadGraph graph, Statement result) {
2947 boolean found = false;
2949 if(current != null) {
// remove() returns true when the statement was already known.
2951 found = current.remove(result);
2955 if(!found) procedure.add(graph, result);
2962 public void finished(AsyncReadGraph graph) {
2964 if(current != null) {
// Whatever was not seen again in this round has disappeared.
2965 for(Statement s : current) procedure.remove(graph, s);
2970 run = new HashSet<Statement>();
2975 public boolean isDisposed() {
2976 return procedure.isDisposed();
2980 public void exception(AsyncReadGraph graph, Throwable t) {
2981 procedure.exception(graph, t);
2985 public String toString() {
// NOTE(review): the label reads "forStatementSet" although this is
// forAssertedStatementSet; looks like a copy-paste leftover.
2986 return "forStatementSet " + procedure;
/**
 * Runs {@code QueryCache.runnerAssertedStatements} for the ids of {@code subject} and
 * {@code predicate} and forwards only the object component of each reported triple to
 * {@code procedure}, converted to a {@link Resource}.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the asserted objects, must not be null
 */
2994 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
2995 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2997 assert(subject != null);
2998 assert(predicate != null);
2999 assert(procedure != null);
3001 final ListenerBase listener = getListenerBase(procedure);
3004 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3007 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// Subject and predicate ids (s, p) are ignored; only the object is reported.
3009 procedure.execute(graph, querySupport.getResource(o));
3010 } catch (Throwable t2) {
3011 Logger.defaultLogError(t2);
3016 public void finished(ReadGraphImpl graph) {
3018 procedure.finished(graph);
3019 } catch (Throwable t2) {
3020 Logger.defaultLogError(t2);
3022 // impl.state.barrier.dec();
3026 public void exception(ReadGraphImpl graph, Throwable t) {
3028 procedure.exception(graph, t);
3029 } catch (Throwable t2) {
3030 Logger.defaultLogError(t2);
3032 // impl.state.barrier.dec();
// The runner failure is logged only; the procedure is not notified from here.
3036 } catch (DatabaseException e) {
3037 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerPrincipalTypes} for {@code subject} and forwards every
 * reported id to the asynchronous {@code procedure} as a {@link Resource}.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose principal types are requested, must not be null
 * @param procedure receiver of the types, must not be null
 */
3043 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3045 assert(subject != null);
3046 assert(procedure != null);
3048 final ListenerBase listener = getListenerBase(procedure);
// Adapter from id-based callbacks to the Resource-based procedure.
3050 IntProcedure ip = new IntProcedure() {
3053 public void execute(ReadGraphImpl graph, int i) {
3055 procedure.execute(graph, querySupport.getResource(i));
3056 } catch (Throwable t2) {
3057 Logger.defaultLogError(t2);
3062 public void finished(ReadGraphImpl graph) {
3064 procedure.finished(graph);
3065 } catch (Throwable t2) {
3066 Logger.defaultLogError(t2);
3068 // impl.state.barrier.dec(this);
3072 public void exception(ReadGraphImpl graph, Throwable t) {
3074 procedure.exception(graph, t);
3075 } catch (Throwable t2) {
3076 Logger.defaultLogError(t2);
3078 // impl.state.barrier.dec(this);
3083 int sId = querySupport.getId(subject);
3085 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3086 // else impl.state.barrier.inc(null, null);
3089 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
// The runner failure is logged only; the procedure is not notified from here.
3090 } catch (DatabaseException e) {
3091 Logger.defaultLogError(e);
/**
 * Variant of {@code forEachPrincipalType} for a synchronous {@link MultiProcedure}:
 * runs {@code QueryCache.runnerPrincipalTypes} for {@code subject} and forwards every
 * reported id as a {@link Resource}. The procedure callbacks take no graph argument.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose principal types are requested, must not be null
 * @param procedure receiver of the types, must not be null
 */
3097 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3099 assert(subject != null);
3100 assert(procedure != null);
3102 final ListenerBase listener = getListenerBase(procedure);
3104 // impl.state.barrier.inc();
3107 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3110 public void execute(ReadGraphImpl graph, int i) {
// The callback graph is dropped: MultiProcedure.execute only takes the result.
3112 procedure.execute(querySupport.getResource(i));
3113 } catch (Throwable t2) {
3114 Logger.defaultLogError(t2);
3119 public void finished(ReadGraphImpl graph) {
3121 procedure.finished();
3122 } catch (Throwable t2) {
3123 Logger.defaultLogError(t2);
3125 // impl.state.barrier.dec();
3129 public void exception(ReadGraphImpl graph, Throwable t) {
3131 procedure.exception(t);
3132 } catch (Throwable t2) {
3133 Logger.defaultLogError(t2);
3135 // impl.state.barrier.dec();
// The runner failure is logged only; the procedure is not notified from here.
3139 } catch (DatabaseException e) {
3140 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerTypes} for {@code subject} and hands the resulting
 * {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * Listening procedures are not supported here: the listener derived from the procedure
 * is asserted to be null. The adapter does not catch throwables raised by the procedure;
 * a {@code DatabaseException} raised by the runner is logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose types are requested, must not be null
 * @param procedure receiver of the type set, must not be null
 */
3144 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3146 assert(subject != null);
3147 assert(procedure != null);
3149 final ListenerBase listener = getListenerBase(procedure);
3150 assert(listener == null);
3152 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3155 public void execute(final ReadGraphImpl graph, IntSet set) {
// The IntSet itself is passed where a Set<Resource> is expected; no copy is made.
3156 procedure.execute(graph, set);
3160 public void exception(ReadGraphImpl graph, Throwable t) {
3161 procedure.exception(graph, t);
3166 int sId = querySupport.getId(subject);
3169 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
// The runner failure is logged only; the procedure is not notified from here.
3170 } catch (DatabaseException e) {
3171 Logger.defaultLogError(e);
/**
 * Returns the result of {@code QueryCacheBase.resultTypes} for {@code subject}, using
 * {@code impl.parent} as the parent entry and no listener.
 *
 * @param impl    graph the request is issued from
 * @param subject resource whose types are requested, must not be null
 * @return the type set produced by the query cache
 * @throws Throwable whatever the underlying query raises
 */
3177 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3179 assert(subject != null);
3181 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Returns the result of {@code QueryCache.resultRelationInfoQuery} for {@code subject},
 * using {@code impl.parent} as the parent entry and no listener.
 *
 * @param impl    graph the request is issued from
 * @param subject relation resource, must not be null
 * @return the relation info produced by the query cache
 * @throws DatabaseException if the underlying query fails
 */
3186 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3188 assert(subject != null);
3190 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs {@code QueryCache.runnerSuperTypes} for {@code subject} and hands the resulting
 * {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * A one-shot {@code first} flag separates the first delivery, made on the callback graph,
 * from subsequent ones, which use {@code impl.newRestart(graph)}. Throwables raised by
 * the procedure and a {@code DatabaseException} raised by the runner are logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose supertypes are requested, must not be null
 * @param procedure receiver of the supertype set, must not be null
 */
3195 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3197 assert(subject != null);
3198 assert(procedure != null);
3200 final ListenerBase listener = getListenerBase(procedure);
3203 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
// Shared by execute() and exception(): whichever callback fires first consumes it.
3205 AtomicBoolean first = new AtomicBoolean(true);
3208 public void execute(final ReadGraphImpl graph, IntSet set) {
3209 // final HashSet<Resource> result = new HashSet<Resource>();
3210 // set.forEach(new TIntProcedure() {
3213 // public boolean execute(int type) {
3214 // result.add(querySupport.getResource(type));
3220 if(first.compareAndSet(true, false)) {
3221 procedure.execute(graph, set);
3222 // impl.state.barrier.dec();
3224 procedure.execute(impl.newRestart(graph), set);
3226 } catch (Throwable t2) {
3227 Logger.defaultLogError(t2);
3232 public void exception(ReadGraphImpl graph, Throwable t) {
3234 if(first.compareAndSet(true, false)) {
3235 procedure.exception(graph, t);
3236 // impl.state.barrier.dec();
3238 procedure.exception(impl.newRestart(graph), t);
3240 } catch (Throwable t2) {
3241 Logger.defaultLogError(t2);
// The runner failure is logged only; the procedure is not notified from here.
3246 } catch (DatabaseException e) {
3247 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerDirectSuperRelations} for {@code subject} and forwards
 * every reported id to {@code procedure} as a {@link Resource}.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   relation whose direct super relations are requested, must not be null
 * @param procedure receiver of the super relations, must not be null
 */
3253 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3255 assert(subject != null);
3256 assert(procedure != null);
3258 final ListenerBase listener = getListenerBase(procedure);
// Adapter from id-based callbacks to the Resource-based procedure.
3260 IntProcedure ip = new IntProcedureAdapter() {
3263 public void execute(final ReadGraphImpl graph, int superRelation) {
3265 procedure.execute(graph, querySupport.getResource(superRelation));
3266 } catch (Throwable t2) {
3267 Logger.defaultLogError(t2);
3272 public void finished(final ReadGraphImpl graph) {
3274 procedure.finished(graph);
3275 } catch (Throwable t2) {
3276 Logger.defaultLogError(t2);
3278 // impl.state.barrier.dec(this);
3283 public void exception(ReadGraphImpl graph, Throwable t) {
3285 procedure.exception(graph, t);
3286 } catch (Throwable t2) {
3287 Logger.defaultLogError(t2);
3289 // impl.state.barrier.dec(this);
3294 int sId = querySupport.getId(subject);
3296 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3297 // else impl.state.barrier.inc(null, null);
3300 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
// The runner failure is logged only; the procedure is not notified from here.
3301 } catch (DatabaseException e) {
3302 Logger.defaultLogError(e);
3305 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to {@code PossibleSuperRelation.queryEach} with the id of {@code subject},
 * this processor, {@code impl.parent}, the listener derived from the procedure and the
 * procedure itself. No adapter is placed around the procedure.
 *
 * @param impl      graph the request is issued from
 * @param subject   relation whose possible super relation is requested, must not be null
 * @param procedure receiver of the result, must not be null
 */
3310 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3312 assert(subject != null);
3313 assert(procedure != null);
3315 final ListenerBase listener = getListenerBase(procedure);
3317 // impl.state.barrier.inc();
3319 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs {@code QueryCache.runnerSuperRelations} for {@code subject} and hands the
 * resulting {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   relation whose super relations are requested, must not be null
 * @param procedure receiver of the super relation set, must not be null
 */
3324 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3326 assert(subject != null);
3327 assert(procedure != null);
3329 final ListenerBase listener = getListenerBase(procedure);
3331 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3334 public void execute(final ReadGraphImpl graph, IntSet set) {
3335 // final HashSet<Resource> result = new HashSet<Resource>();
3336 // set.forEach(new TIntProcedure() {
3339 // public boolean execute(int type) {
3340 // result.add(querySupport.getResource(type));
// The IntSet itself is passed where a Set<Resource> is expected; no copy is made.
3346 procedure.execute(graph, set);
3347 } catch (Throwable t2) {
3348 Logger.defaultLogError(t2);
3350 // impl.state.barrier.dec(this);
3354 public void exception(ReadGraphImpl graph, Throwable t) {
3356 procedure.exception(graph, t);
3357 } catch (Throwable t2) {
3358 Logger.defaultLogError(t2);
3360 // impl.state.barrier.dec(this);
3365 int sId = querySupport.getId(subject);
3367 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3368 // else impl.state.barrier.inc(null, null);
3371 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
// The runner failure is logged only; the procedure is not notified from here.
3372 } catch (DatabaseException e) {
3373 Logger.defaultLogError(e);
/**
 * Resolves {@code subject} to its id and delegates to the id-based {@code getValue}.
 *
 * @param impl    graph the request is issued from
 * @param subject resource whose value is requested
 * @return the value bytes produced by the id-based overload
 * @throws DatabaseException if the underlying query fails
 */
3378 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3379 return getValue(impl, querySupport.getId(subject));
/**
 * Returns the result of {@code QueryCache.resultValueQuery} for the given resource id,
 * using {@code impl.parent} as the parent entry and no listener.
 *
 * @param impl    graph the request is issued from
 * @param subject id of the resource whose value is requested
 * @return the value bytes produced by the query cache
 * @throws DatabaseException if the underlying query fails
 */
3382 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3383 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3387 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3389 assert(subject != null);
3390 assert(procedure != null);
3392 int sId = querySupport.getId(subject);
3394 // if(procedure != null) {
3396 final ListenerBase listener = getListenerBase(procedure);
3398 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3400 AtomicBoolean first = new AtomicBoolean(true);
3403 public void execute(ReadGraphImpl graph, byte[] result) {
3405 if(first.compareAndSet(true, false)) {
3406 procedure.execute(graph, result);
3407 // impl.state.barrier.dec(this);
3409 procedure.execute(impl.newRestart(graph), result);
3411 } catch (Throwable t2) {
3412 Logger.defaultLogError(t2);
3417 public void exception(ReadGraphImpl graph, Throwable t) {
3419 if(first.compareAndSet(true, false)) {
3420 procedure.exception(graph, t);
3421 // impl.state.barrier.dec(this);
3423 procedure.exception(impl.newRestart(graph), t);
3425 } catch (Throwable t2) {
3426 Logger.defaultLogError(t2);
3432 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3433 // else impl.state.barrier.inc(null, null);
3436 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3437 } catch (DatabaseException e) {
3438 throw new IllegalStateException("Internal error");
3443 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3447 // throw new IllegalStateException("Internal error");
/**
 * Runs {@code QueryCache.runnerValueQuery} for {@code subject} and delivers the value
 * bytes to {@code procedure}.
 * <p>
 * Two adapters exist. The one used when {@code impl.parent != null} or a listener was
 * derived from the procedure tracks a one-shot {@code first} flag (first delivery on the
 * callback graph, later ones on {@code impl.newRestart(graph)}) and logs throwables
 * raised by the procedure. The other one forwards the callbacks directly. In both cases
 * a {@code DatabaseException} raised by the runner is logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource whose value is requested, must not be null
 * @param procedure receiver of the value, must not be null
 */
3452 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3454 assert(subject != null);
3455 assert(procedure != null);
3457 final ListenerBase listener = getListenerBase(procedure);
3459 if(impl.parent != null || listener != null) {
3461 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Shared by execute() and exception(): whichever callback fires first consumes it.
3463 AtomicBoolean first = new AtomicBoolean(true);
3466 public void execute(ReadGraphImpl graph, byte[] result) {
3468 if(first.compareAndSet(true, false)) {
3469 procedure.execute(graph, result);
3470 // impl.state.barrier.dec(this);
3472 procedure.execute(impl.newRestart(graph), result);
3474 } catch (Throwable t2) {
3475 Logger.defaultLogError(t2);
3480 public void exception(ReadGraphImpl graph, Throwable t) {
3482 if(first.compareAndSet(true, false)) {
3483 procedure.exception(graph, t);
3484 // impl.state.barrier.dec(this);
3486 procedure.exception(impl.newRestart(graph), t);
3488 } catch (Throwable t2) {
3489 Logger.defaultLogError(t2);
3495 int sId = querySupport.getId(subject);
3497 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3498 // else impl.state.barrier.inc(null, null);
3501 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3502 } catch (DatabaseException e) {
3503 Logger.defaultLogError(e);
// Second adapter (presumably the else-branch — confirm): plain forwarding on the
// callback graph.
3508 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3511 public void execute(ReadGraphImpl graph, byte[] result) {
3513 procedure.execute(graph, result);
3518 public void exception(ReadGraphImpl graph, Throwable t) {
3520 procedure.exception(graph, t);
3526 int sId = querySupport.getId(subject);
3529 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3530 } catch (DatabaseException e) {
3531 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerObjects} for {@code relation} with the predicate returned
 * by {@code getInverseOf()} and reports a single inverse resource to {@code procedure}.
 * <p>
 * The adapter uses two flags: {@code found} marks that an object has been seen, and
 * {@code done} guarantees that the procedure is notified at most once. Depending on what
 * the query reports, the procedure receives the resolved resource, a
 * {@link NoInverseException}, a {@link ManyObjectsForFunctionalRelationException}, or the
 * exception raised by the query. Throwables raised by the procedure and a
 * {@code DatabaseException} raised by the runner are logged.
 *
 * @param impl      graph the request is issued from
 * @param relation  relation whose inverse is requested, must not be null
 * @param procedure receiver of the inverse, must not be null
 */
3539 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3541 assert(relation != null);
3542 assert(procedure != null);
3544 final ListenerBase listener = getListenerBase(procedure);
3546 IntProcedure ip = new IntProcedure() {
// Id of the reported object; 0 until one has been stored.
3548 private int result = 0;
3550 final AtomicBoolean found = new AtomicBoolean(false);
3551 final AtomicBoolean done = new AtomicBoolean(false);
3554 public void finished(ReadGraphImpl graph) {
3556 // Shall fire exactly once!
3557 if(done.compareAndSet(false, true)) {
// Either "no inverse" or the resolved resource is reported; the condition that
// chooses between the two presumably tests whether a result was stored — confirm.
3560 procedure.exception(graph, new NoInverseException(""));
3561 // impl.state.barrier.dec(this);
3563 procedure.execute(graph, querySupport.getResource(result));
3564 // impl.state.barrier.dec(this);
3566 } catch (Throwable t) {
3567 Logger.defaultLogError(t);
3574 public void execute(ReadGraphImpl graph, int i) {
// compareAndSet succeeds only for the first reported object.
3576 if(found.compareAndSet(false, true)) {
3579 // Shall fire exactly once!
3580 if(done.compareAndSet(false, true)) {
// NOTE(review): 'result' here appears to resolve to the same field as 'this.result'
// (the incoming id is the parameter 'i'), so the message would print one id twice —
// confirm whether 'i' was intended.
3582 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3583 // impl.state.barrier.dec(this);
3584 } catch (Throwable t) {
3585 Logger.defaultLogError(t);
3593 public void exception(ReadGraphImpl graph, Throwable t) {
3594 // Shall fire exactly once!
3595 if(done.compareAndSet(false, true)) {
3597 procedure.exception(graph, t);
3598 // impl.state.barrier.dec(this);
3599 } catch (Throwable t2) {
3600 Logger.defaultLogError(t2);
3607 int sId = querySupport.getId(relation);
3609 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3610 // else impl.state.barrier.inc(null, null);
3613 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
// The runner failure is logged only; the procedure is not notified from here.
3614 } catch (DatabaseException e) {
3615 Logger.defaultLogError(e);
/**
 * Looks up a resource by the string {@code id} through the {@code InternalProcedure}-based
 * {@code forResource} overload and reports it to {@code procedure} as a {@link Resource}.
 * <p>
 * The integer result of the lookup is converted with {@code querySupport.getResource}.
 * Throwables raised by the procedure callbacks are logged.
 *
 * @param impl      graph the request is issued from
 * @param id        identifier passed on to the lookup
 * @param procedure receiver of the resource, must not be null
 */
3621 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3624 assert(procedure != null);
3626 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3629 public void execute(ReadGraphImpl graph, Integer result) {
// 'result' is auto-unboxed here.
3631 procedure.execute(graph, querySupport.getResource(result));
3632 } catch (Throwable t2) {
3633 Logger.defaultLogError(t2);
3635 // impl.state.barrier.dec(this);
3639 public void exception(ReadGraphImpl graph, Throwable t) {
3642 procedure.exception(graph, t);
3643 } catch (Throwable t2) {
3644 Logger.defaultLogError(t2);
3646 // impl.state.barrier.dec(this);
3651 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3652 // else impl.state.barrier.inc(null, null);
3654 forResource(impl, id, impl.parent, ip);
/**
 * Looks up a builtin by the string {@code id} through the {@code InternalProcedure}-based
 * {@code forBuiltin} overload and reports it to {@code procedure} as a {@link Resource}.
 * <p>
 * The integer result of the lookup is converted with {@code querySupport.getResource}.
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the lookup itself, are logged.
 *
 * @param impl      graph the request is issued from
 * @param id        identifier passed on to the lookup
 * @param procedure receiver of the resource, must not be null
 */
3659 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3662 assert(procedure != null);
3664 // impl.state.barrier.inc();
3667 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3670 public void execute(ReadGraphImpl graph, Integer result) {
// 'result' is auto-unboxed here.
3672 procedure.execute(graph, querySupport.getResource(result));
3673 } catch (Throwable t2) {
3674 Logger.defaultLogError(t2);
3676 // impl.state.barrier.dec();
3680 public void exception(ReadGraphImpl graph, Throwable t) {
3682 procedure.exception(graph, t);
3683 } catch (Throwable t2) {
3684 Logger.defaultLogError(t2);
3686 // impl.state.barrier.dec();
// The lookup failure is logged only; the procedure is not notified from here.
3690 } catch (DatabaseException e) {
3691 Logger.defaultLogError(e);
/**
 * Reports to {@code procedure} whether {@code subject} has any direct predicates, i.e.
 * whether the set returned by {@code QueryCache.resultDirectPredicates} is non-empty.
 * <p>
 * The query is evaluated in the calling thread; a {@code DatabaseException} is passed on
 * to {@code procedure.exception} instead of being logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource to inspect, must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
3697 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3699 assert(subject != null);
3700 assert(procedure != null);
3702 final ListenerBase listener = getListenerBase(procedure);
3705 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3706 procedure.execute(impl, !result.isEmpty());
3707 } catch (DatabaseException e) {
3708 procedure.exception(impl, e);
/**
 * Reports to {@code procedure} the {@code found} flag of an adapter that is run through
 * {@code forEachObject} for {@code subject} and {@code predicate}.
 * <p>
 * The flag starts as {@code false} and is delivered when the object enumeration
 * finishes. NOTE(review): presumably {@code execute} sets the flag for any reported
 * object — confirm. Throwables raised by the procedure callbacks are logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
3714 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
3716 assert(subject != null);
3717 assert(predicate != null);
3718 assert(procedure != null);
3720 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
3722 boolean found = false;
// execute() and finished() are synchronized on the adapter, guarding 'found'.
3725 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3730 synchronized public void finished(AsyncReadGraph graph) {
3732 procedure.execute(graph, found);
3733 } catch (Throwable t2) {
3734 Logger.defaultLogError(t2);
3736 // impl.state.barrier.dec(this);
3740 public void exception(AsyncReadGraph graph, Throwable t) {
3742 procedure.exception(graph, t);
3743 } catch (Throwable t2) {
3744 Logger.defaultLogError(t2);
3746 // impl.state.barrier.dec(this);
3751 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
3752 // else impl.state.barrier.inc(null, null);
3754 forEachObject(impl, subject, predicate, ip);
/**
 * Reports to {@code procedure} whether {@code object} is among the objects that
 * {@code forEachObject} enumerates for {@code subject} and {@code predicate}.
 * <p>
 * The adapter compares every reported resource with {@code object} using
 * {@code equals} and delivers the outcome when the enumeration finishes. Throwables
 * raised by the procedure callbacks are logged.
 *
 * @param impl      graph the request is issued from
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param object    statement object to look for
 * @param procedure receiver of the boolean answer, must not be null
 */
3759 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
3761 assert(subject != null);
3762 assert(predicate != null);
3763 assert(procedure != null);
3765 // impl.state.barrier.inc();
3767 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
3769 boolean found = false;
// execute() and finished() are synchronized on the adapter, guarding 'found'.
3772 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3773 if(resource.equals(object)) found = true;
3777 synchronized public void finished(AsyncReadGraph graph) {
3779 procedure.execute(graph, found);
3780 } catch (Throwable t2) {
3781 Logger.defaultLogError(t2);
3783 // impl.state.barrier.dec();
3787 public void exception(AsyncReadGraph graph, Throwable t) {
3789 procedure.exception(graph, t);
3790 } catch (Throwable t2) {
3791 Logger.defaultLogError(t2);
3793 // impl.state.barrier.dec();
/**
 * Runs {@code QueryCache.runnerValueQuery} for {@code subject} and reports to
 * {@code procedure} whether the returned byte array is non-null.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   resource to inspect, must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
3801 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3803 assert(subject != null);
3804 assert(procedure != null);
3806 final ListenerBase listener = getListenerBase(procedure);
3808 // impl.state.barrier.inc();
3811 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
3814 public void execute(ReadGraphImpl graph, byte[] object) {
// Only presence is reported; the bytes themselves are discarded.
3815 boolean result = object != null;
3817 procedure.execute(graph, result);
3818 } catch (Throwable t2) {
3819 Logger.defaultLogError(t2);
3821 // impl.state.barrier.dec();
3825 public void exception(ReadGraphImpl graph, Throwable t) {
3827 procedure.exception(graph, t);
3828 } catch (Throwable t2) {
3829 Logger.defaultLogError(t2);
3831 // impl.state.barrier.dec();
// The runner failure is logged only; the procedure is not notified from here.
3835 } catch (DatabaseException e) {
3836 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerOrderedSet} for {@code subject} and forwards every
 * reported id to {@code procedure} as a {@link Resource}.
 * <p>
 * Throwables raised by the procedure callbacks, and a {@code DatabaseException} raised by
 * the runner itself, are logged through {@code Logger.defaultLogError}.
 *
 * @param impl      graph the request is issued from
 * @param subject   ordered set resource, must not be null
 * @param procedure receiver of the elements, must not be null
 */
3842 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3844 assert(subject != null);
3845 assert(procedure != null);
3847 final ListenerBase listener = getListenerBase(procedure);
3851 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3854 public void exception(ReadGraphImpl graph, Throwable t) {
3856 procedure.exception(graph, t);
3857 } catch (Throwable t2) {
3858 Logger.defaultLogError(t2);
3860 // impl.state.barrier.dec();
3864 public void execute(ReadGraphImpl graph, int i) {
3866 procedure.execute(graph, querySupport.getResource(i));
3867 } catch (Throwable t2) {
3868 Logger.defaultLogError(t2);
3873 public void finished(ReadGraphImpl graph) {
3875 procedure.finished(graph);
3876 } catch (Throwable t2) {
3877 Logger.defaultLogError(t2);
3879 // impl.state.barrier.dec();
// The runner failure is logged only; the procedure is not notified from here.
3883 } catch (DatabaseException e) {
3884 Logger.defaultLogError(e);
3890 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
3892 // assert(request != null);
3893 // assert(procedure != null);
3895 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
3900 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
3902 // assert(graph != null);
3903 // assert(request != null);
3905 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
3906 // if(entry != null && entry.isReady()) {
3907 // return (T)entry.get(graph, this, null);
3909 // return request.perform(graph);
3914 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
3916 // assert(graph != null);
3917 // assert(request != null);
3919 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
3920 // if(entry != null && entry.isReady()) {
3921 // if(entry.isExcepted()) {
3922 // Throwable t = (Throwable)entry.getResult();
3923 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3924 // else throw new DatabaseException(t);
3926 // return (T)entry.getResult();
3930 // final DataContainer<T> result = new DataContainer<T>();
3931 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3933 // request.register(graph, new Listener<T>() {
3936 // public void exception(Throwable t) {
3937 // exception.set(t);
3941 // public void execute(T t) {
3946 // public boolean isDisposed() {
3952 // Throwable t = exception.get();
3954 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3955 // else throw new DatabaseException(t);
3958 // return result.get();
3965 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
3967 // assert(graph != null);
3968 // assert(request != null);
3970 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
3971 // if(entry != null && entry.isReady()) {
3972 // if(entry.isExcepted()) {
3973 // procedure.exception(graph, (Throwable)entry.getResult());
3975 // procedure.execute(graph, (T)entry.getResult());
3978 // request.perform(graph, procedure);
/**
 * Executes a {@link MultiRead} request by delegating to {@code queryMultiRead}.
 *
 * @param impl      graph the request is issued from
 * @param request   request to run, must not be null
 * @param parent    parent cache entry passed on to the delegate
 * @param procedure receiver of the results, must not be null
 * @param listener  listener passed on to the delegate
 * @throws IllegalStateException wrapping any {@code DatabaseException} raised by the
 *         delegate
 */
3984 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
3986 assert(request != null);
3987 assert(procedure != null);
3991 queryMultiRead(impl, request, parent, listener, procedure);
3993 } catch (DatabaseException e) {
// The checked exception is converted to unchecked; the cause is preserved.
3995 throw new IllegalStateException(e);
/**
 * Executes an {@link AsyncMultiRead} request by delegating to {@code runAsyncMultiRead}
 * with a wrapper around {@code procedure}.
 * <p>
 * The wrapper forwards {@code execute}, {@code finished} and {@code exception} unchanged
 * and logs any throwable raised by the wrapped procedure. Its {@code toString} returns
 * the wrapped procedure's string form.
 *
 * @param impl      graph the request is issued from
 * @param request   request to run, must not be null
 * @param parent    parent cache entry passed on to the delegate
 * @param procedure receiver of the results, must not be null
 * @param listener  listener passed on to the delegate
 */
4002 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4004 assert(request != null);
4005 assert(procedure != null);
4007 // impl.state.barrier.inc();
4009 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4011 public void execute(AsyncReadGraph graph, T result) {
4014 procedure.execute(graph, result);
4015 } catch (Throwable t2) {
4016 Logger.defaultLogError(t2);
4021 public void finished(AsyncReadGraph graph) {
4024 procedure.finished(graph);
4025 } catch (Throwable t2) {
4026 Logger.defaultLogError(t2);
4029 // impl.state.barrier.dec();
4034 public String toString() {
4035 return procedure.toString();
4039 public void exception(AsyncReadGraph graph, Throwable t) {
4042 procedure.exception(graph, t);
4043 } catch (Throwable t2) {
4044 Logger.defaultLogError(t2);
4047 // impl.state.barrier.dec();
4056 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4058 // assert(request != null);
4059 // assert(procedure != null);
4063 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4066 // public String toString() {
4067 // return procedure.toString();
4071 // public void execute(AsyncReadGraph graph, T result) {
4073 // procedure.execute(result);
4074 // } catch (Throwable t2) {
4075 // Logger.defaultLogError(t2);
4080 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4082 // procedure.exception(throwable);
4083 // } catch (Throwable t2) {
4084 // Logger.defaultLogError(t2);
4090 // } catch (DatabaseException e) {
4092 // throw new IllegalStateException(e);
/**
 * Returns what {@code querySupport.getProvider} reports for the ids of the given
 * subject, predicate and object.
 *
 * @param subject   statement subject
 * @param predicate statement predicate
 * @param object    statement object
 * @return the {@link VirtualGraph} returned by {@code querySupport}
 */
4099 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4101 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
/**
 * Returns what {@code querySupport.getProvider} reports for the ids of the given
 * subject and predicate.
 *
 * @param subject   statement subject
 * @param predicate statement predicate
 * @return the {@link VirtualGraph} returned by {@code querySupport}
 */
4106 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4108 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
/**
 * Returns what {@code querySupport.getValueProvider} reports for the id of
 * {@code subject}.
 *
 * @param subject resource whose value provider is requested
 * @return the {@link VirtualGraph} returned by {@code querySupport}
 */
4113 public VirtualGraph getValueProvider(Resource subject) {
4115 return querySupport.getValueProvider(querySupport.getId(subject));
/**
 * Delegates to {@code querySupport.resume} with the given graph.
 *
 * @param graph graph passed on to {@code querySupport}
 * @return the boolean returned by {@code querySupport.resume}
 */
4119 public boolean resumeTasks(ReadGraphImpl graph) {
4121 return querySupport.resume(graph);
/**
 * Delegates to {@code querySupport.isImmutable} for the given resource id.
 *
 * @param resourceId id of the resource to check
 * @return the boolean returned by {@code querySupport.isImmutable}
 */
4125 public boolean isImmutable(int resourceId) {
4126 return querySupport.isImmutable(resourceId);
/**
 * Resource-based variant of {@code isImmutable}: reads the id from the resource and
 * delegates to the id-based overload.
 *
 * @param resource resource to check; it is cast to {@code ResourceImpl} without a type
 *                 check, so any other implementation fails with a
 *                 {@code ClassCastException}
 * @return the result of the id-based {@code isImmutable}
 */
4129 public boolean isImmutable(Resource resource) {
4130 ResourceImpl impl = (ResourceImpl)resource;
4131 return isImmutable(impl.id);
/**
 * Accessor for the {@code Layer0} instance that stores the value of
 * {@code Layer0.getInstance(graph)} in the {@code L0} field.
 * NOTE(review): presumably the assignment happens only while {@code L0} is still unset
 * and the field is then returned — confirm.
 *
 * @param graph graph used to obtain the {@code Layer0} instance
 */
4136 public Layer0 getL0(ReadGraph graph) {
4138 L0 = Layer0.getInstance(graph);
/**
 * Accessor for the {@code Layer0} instance that takes no graph argument.
 * NOTE(review): presumably returns the {@code L0} field as-is — confirm callers cannot
 * observe it before it has been initialised.
 */
4143 public Layer0 getL0() {
4147 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4148 protected Integer initialValue() {