1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.exception.DatabaseException;
48 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
49 import org.simantics.db.exception.NoInverseException;
50 import org.simantics.db.exception.ResourceNotFoundException;
51 import org.simantics.db.impl.ResourceImpl;
52 import org.simantics.db.impl.graph.BarrierTracing;
53 import org.simantics.db.impl.graph.ReadGraphImpl;
54 import org.simantics.db.impl.graph.ReadGraphSupport;
55 import org.simantics.db.impl.procedure.IntProcedureAdapter;
56 import org.simantics.db.impl.procedure.InternalProcedure;
57 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
58 import org.simantics.db.impl.support.ResourceSupport;
59 import org.simantics.db.procedure.AsyncMultiListener;
60 import org.simantics.db.procedure.AsyncMultiProcedure;
61 import org.simantics.db.procedure.AsyncProcedure;
62 import org.simantics.db.procedure.AsyncSetListener;
63 import org.simantics.db.procedure.ListenerBase;
64 import org.simantics.db.procedure.MultiProcedure;
65 import org.simantics.db.procedure.StatementProcedure;
66 import org.simantics.db.procedure.SyncMultiProcedure;
67 import org.simantics.db.request.AsyncMultiRead;
68 import org.simantics.db.request.ExternalRead;
69 import org.simantics.db.request.MultiRead;
70 import org.simantics.db.request.RequestFlags;
71 import org.simantics.layer0.Layer0;
72 import org.simantics.utils.DataContainer;
73 import org.simantics.utils.Development;
74 import org.simantics.utils.datastructures.Pair;
75 import org.simantics.utils.datastructures.collections.CollectionUtils;
76 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
78 import gnu.trove.procedure.TIntProcedure;
79 import gnu.trove.procedure.TLongProcedure;
80 import gnu.trove.procedure.TObjectProcedure;
81 import gnu.trove.set.hash.THashSet;
82 import gnu.trove.set.hash.TIntHashSet;
84 @SuppressWarnings({"rawtypes", "unchecked"})
85 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
87 public static int indent = 0;
92 public int boundQueries = 0;
95 final private int functionalRelation;
97 final private int superrelationOf;
99 final private int instanceOf;
101 final private int inverseOf;
103 final private int asserts;
105 final private int hasPredicate;
107 final private int hasPredicateInverse;
109 final private int hasObject;
111 final private int inherits;
113 final private int subrelationOf;
115 final private int rootLibrary;
118 * A cache for the root library resource. Initialized in
119 * {@link #getRootLibraryResource()}.
121 private volatile ResourceImpl rootLibraryResource;
123 final private int library;
125 final private int consistsOf;
127 final private int hasName;
129 AtomicInteger sleepers = new AtomicInteger(0);
131 boolean updating = false;
134 final public QueryCache cache;
135 final public QuerySupport querySupport;
136 final public Session session;
137 final public ResourceSupport resourceSupport;
139 final public QueryListening listening = new QueryListening(this);
141 QueryThread[] executors;
143 // public ArrayList<SessionTask>[] queues;
145 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
149 INIT, RUN, SLEEP, DISPOSED
153 public ThreadState[] threadStates;
154 // public ReentrantLock[] threadLocks;
155 // public Condition[] threadConditions;
157 //public ArrayList<SessionTask>[] ownTasks;
159 //public ArrayList<SessionTask>[] ownSyncTasks;
161 //ArrayList<SessionTask>[] delayQueues;
163 final Object querySupportLock;
165 public Long modificationCounter = 0L;
167 public void close() {
170 public SessionTask getOwnTask(ReadGraphImpl impl) {
171 Set<ReadGraphImpl> ancestors = impl.ancestorSet();
172 synchronized(querySupportLock) {
174 while(index < freeScheduling.size()) {
175 SessionTask task = freeScheduling.get(index);
176 if(task.hasCommonParent(ancestors)) {
177 return freeScheduling.remove(index);
185 public SessionTask getSubTask(ReadGraphImpl impl) {
186 Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
187 synchronized(querySupportLock) {
189 while(index < freeScheduling.size()) {
190 SessionTask task = freeScheduling.get(index);
191 if(task.hasCommonParent(onlyThis)) {
192 return freeScheduling.remove(index);
200 public boolean performPending(ReadGraphImpl graph) {
201 SessionTask task = getOwnTask(graph);
203 task.run(QueryProcessor.thread.get());
210 // final public void scheduleOwn(int caller, SessionTask request) {
211 // ownTasks[caller].add(request);
214 final public void schedule(SessionTask request) {
216 //int performer = request.thread;
218 // if(DebugPolicy.SCHEDULE)
219 // System.out.println("schedule " + request + " " + " -> " + performer);
221 //assert(performer >= 0);
223 assert(request != null);
225 // if(caller == performer) {
226 // request.run(caller);
229 // if(performer == THREADS) {
231 synchronized(querySupportLock) {
233 if(BarrierTracing.BOOKKEEPING) {
234 Exception current = new Exception();
235 Exception previous = BarrierTracing.tasks.put(request, current);
236 if(previous != null) {
237 previous.printStackTrace();
238 current.printStackTrace();
242 freeScheduling.add(request);
244 querySupportLock.notifyAll();
252 // ReentrantLock queueLock = threadLocks[performer];
254 // queues[performer].add(request);
255 // // This thread could have been sleeping
256 // if(queues[performer].size() == 1) {
257 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
258 // threadConditions[performer].signalAll();
260 // queueLock.unlock();
267 final public int THREAD_MASK;
269 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
271 public static abstract class SessionTask {
273 public final ReadGraphImpl graph;
274 private Set<ReadGraphImpl> ancestors;
275 private int counter = 0;
276 private Exception trace;
278 public SessionTask(ReadGraphImpl graph) {
280 if(graph != null) graph.asyncBarrier.inc();
283 public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
284 if(graph == null) return false;
285 if(ancestors == null) ancestors = graph.ancestorSet();
286 return !Collections.disjoint(ancestors, otherAncestors);
289 public abstract void run0(int thread);
291 public final void run(int thread) {
293 if(BarrierTracing.BOOKKEEPING) {
294 trace.printStackTrace();
295 new Exception().printStackTrace();
297 throw new IllegalStateException("Multiple invocations of SessionTask!");
299 if(BarrierTracing.BOOKKEEPING) {
300 trace = new Exception();
303 if(graph != null) graph.asyncBarrier.dec();
307 public String toString() {
308 return "SessionTask[" + graph.parent + "]";
313 public static abstract class SessionRead extends SessionTask {
315 final public Semaphore notify;
316 final public DataContainer<Throwable> throwable;
318 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
320 this.throwable = throwable;
321 this.notify = notify;
326 long waitingTime = 0;
329 static int koss2 = 0;
331 public boolean resume(ReadGraphImpl graph) {
332 return executors[0].runSynchronized();
335 //private WeakReference<GarbageTracker> garbageTracker;
337 private class GarbageTracker {
340 protected void finalize() throws Throwable {
342 // System.err.println("GarbageTracker");
344 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
352 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
353 throws DatabaseException {
355 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
358 THREAD_MASK = threads - 1;
361 cache = new QueryCache(core, threads);
362 session = querySupport.getSession();
363 resourceSupport = querySupport.getSupport();
364 querySupportLock = core.getLock();
366 executors = new QueryThread[THREADS];
367 // queues = new ArrayList[THREADS];
368 // threadLocks = new ReentrantLock[THREADS];
369 // threadConditions = new Condition[THREADS];
370 threadStates = new ThreadState[THREADS];
371 // ownTasks = new ArrayList[THREADS];
372 // ownSyncTasks = new ArrayList[THREADS];
373 // delayQueues = new ArrayList[THREADS * THREADS];
375 // freeSchedule = new AtomicInteger(0);
377 // for (int i = 0; i < THREADS * THREADS; i++) {
378 // delayQueues[i] = new ArrayList<SessionTask>();
381 for (int i = 0; i < THREADS; i++) {
383 // tasks[i] = new ArrayList<Runnable>();
384 // ownTasks[i] = new ArrayList<SessionTask>();
385 // ownSyncTasks[i] = new ArrayList<SessionTask>();
386 // queues[i] = new ArrayList<SessionTask>();
387 // threadLocks[i] = new ReentrantLock();
388 // threadConditions[i] = threadLocks[i].newCondition();
389 // limits[i] = false;
390 threadStates[i] = ThreadState.INIT;
394 for (int i = 0; i < THREADS; i++) {
398 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
400 threadSet.add(executors[i]);
405 for (int i = 0; i < THREADS; i++) {
406 executors[i].start();
409 // Make sure that query threads are up and running
410 while(sleepers.get() != THREADS) {
413 } catch (InterruptedException e) {
418 rootLibrary = core.getBuiltin("http:/");
419 boolean builtinsInstalled = rootLibrary != 0;
421 if (builtinsInstalled) {
422 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
423 assert (functionalRelation != 0);
425 functionalRelation = 0;
427 if (builtinsInstalled) {
428 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
429 assert (instanceOf != 0);
433 if (builtinsInstalled) {
434 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
435 assert (inverseOf != 0);
440 if (builtinsInstalled) {
441 inherits = core.getBuiltin(Layer0.URIs.Inherits);
442 assert (inherits != 0);
446 if (builtinsInstalled) {
447 asserts = core.getBuiltin(Layer0.URIs.Asserts);
448 assert (asserts != 0);
452 if (builtinsInstalled) {
453 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
454 assert (hasPredicate != 0);
458 if (builtinsInstalled) {
459 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
460 assert (hasPredicateInverse != 0);
462 hasPredicateInverse = 0;
464 if (builtinsInstalled) {
465 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
466 assert (hasObject != 0);
470 if (builtinsInstalled) {
471 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
472 assert (subrelationOf != 0);
476 if (builtinsInstalled) {
477 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
478 assert (superrelationOf != 0);
482 if (builtinsInstalled) {
483 library = core.getBuiltin(Layer0.URIs.Library);
484 assert (library != 0);
488 if (builtinsInstalled) {
489 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
490 assert (consistsOf != 0);
494 if (builtinsInstalled) {
495 hasName = core.getBuiltin(Layer0.URIs.HasName);
496 assert (hasName != 0);
502 final public void releaseWrite(ReadGraphImpl graph) {
503 propagateChangesInQueryCache(graph);
504 modificationCounter++;
507 final public int getId(final Resource r) {
508 return querySupport.getId(r);
511 public QuerySupport getCore() {
515 public int getFunctionalRelation() {
516 return functionalRelation;
519 public int getInherits() {
523 public int getInstanceOf() {
527 public int getInverseOf() {
531 public int getSubrelationOf() {
532 return subrelationOf;
535 public int getSuperrelationOf() {
536 return superrelationOf;
539 public int getAsserts() {
543 public int getHasPredicate() {
547 public int getHasPredicateInverse() {
548 return hasPredicateInverse;
551 public int getHasObject() {
555 public int getRootLibrary() {
559 public Resource getRootLibraryResource() {
560 if (rootLibraryResource == null) {
561 // Synchronization is not needed here, it doesn't matter if multiple
562 // threads simultaneously set rootLibraryResource once.
563 int root = getRootLibrary();
565 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
566 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
568 return rootLibraryResource;
571 public int getLibrary() {
575 public int getConsistsOf() {
579 public int getHasName() {
583 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
587 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
590 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
592 if (result != null && result != 0) {
593 procedure.execute(graph, result);
597 // Fall back to using the fixed builtins.
598 // result = querySupport.getBuiltin(id);
599 // if (result != 0) {
600 // procedure.execute(graph, result);
605 // result = querySupport.getRandomAccessReference(id);
606 // } catch (ResourceNotFoundException e) {
607 // procedure.exception(graph, e);
612 procedure.execute(graph, result);
614 procedure.exception(graph, new ResourceNotFoundException(id));
620 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
621 procedure.exception(graph, t);
625 } catch (DatabaseException e) {
629 procedure.exception(graph, e);
631 } catch (DatabaseException e1) {
633 Logger.defaultLogError(e1);
641 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
643 Integer result = querySupport.getBuiltin(id);
645 procedure.execute(graph, result);
647 procedure.exception(graph, new ResourceNotFoundException(id));
652 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
655 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
656 } catch (DatabaseException e) {
657 throw new IllegalStateException(e);
662 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
666 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
667 } catch (DatabaseException e) {
668 throw new IllegalStateException(e);
673 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
674 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
678 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
680 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
684 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
686 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
690 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
692 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
696 boolean isBound(ExternalReadEntry<?> entry) {
697 if(entry.hasParents()) return true;
698 else if(listening.hasListener(entry)) return true;
702 static class Dummy implements InternalProcedure<Object>, IntProcedure {
705 public void execute(ReadGraphImpl graph, int i) {
709 public void finished(ReadGraphImpl graph) {
713 public void execute(ReadGraphImpl graph, Object result) {
717 public void exception(ReadGraphImpl graph, Throwable throwable) {
722 private static final Dummy dummy = new Dummy();
725 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
727 if (DebugPolicy.PERFORM)
728 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
731 assert (!collecting);
733 assert(query.assertNotDiscarded());
735 registerDependencies(graph, query, parent, listener, procedure, false);
737 // FRESH, REFUTED, EXCEPTED go here
738 if (!query.isReady()) {
743 query.computeForEach(graph, this, (Procedure)dummy, true);
744 return query.get(graph, this, null);
750 return query.get(graph, this, procedure);
758 interface QueryCollectorSupport {
759 public CacheCollectionResult allCaches();
760 public Collection<CacheEntry> getRootList();
761 public int getCurrentSize();
762 public int calculateCurrentSize();
763 public CacheEntryBase iterate(int level);
764 public void remove();
765 public void setLevel(CacheEntryBase entry, int level);
766 public boolean start(boolean flush);
769 interface QueryCollector {
771 public void collect(int youngTarget, int allowedTimeInMs);
775 class QueryCollectorSupportImpl implements QueryCollectorSupport {
777 private static final boolean DEBUG = false;
778 private static final double ITERATION_RATIO = 0.2;
780 private CacheCollectionResult iteration = new CacheCollectionResult();
781 private boolean fresh = true;
782 private boolean needDataInStart = true;
784 QueryCollectorSupportImpl() {
788 public CacheCollectionResult allCaches() {
789 CacheCollectionResult result = new CacheCollectionResult();
790 QueryProcessor.this.allCaches(result);
795 public boolean start(boolean flush) {
796 // We need new data from query maps
798 if(needDataInStart || flush) {
799 // Last run ended after processing all queries => refresh data
800 restart(flush ? 0.0 : ITERATION_RATIO);
802 // continue with previous big data
804 // Notify caller about iteration situation
805 return iteration.isAtStart();
808 private void restart(double targetRatio) {
810 needDataInStart = true;
812 long start = System.nanoTime();
815 // We need new data from query maps
817 int iterationSize = iteration.size()+1;
818 int diff = calculateCurrentSize()-iterationSize;
820 double ratio = (double)diff / (double)iterationSize;
821 boolean dirty = Math.abs(ratio) >= targetRatio;
824 iteration = allCaches();
826 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
827 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
828 System.err.print(" " + iteration.levels[i].size());
829 System.err.println("");
836 needDataInStart = false;
838 // We are returning here within the same GC round - reuse the cache table
847 public CacheEntryBase iterate(int level) {
849 CacheEntryBase entry = iteration.next(level);
851 restart(ITERATION_RATIO);
855 while(entry != null && entry.isDiscarded()) {
856 entry = iteration.next(level);
864 public void remove() {
869 public void setLevel(CacheEntryBase entry, int level) {
870 iteration.setLevel(entry, level);
873 public Collection<CacheEntry> getRootList() {
874 return cache.getRootList();
878 public int calculateCurrentSize() {
879 return cache.calculateCurrentSize();
883 public int getCurrentSize() {
888 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
890 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
891 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
893 public int querySize() {
897 public void gc(int youngTarget, int allowedTimeInMs) {
899 collector.collect(youngTarget, allowedTimeInMs);
904 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
906 if(entry.isDiscarded()) return;
907 if(workarea.containsKey(entry)) return;
909 Iterable<CacheEntry> parents = entry.getParents(this);
910 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
911 for(CacheEntry e : parents) {
912 if(e.isDiscarded()) continue;
914 processParentReport(e, workarea);
916 workarea.put(entry, ps);
920 public synchronized String reportQueryActivity(File file) throws IOException {
922 System.err.println("reportQueries " + file.getAbsolutePath());
927 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
929 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
930 Collections.reverse(entries);
932 for(Pair<String,Integer> entry : entries) {
933 b.println(entry.first + ": " + entry.second);
938 Development.histogram.clear();
944 public synchronized String reportQueries(File file) throws IOException {
946 System.err.println("reportQueries " + file.getAbsolutePath());
951 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
953 long start = System.nanoTime();
955 // ArrayList<CacheEntry> all = ;
957 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
958 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
959 for(CacheEntryBase entry : caches) {
960 processParentReport(entry, workarea);
963 // for(CacheEntry e : all) System.err.println("entry: " + e);
965 long duration = System.nanoTime() - start;
966 System.err.println("Query root set in " + 1e-9*duration + "s.");
968 start = System.nanoTime();
970 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
974 for(CacheEntry entry : workarea.keySet()) {
975 boolean listener = listening.hasListenerAfterDisposing(entry);
976 boolean hasParents = entry.getParents(this).iterator().hasNext();
979 flagMap.put(entry, 0);
980 } else if (!hasParents) {
982 flagMap.put(entry, 1);
985 flagMap.put(entry, 2);
998 long start2 = System.nanoTime();
1000 int boundCounter = 0;
1001 int unboundCounter = 0;
1002 int unknownCounter = 0;
1004 for(CacheEntry<?> entry : workarea.keySet()) {
1006 //System.err.println("process " + entry);
1008 int flags = flagMap.get(entry);
1009 int bindStatus = flags & 3;
1011 if(bindStatus == 0) boundCounter++;
1012 else if(bindStatus == 1) unboundCounter++;
1013 else if(bindStatus == 2) unknownCounter++;
1015 if(bindStatus < 2) continue;
1018 for(CacheEntry parent : entry.getParents(this)) {
1020 if(parent.isDiscarded()) flagMap.put(parent, 1);
1022 int flags2 = flagMap.get(parent);
1023 int bindStatus2 = flags2 & 3;
1024 // Parent is bound => child is bound
1025 if(bindStatus2 == 0) {
1029 // Parent is unknown => child is unknown
1030 else if (bindStatus2 == 2) {
1037 flagMap.put(entry, newStatus);
1041 duration = System.nanoTime() - start2;
1042 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1043 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1045 } while(!done && loops++ < 20);
1049 for(CacheEntry entry : workarea.keySet()) {
1051 int bindStatus = flagMap.get(entry);
1052 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1058 duration = System.nanoTime() - start;
1059 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1061 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1063 for(CacheEntry entry : workarea.keySet()) {
1064 Class<?> clazz = entry.getClass();
1065 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
1066 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
1067 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
1068 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
1069 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
1070 Integer c = counts.get(clazz);
1071 if(c == null) counts.put(clazz, -1);
1072 else counts.put(clazz, c-1);
1075 b.print("// Simantics DB client query report file\n");
1076 b.print("// This file contains the following information\n");
1077 b.print("// -The amount of cached query instances per query class\n");
1078 b.print("// -The sizes of retained child sets\n");
1079 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1080 b.print("// -Followed by status, where\n");
1081 b.print("// -0=bound\n");
1082 b.print("// -1=free\n");
1083 b.print("// -2=unknown\n");
1084 b.print("// -L=has listener\n");
1085 b.print("// -List of children for each query (search for 'C <query name>')\n");
1087 b.print("----------------------------------------\n");
1089 b.print("// Queries by class\n");
1090 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1091 b.print(-p.second + " " + p.first.getName() + "\n");
1094 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1095 for(CacheEntry e : workarea.keySet())
1098 boolean changed = true;
1100 while(changed && iter++<50) {
1104 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1105 for(CacheEntry e : workarea.keySet())
1108 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1109 Integer c = hist.get(e.getKey());
1110 for(CacheEntry p : e.getValue()) {
1111 Integer i = newHist.get(p);
1112 newHist.put(p, i+c);
1115 for(CacheEntry e : workarea.keySet()) {
1116 Integer value = newHist.get(e);
1117 Integer old = hist.get(e);
1118 if(!value.equals(old)) {
1120 // System.err.println("hist " + e + ": " + old + " => " + value);
1125 System.err.println("Retained set iteration " + iter);
1129 b.print("// Queries by retained set\n");
1130 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1131 b.print("" + -p.second + " " + p.first + "\n");
1134 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1136 b.print("// Entry parent listing\n");
1137 for(CacheEntry entry : workarea.keySet()) {
1138 int status = flagMap.get(entry);
1139 boolean hasListener = listening.hasListenerAfterDisposing(entry);
1140 b.print("Q " + entry.toString());
1142 b.print(" (L" + status + ")");
1145 b.print(" (" + status + ")");
1148 for(CacheEntry parent : workarea.get(entry)) {
1149 Collection<CacheEntry> inv = inverse.get(parent);
1151 inv = new ArrayList<CacheEntry>();
1152 inverse.put(parent, inv);
1155 b.print(" " + parent.toString());
1160 b.print("// Entry child listing\n");
1161 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1162 b.print("C " + entry.getKey().toString());
1164 for(CacheEntry child : entry.getValue()) {
1165 Integer h = hist.get(child);
1169 b.print(" <no children>");
1171 b.print(" " + child.toString());
1176 b.print("#queries: " + workarea.keySet().size() + "\n");
1177 b.print("#listeners: " + listeners + "\n");
1181 return "Dumped " + workarea.keySet().size() + " queries.";
1185 boolean removeQuery(CacheEntry entry) {
1187 // This entry has been removed before. No need to do anything here.
1188 if(entry.isDiscarded()) return false;
1190 assert (!entry.isDiscarded());
1192 Query query = entry.getQuery();
1194 query.removeEntry(this);
1199 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1210 * @return true if this entry is being listened
1212 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1216 CacheEntry entry = e.entry;
1219 * If the dependency graph forms a DAG, some entries are inserted in the
1220 * todo list many times. They only need to be processed once though.
1222 if (entry.isDiscarded()) {
1223 if (Development.DEVELOPMENT) {
1224 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1225 System.err.print("D");
1226 for (int i = 0; i < e.indent; i++)
1227 System.err.print(" ");
1228 System.err.println(entry.getQuery());
1231 // System.err.println(" => DISCARDED");
1235 // if (entry.isRefuted()) {
1236 // if (Development.DEVELOPMENT) {
1237 // if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1238 // System.err.print("R");
1239 // for (int i = 0; i < e.indent; i++)
1240 // System.err.print(" ");
1241 // System.err.println(entry.getQuery());
1247 if (entry.isExcepted()) {
1248 if (Development.DEVELOPMENT) {
1249 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1250 System.err.print("E");
1255 if (entry.isPending()) {
1256 if (Development.DEVELOPMENT) {
1257 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1258 System.err.print("P");
1265 if (Development.DEVELOPMENT) {
1266 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1267 System.err.print("U ");
1268 for (int i = 0; i < e.indent; i++)
1269 System.err.print(" ");
1270 System.err.print(entry.getQuery());
1274 Query query = entry.getQuery();
1275 int type = query.type();
1277 boolean hasListener = listening.hasListener(entry);
1279 if (Development.DEVELOPMENT) {
1280 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1281 if(listening.hasListener(entry)) {
1282 System.err.println(" (L)");
1284 System.err.println("");
1289 if(entry.isPending() || entry.isExcepted()) {
1292 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1294 immediates.put(entry, entry);
1309 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1311 immediates.put(entry, entry);
1325 // System.err.println(" => FOO " + type);
1328 ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
1329 if(entries != null) {
1330 for (ListenerEntry le : entries) {
1331 listening.scheduleListener(le);
1336 // If invalid, update parents
1337 if (type == RequestFlags.INVALIDATE) {
1338 listening.updateParents(e.indent, entry, todo);
1346 * @param av1 an array (guaranteed)
1347 * @param av2 any object
1348 * @return <code>true</code> if the two arrays are equal
1350 private final boolean arrayEquals(Object av1, Object av2) {
1353 Class<?> c1 = av1.getClass().getComponentType();
1354 Class<?> c2 = av2.getClass().getComponentType();
1355 if (c2 == null || !c1.equals(c2))
1357 boolean p1 = c1.isPrimitive();
1358 boolean p2 = c2.isPrimitive();
1362 return Arrays.equals((Object[]) av1, (Object[]) av2);
1363 if (boolean.class.equals(c1))
1364 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1365 else if (byte.class.equals(c1))
1366 return Arrays.equals((byte[]) av1, (byte[]) av2);
1367 else if (int.class.equals(c1))
1368 return Arrays.equals((int[]) av1, (int[]) av2);
1369 else if (long.class.equals(c1))
1370 return Arrays.equals((long[]) av1, (long[]) av2);
1371 else if (float.class.equals(c1))
1372 return Arrays.equals((float[]) av1, (float[]) av2);
1373 else if (double.class.equals(c1))
1374 return Arrays.equals((double[]) av1, (double[]) av2);
1375 throw new RuntimeException("??? Contact application querySupport.");
1380 final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1384 Query query = entry.getQuery();
1386 if (Development.DEVELOPMENT) {
1387 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1388 System.err.println("R " + query);
1392 entry.prepareRecompute(querySupport);
1394 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1396 query.recompute(parentGraph);
1398 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1400 Object newValue = entry.getResult();
1402 if (ListenerEntry.NO_VALUE == oldValue) {
1403 if (Development.DEVELOPMENT) {
1404 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1405 System.out.println("C " + query);
1406 System.out.println("- " + oldValue);
1407 System.out.println("- " + newValue);
1413 boolean changed = false;
1415 if (newValue != null) {
1416 if (newValue.getClass().isArray()) {
1417 changed = !arrayEquals(newValue, oldValue);
1419 changed = !newValue.equals(oldValue);
1422 changed = (oldValue != null);
1424 if (Development.DEVELOPMENT) {
1425 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1426 System.err.println("C " + query);
1427 System.err.println("- " + oldValue);
1428 System.err.println("- " + newValue);
1432 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1434 } catch (Throwable t) {
1436 Logger.defaultLogError(t);
1438 return ListenerEntry.NO_VALUE;
1447 * @return true if this entry still has listeners
1449 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1451 assert (!cache.collecting);
1455 boolean hadListeners = false;
1456 boolean listenersUnknown = false;
1460 assert(entry != null);
1461 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1462 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1463 todo.add(new UpdateEntry(null, entry, 0));
1467 // Walk the tree and collect immediate updates
1468 while (!todo.isEmpty()) {
1469 UpdateEntry e = todo.pop();
1470 hadListeners |= updateQuery(e, todo, immediates);
1473 if(immediates.isEmpty()) break;
1475 // Evaluate all immediate updates and collect parents to update
1476 for(CacheEntry immediate : immediates.values()) {
1478 if(immediate.isDiscarded()) {
1482 if(immediate.isExcepted()) {
1484 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1485 if (newValue != ListenerEntry.NOT_CHANGED)
1486 listening.updateParents(0, immediate, todo);
1490 Object oldValue = immediate.getResult();
1491 Object newValue = compareTo(graph, immediate, oldValue);
1493 if (newValue != ListenerEntry.NOT_CHANGED) {
1494 listening.updateParents(0, immediate, todo);
1496 // If not changed, keep the old value
1497 immediate.setResult(oldValue);
1498 immediate.setReady();
1499 listenersUnknown = true;
1509 } catch (Throwable t) {
1510 Logger.defaultLogError(t);
1516 return hadListeners | listenersUnknown;
1520 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1521 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1522 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1523 // Maybe use a mutex from util.concurrent?
1524 private Object primitiveUpdateLock = new Object();
1525 private THashSet scheduledPrimitiveUpdates = new THashSet();
1527 private ArrayList<CacheEntry> refutations = new ArrayList<>();
1529 private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
1534 private void updateRefutations(ReadGraphImpl graph) {
1536 for(CacheEntry e : refutations)
1539 refutations.clear();
1543 public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
1545 // Make sure that listening has performed its work
1548 cache.dirty = false;
1551 if (Development.DEVELOPMENT) {
1552 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1553 System.err.println("== Query update ==");
1557 // Special case - one statement
1558 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1560 long arg0 = scheduledObjectUpdates.getFirst();
1562 final int subject = (int)(arg0 >>> 32);
1563 final int predicate = (int)(arg0 & 0xffffffff);
1565 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1566 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1567 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1569 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1570 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1571 if(principalTypes != null) markForUpdate(graph, principalTypes);
1572 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1573 if(types != null) markForUpdate(graph, types);
1576 if(predicate == subrelationOf) {
1577 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1578 if(superRelations != null) markForUpdate(graph, superRelations);
1581 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1582 if(dp != null) markForUpdate(graph, dp);
1583 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1584 if(os != null) markForUpdate(graph, os);
1586 updateRefutations(graph);
1588 scheduledObjectUpdates.clear();
1590 if (Development.DEVELOPMENT) {
1591 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1592 System.err.println("== Query update ends ==");
1600 // Special case - one value
1601 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1603 int arg0 = scheduledValueUpdates.getFirst();
1605 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1606 if(valueQuery != null) markForUpdate(graph, valueQuery);
1608 updateRefutations(graph);
1610 scheduledValueUpdates.clear();
1612 if (Development.DEVELOPMENT) {
1613 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1614 System.err.println("== Query update ends ==");
1622 final TIntHashSet predicates = new TIntHashSet();
1623 final TIntHashSet orderedSets = new TIntHashSet();
1625 THashSet primitiveUpdates;
1626 synchronized (primitiveUpdateLock) {
1627 primitiveUpdates = scheduledPrimitiveUpdates;
1628 scheduledPrimitiveUpdates = new THashSet();
1631 scheduledValueUpdates.forEach(new TIntProcedure() {
1634 public boolean execute(int arg0) {
1635 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1636 if(valueQuery != null) markForUpdate(graph, valueQuery);
1642 scheduledInvalidates.forEach(new TIntProcedure() {
1645 public boolean execute(int resource) {
1647 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1648 if(valueQuery != null) markForUpdate(graph, valueQuery);
1650 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1651 if(principalTypes != null) markForUpdate(graph, principalTypes);
1652 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1653 if(types != null) markForUpdate(graph, types);
1655 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1656 if(superRelations != null) markForUpdate(graph, superRelations);
1658 predicates.add(resource);
1665 scheduledObjectUpdates.forEach(new TLongProcedure() {
1668 public boolean execute(long arg0) {
1670 final int subject = (int)(arg0 >>> 32);
1671 final int predicate = (int)(arg0 & 0xffffffff);
1673 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1674 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1675 if(principalTypes != null) markForUpdate(graph, principalTypes);
1676 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1677 if(types != null) markForUpdate(graph, types);
1680 if(predicate == subrelationOf) {
1681 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1682 if(superRelations != null) markForUpdate(graph, superRelations);
1685 predicates.add(subject);
1686 orderedSets.add(predicate);
1694 predicates.forEach(new TIntProcedure() {
1697 public boolean execute(final int subject) {
1699 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1700 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
1701 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
1703 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1704 if(entry != null) markForUpdate(graph, entry);
1712 orderedSets.forEach(new TIntProcedure() {
1715 public boolean execute(int orderedSet) {
1717 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1718 if(entry != null) markForUpdate(graph, entry);
1726 updateRefutations(graph);
1728 primitiveUpdates.forEach(new TObjectProcedure() {
1731 public boolean execute(Object arg0) {
1733 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1734 if (query != null) {
1735 boolean listening = update(graph, query);
1736 if (!listening && !query.hasParents()) {
1737 cache.externalReadEntryMap.remove(arg0);
1746 scheduledValueUpdates.clear();
1747 scheduledObjectUpdates.clear();
1748 scheduledInvalidates.clear();
1750 if (Development.DEVELOPMENT) {
1751 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1752 System.err.println("== Query update ends ==");
1758 public void updateValue(final int resource) {
1759 scheduledValueUpdates.add(resource);
1763 public void updateStatements(final int resource, final int predicate) {
1764 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
1768 private int lastInvalidate = 0;
1770 public void invalidateResource(final int resource) {
1771 if(lastInvalidate == resource) return;
1772 scheduledValueUpdates.add(resource);
1773 lastInvalidate = resource;
1777 public void updatePrimitive(final ExternalRead primitive) {
1779 // External reads may be updated from arbitrary threads.
1780 // Synchronize to prevent race-conditions.
1781 synchronized (primitiveUpdateLock) {
1782 scheduledPrimitiveUpdates.add(primitive);
1784 querySupport.dirtyPrimitives();
1789 public synchronized String toString() {
1790 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
1794 protected void doDispose() {
1796 for(int index = 0; index < THREADS; index++) {
1797 executors[index].dispose();
1801 for(int i=0;i<100;i++) {
1803 boolean alive = false;
1804 for(int index = 0; index < THREADS; index++) {
1805 alive |= executors[index].isAlive();
1810 } catch (InterruptedException e) {
1811 Logger.defaultLogError(e);
1816 // Then start interrupting
1817 for(int i=0;i<100;i++) {
1819 boolean alive = false;
1820 for(int index = 0; index < THREADS; index++) {
1821 alive |= executors[index].isAlive();
1824 for(int index = 0; index < THREADS; index++) {
1825 executors[index].interrupt();
1829 // // Then just destroy
1830 // for(int index = 0; index < THREADS; index++) {
1831 // executors[index].destroy();
1834 for(int index = 0; index < THREADS; index++) {
1836 executors[index].join(5000);
1837 } catch (InterruptedException e) {
1838 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
1840 executors[index] = null;
1845 public int getHits() {
1849 public int getMisses() {
1850 return cache.misses;
1853 public int getSize() {
1857 public Set<Long> getReferencedClusters() {
1858 HashSet<Long> result = new HashSet<Long>();
1859 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
1860 Objects query = (Objects) entry.getQuery();
1861 result.add(querySupport.getClusterId(query.r1()));
1863 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
1864 DirectPredicates query = (DirectPredicates) entry.getQuery();
1865 result.add(querySupport.getClusterId(query.id));
1867 for (CacheEntry entry : cache.valueQueryMap.values()) {
1868 ValueQuery query = (ValueQuery) entry.getQuery();
1869 result.add(querySupport.getClusterId(query.id));
1874 public void assertDone() {
1877 CacheCollectionResult allCaches(CacheCollectionResult result) {
1879 return cache.allCaches(result);
1883 public void printDiagnostics() {
1886 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
1887 querySupport.requestCluster(graph, clusterId, runnable);
1890 public int clean() {
1891 collector.collect(0, Integer.MAX_VALUE);
1895 public void clean(final Collection<ExternalRead<?>> requests) {
1896 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
1897 Iterator<ExternalRead<?>> iterator = requests.iterator();
1899 public CacheCollectionResult allCaches() {
1900 throw new UnsupportedOperationException();
1903 public CacheEntryBase iterate(int level) {
1904 if(iterator.hasNext()) {
1905 ExternalRead<?> request = iterator.next();
1906 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1907 if (entry != null) return entry;
1908 else return iterate(level);
1910 iterator = requests.iterator();
1915 public void remove() {
1916 throw new UnsupportedOperationException();
1919 public void setLevel(CacheEntryBase entry, int level) {
1920 throw new UnsupportedOperationException();
1923 public Collection<CacheEntry> getRootList() {
1924 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
1925 for (ExternalRead<?> request : requests) {
1926 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
1933 public int getCurrentSize() {
1937 public int calculateCurrentSize() {
1938 // This tells the collector to attempt collecting everything.
1939 return Integer.MAX_VALUE;
1942 public boolean start(boolean flush) {
1946 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
1949 public void scanPending() {
1951 cache.scanPending();
1955 public ReadGraphImpl graphForVirtualRequest() {
1956 return ReadGraphImpl.createAsync(this);
1960 private HashMap<Resource, Class<?>> builtinValues;
1962 public Class<?> getBuiltinValue(Resource r) {
1963 if(builtinValues == null) initBuiltinValues();
1964 return builtinValues.get(r);
1967 Exception callerException = null;
1969 public interface AsyncBarrier {
1972 // public void inc(String debug);
1973 // public void dec(String debug);
1976 // final public QueryProcessor processor;
1977 // final public QuerySupport support;
1979 // boolean disposed = false;
1981 private void initBuiltinValues() {
1983 Layer0 b = getSession().peekService(Layer0.class);
1984 if(b == null) return;
1986 builtinValues = new HashMap<Resource, Class<?>>();
1988 builtinValues.put(b.String, String.class);
1989 builtinValues.put(b.Double, Double.class);
1990 builtinValues.put(b.Float, Float.class);
1991 builtinValues.put(b.Long, Long.class);
1992 builtinValues.put(b.Integer, Integer.class);
1993 builtinValues.put(b.Byte, Byte.class);
1994 builtinValues.put(b.Boolean, Boolean.class);
1996 builtinValues.put(b.StringArray, String[].class);
1997 builtinValues.put(b.DoubleArray, double[].class);
1998 builtinValues.put(b.FloatArray, float[].class);
1999 builtinValues.put(b.LongArray, long[].class);
2000 builtinValues.put(b.IntegerArray, int[].class);
2001 builtinValues.put(b.ByteArray, byte[].class);
2002 builtinValues.put(b.BooleanArray, boolean[].class);
2006 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2008 // if (null == provider2) {
2009 // this.processor = null;
2013 // this.processor = provider2;
2014 // support = provider2.getCore();
2015 // initBuiltinValues();
2019 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2020 // return new ReadGraphSupportImpl(impl.processor);
2024 final public Session getSession() {
2028 final public ResourceSupport getResourceSupport() {
2029 return resourceSupport;
2033 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2037 for(Resource predicate : getPredicates(impl, subject))
2038 procedure.execute(impl, predicate);
2040 procedure.finished(impl);
2042 } catch (Throwable e) {
2043 procedure.exception(impl, e);
2049 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2051 throw new UnsupportedOperationException();
2053 // assert(subject != null);
2054 // assert(procedure != null);
2056 // final ListenerBase listener = getListenerBase(procedure);
2059 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2062 // public void execute(ReadGraphImpl graph, int i) {
2064 // procedure.execute(querySupport.getResource(i));
2065 // } catch (Throwable t2) {
2066 // Logger.defaultLogError(t2);
2071 // public void finished(ReadGraphImpl graph) {
2073 // procedure.finished();
2074 // } catch (Throwable t2) {
2075 // Logger.defaultLogError(t2);
2077 //// impl.state.barrier.dec();
2081 // public void exception(ReadGraphImpl graph, Throwable t) {
2083 // procedure.exception(t);
2084 // } catch (Throwable t2) {
2085 // Logger.defaultLogError(t2);
2087 //// impl.state.barrier.dec();
2091 // } catch (DatabaseException e) {
2092 // Logger.defaultLogError(e);
2098 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2099 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2103 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2104 final Resource predicate, final MultiProcedure<Statement> procedure) {
2106 assert(subject != null);
2107 assert(predicate != null);
2108 assert(procedure != null);
2110 final ListenerBase listener = getListenerBase(procedure);
2112 // impl.state.barrier.inc();
2115 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2118 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2120 procedure.execute(querySupport.getStatement(s, p, o));
2121 } catch (Throwable t2) {
2122 Logger.defaultLogError(t2);
2127 public void finished(ReadGraphImpl graph) {
2129 procedure.finished();
2130 } catch (Throwable t2) {
2131 Logger.defaultLogError(t2);
2133 // impl.state.barrier.dec();
2137 public void exception(ReadGraphImpl graph, Throwable t) {
2139 procedure.exception(t);
2140 } catch (Throwable t2) {
2141 Logger.defaultLogError(t2);
2143 // impl.state.barrier.dec();
2147 } catch (DatabaseException e) {
2148 Logger.defaultLogError(e);
2154 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2155 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2157 assert(subject != null);
2158 assert(predicate != null);
2159 assert(procedure != null);
2161 final ListenerBase listener = getListenerBase(procedure);
2163 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2165 boolean first = true;
2168 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2171 procedure.execute(graph, querySupport.getStatement(s, p, o));
2173 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2175 } catch (Throwable t2) {
2176 Logger.defaultLogError(t2);
2181 public void finished(ReadGraphImpl graph) {
2186 procedure.finished(graph);
2187 // impl.state.barrier.dec(this);
2189 procedure.finished(impl.newRestart(graph));
2191 } catch (Throwable t2) {
2192 Logger.defaultLogError(t2);
2198 public void exception(ReadGraphImpl graph, Throwable t) {
2203 procedure.exception(graph, t);
2204 // impl.state.barrier.dec(this);
2206 procedure.exception(impl.newRestart(graph), t);
2208 } catch (Throwable t2) {
2209 Logger.defaultLogError(t2);
2216 int sId = querySupport.getId(subject);
2217 int pId = querySupport.getId(predicate);
2219 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2220 // else impl.state.barrier.inc(null, null);
2223 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2224 } catch (DatabaseException e) {
2225 Logger.defaultLogError(e);
2231 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2232 final Resource predicate, final StatementProcedure procedure) {
2234 assert(subject != null);
2235 assert(predicate != null);
2236 assert(procedure != null);
2238 final ListenerBase listener = getListenerBase(procedure);
2240 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2242 boolean first = true;
2245 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2248 procedure.execute(graph, s, p, o);
2250 procedure.execute(impl.newRestart(graph), s, p, o);
2252 } catch (Throwable t2) {
2253 Logger.defaultLogError(t2);
2258 public void finished(ReadGraphImpl graph) {
2263 procedure.finished(graph);
2264 // impl.state.barrier.dec(this);
2266 procedure.finished(impl.newRestart(graph));
2268 } catch (Throwable t2) {
2269 Logger.defaultLogError(t2);
2275 public void exception(ReadGraphImpl graph, Throwable t) {
2280 procedure.exception(graph, t);
2281 // impl.state.barrier.dec(this);
2283 procedure.exception(impl.newRestart(graph), t);
2285 } catch (Throwable t2) {
2286 Logger.defaultLogError(t2);
2293 int sId = querySupport.getId(subject);
2294 int pId = querySupport.getId(predicate);
2296 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2297 // else impl.state.barrier.inc(null, null);
2300 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2301 } catch (DatabaseException e) {
2302 Logger.defaultLogError(e);
2308 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2310 assert(subject != null);
2311 assert(predicate != null);
2312 assert(procedure != null);
2314 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2316 private Set<Statement> current = null;
2317 private Set<Statement> run = new HashSet<Statement>();
2320 public void execute(AsyncReadGraph graph, Statement result) {
2322 boolean found = false;
2324 if(current != null) {
2326 found = current.remove(result);
2330 if(!found) procedure.add(graph, result);
2337 public void finished(AsyncReadGraph graph) {
2339 if(current != null) {
2340 for(Statement r : current) procedure.remove(graph, r);
2345 run = new HashSet<Statement>();
2350 public void exception(AsyncReadGraph graph, Throwable t) {
2351 procedure.exception(graph, t);
2355 public boolean isDisposed() {
2356 return procedure.isDisposed();
2364 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2365 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2367 assert(subject != null);
2368 assert(predicate != null);
2369 assert(procedure != null);
2371 final ListenerBase listener = getListenerBase(procedure);
2373 // impl.state.barrier.inc();
2376 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2379 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2381 procedure.execute(graph, querySupport.getStatement(s, p, o));
2382 } catch (Throwable t2) {
2383 Logger.defaultLogError(t2);
2388 public void finished(ReadGraphImpl graph) {
2390 procedure.finished(graph);
2391 } catch (Throwable t2) {
2392 Logger.defaultLogError(t2);
2394 // impl.state.barrier.dec();
2398 public void exception(ReadGraphImpl graph, Throwable t) {
2400 procedure.exception(graph, t);
2401 } catch (Throwable t2) {
2402 Logger.defaultLogError(t2);
2404 // impl.state.barrier.dec();
2408 } catch (DatabaseException e) {
2409 Logger.defaultLogError(e);
2414 private static ListenerBase getListenerBase(Object procedure) {
2415 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2420 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2422 assert(subject != null);
2423 assert(predicate != null);
2424 assert(procedure != null);
2426 final ListenerBase listener = getListenerBase(procedure);
2428 // impl.state.barrier.inc();
2431 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2434 public void execute(ReadGraphImpl graph, int i) {
2436 procedure.execute(querySupport.getResource(i));
2437 } catch (Throwable t2) {
2438 Logger.defaultLogError(t2);
2443 public void finished(ReadGraphImpl graph) {
2445 procedure.finished();
2446 } catch (Throwable t2) {
2447 Logger.defaultLogError(t2);
2449 // impl.state.barrier.dec();
2453 public void exception(ReadGraphImpl graph, Throwable t) {
2454 System.out.println("forEachObject exception " + t);
2456 procedure.exception(t);
2457 } catch (Throwable t2) {
2458 Logger.defaultLogError(t2);
2460 // impl.state.barrier.dec();
2464 } catch (DatabaseException e) {
2465 Logger.defaultLogError(e);
2471 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2473 assert(subject != null);
2474 assert(procedure != null);
2476 final ListenerBase listener = getListenerBase(procedure);
2478 int sId = querySupport.getId(subject);
2481 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2484 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2485 procedure.execute(graph, result);
2489 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2490 procedure.exception(graph, throwable);
2494 } catch (DatabaseException e) {
2495 Logger.defaultLogError(e);
2500 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2502 // assert(subject != null);
2503 // assert(procedure != null);
2505 // final ListenerBase listener = getListenerBase(procedure);
2507 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2509 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2514 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2516 // assert(subject != null);
2517 // assert(procedure != null);
2519 // final ListenerBase listener = getListenerBase(procedure);
2521 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2525 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2528 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2530 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2532 private Resource single = null;
2535 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2536 if(single == null) {
2539 single = INVALID_RESOURCE;
2544 public synchronized void finished(AsyncReadGraph graph) {
2545 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2546 else procedure.execute(graph, single);
2550 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2551 procedure.exception(graph, throwable);
2558 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2560 final int sId = querySupport.getId(subject);
2561 final int pId = querySupport.getId(predicate);
2564 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2565 } catch (DatabaseException e) {
2566 Logger.defaultLogError(e);
2571 static class Runner2Procedure implements IntProcedure {
2573 public int single = 0;
2574 public Throwable t = null;
2576 public void clear() {
2582 public void execute(ReadGraphImpl graph, int i) {
2583 if(single == 0) single = i;
2588 public void finished(ReadGraphImpl graph) {
2589 if(single == -1) single = 0;
2593 public void exception(ReadGraphImpl graph, Throwable throwable) {
2598 public int get() throws DatabaseException {
2600 if(t instanceof DatabaseException) throw (DatabaseException)t;
2601 else throw new DatabaseException(t);
2608 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2610 final int sId = querySupport.getId(subject);
2611 final int pId = querySupport.getId(predicate);
2613 Runner2Procedure proc = new Runner2Procedure();
2614 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2619 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2621 assert(subject != null);
2622 assert(predicate != null);
2624 final ListenerBase listener = getListenerBase(procedure);
2626 if(impl.parent != null || listener != null) {
2628 IntProcedure ip = new IntProcedure() {
2630 AtomicBoolean first = new AtomicBoolean(true);
2633 public void execute(ReadGraphImpl graph, int i) {
2636 procedure.execute(impl, querySupport.getResource(i));
2638 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2640 } catch (Throwable t2) {
2641 Logger.defaultLogError(t2);
2647 public void finished(ReadGraphImpl graph) {
2649 if(first.compareAndSet(true, false)) {
2650 procedure.finished(impl);
2651 // impl.state.barrier.dec(this);
2653 procedure.finished(impl.newRestart(graph));
2655 } catch (Throwable t2) {
2656 Logger.defaultLogError(t2);
2661 public void exception(ReadGraphImpl graph, Throwable t) {
2663 procedure.exception(graph, t);
2664 } catch (Throwable t2) {
2665 Logger.defaultLogError(t2);
2667 // impl.state.barrier.dec(this);
2671 public String toString() {
2672 return "forEachObject with " + procedure;
2677 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2678 // else impl.state.barrier.inc(null, null);
2680 forEachObject(impl, subject, predicate, listener, ip);
2684 IntProcedure ip = new IntProcedure() {
2687 public void execute(ReadGraphImpl graph, int i) {
2688 procedure.execute(graph, querySupport.getResource(i));
2692 public void finished(ReadGraphImpl graph) {
2693 procedure.finished(graph);
2697 public void exception(ReadGraphImpl graph, Throwable t) {
2698 procedure.exception(graph, t);
2702 public String toString() {
2703 return "forEachObject with " + procedure;
2708 forEachObject(impl, subject, predicate, listener, ip);
2715 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2717 assert(subject != null);
2718 assert(predicate != null);
2719 assert(procedure != null);
2721 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2723 private Set<Resource> current = null;
2724 private Set<Resource> run = new HashSet<Resource>();
2727 public void execute(AsyncReadGraph graph, Resource result) {
2729 boolean found = false;
2731 if(current != null) {
2733 found = current.remove(result);
2737 if(!found) procedure.add(graph, result);
2744 public void finished(AsyncReadGraph graph) {
2746 if(current != null) {
2747 for(Resource r : current) procedure.remove(graph, r);
2752 run = new HashSet<Resource>();
2757 public boolean isDisposed() {
2758 return procedure.isDisposed();
2762 public void exception(AsyncReadGraph graph, Throwable t) {
2763 procedure.exception(graph, t);
2767 public String toString() {
2768 return "forObjectSet " + procedure;
2776 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2778 assert(subject != null);
2779 assert(procedure != null);
2781 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
2783 private Set<Resource> current = null;
2784 private Set<Resource> run = new HashSet<Resource>();
2787 public void execute(AsyncReadGraph graph, Resource result) {
2789 boolean found = false;
2791 if(current != null) {
2793 found = current.remove(result);
2797 if(!found) procedure.add(graph, result);
2804 public void finished(AsyncReadGraph graph) {
2806 if(current != null) {
2807 for(Resource r : current) procedure.remove(graph, r);
2812 run = new HashSet<Resource>();
2817 public boolean isDisposed() {
2818 return procedure.isDisposed();
2822 public void exception(AsyncReadGraph graph, Throwable t) {
2823 procedure.exception(graph, t);
2827 public String toString() {
2828 return "forPredicateSet " + procedure;
2836 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
2838 assert(subject != null);
2839 assert(procedure != null);
2841 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
2843 private Set<Resource> current = null;
2844 private Set<Resource> run = new HashSet<Resource>();
2847 public void execute(AsyncReadGraph graph, Resource result) {
2849 boolean found = false;
2851 if(current != null) {
2853 found = current.remove(result);
2857 if(!found) procedure.add(graph, result);
2864 public void finished(AsyncReadGraph graph) {
2866 if(current != null) {
2867 for(Resource r : current) procedure.remove(graph, r);
2872 run = new HashSet<Resource>();
2877 public boolean isDisposed() {
2878 return procedure.isDisposed();
2882 public void exception(AsyncReadGraph graph, Throwable t) {
2883 procedure.exception(graph, t);
2887 public String toString() {
2888 return "forPrincipalTypeSet " + procedure;
2896 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2898 assert(subject != null);
2899 assert(predicate != null);
2900 assert(procedure != null);
2902 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2904 private Set<Resource> current = null;
2905 private Set<Resource> run = new HashSet<Resource>();
2908 public void execute(AsyncReadGraph graph, Resource result) {
2910 boolean found = false;
2912 if(current != null) {
2914 found = current.remove(result);
2918 if(!found) procedure.add(graph, result);
2925 public void finished(AsyncReadGraph graph) {
2927 if(current != null) {
2928 for(Resource r : current) procedure.remove(graph, r);
2933 run = new HashSet<Resource>();
2938 public boolean isDisposed() {
2939 return procedure.isDisposed();
2943 public void exception(AsyncReadGraph graph, Throwable t) {
2944 procedure.exception(graph, t);
2948 public String toString() {
2949 return "forObjectSet " + procedure;
2957 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2959 assert(subject != null);
2960 assert(predicate != null);
2961 assert(procedure != null);
2963 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2965 private Set<Statement> current = null;
2966 private Set<Statement> run = new HashSet<Statement>();
2969 public void execute(AsyncReadGraph graph, Statement result) {
2971 boolean found = false;
2973 if(current != null) {
2975 found = current.remove(result);
2979 if(!found) procedure.add(graph, result);
2986 public void finished(AsyncReadGraph graph) {
2988 if(current != null) {
2989 for(Statement s : current) procedure.remove(graph, s);
2994 run = new HashSet<Statement>();
2999 public boolean isDisposed() {
3000 return procedure.isDisposed();
3004 public void exception(AsyncReadGraph graph, Throwable t) {
3005 procedure.exception(graph, t);
3009 public String toString() {
3010 return "forStatementSet " + procedure;
3018 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3019 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3021 assert(subject != null);
3022 assert(predicate != null);
3023 assert(procedure != null);
3025 final ListenerBase listener = getListenerBase(procedure);
3028 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3031 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3033 procedure.execute(graph, querySupport.getResource(o));
3034 } catch (Throwable t2) {
3035 Logger.defaultLogError(t2);
3040 public void finished(ReadGraphImpl graph) {
3042 procedure.finished(graph);
3043 } catch (Throwable t2) {
3044 Logger.defaultLogError(t2);
3046 // impl.state.barrier.dec();
3050 public void exception(ReadGraphImpl graph, Throwable t) {
3052 procedure.exception(graph, t);
3053 } catch (Throwable t2) {
3054 Logger.defaultLogError(t2);
3056 // impl.state.barrier.dec();
3060 } catch (DatabaseException e) {
3061 Logger.defaultLogError(e);
3067 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3069 assert(subject != null);
3070 assert(procedure != null);
3072 final ListenerBase listener = getListenerBase(procedure);
3074 IntProcedure ip = new IntProcedure() {
3077 public void execute(ReadGraphImpl graph, int i) {
3079 procedure.execute(graph, querySupport.getResource(i));
3080 } catch (Throwable t2) {
3081 Logger.defaultLogError(t2);
3086 public void finished(ReadGraphImpl graph) {
3088 procedure.finished(graph);
3089 } catch (Throwable t2) {
3090 Logger.defaultLogError(t2);
3092 // impl.state.barrier.dec(this);
3096 public void exception(ReadGraphImpl graph, Throwable t) {
3098 procedure.exception(graph, t);
3099 } catch (Throwable t2) {
3100 Logger.defaultLogError(t2);
3102 // impl.state.barrier.dec(this);
3107 int sId = querySupport.getId(subject);
3109 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3110 // else impl.state.barrier.inc(null, null);
3113 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3114 } catch (DatabaseException e) {
3115 Logger.defaultLogError(e);
3121 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3123 assert(subject != null);
3124 assert(procedure != null);
3126 final ListenerBase listener = getListenerBase(procedure);
3128 // impl.state.barrier.inc();
3131 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3134 public void execute(ReadGraphImpl graph, int i) {
3136 procedure.execute(querySupport.getResource(i));
3137 } catch (Throwable t2) {
3138 Logger.defaultLogError(t2);
3143 public void finished(ReadGraphImpl graph) {
3145 procedure.finished();
3146 } catch (Throwable t2) {
3147 Logger.defaultLogError(t2);
3149 // impl.state.barrier.dec();
3153 public void exception(ReadGraphImpl graph, Throwable t) {
3155 procedure.exception(t);
3156 } catch (Throwable t2) {
3157 Logger.defaultLogError(t2);
3159 // impl.state.barrier.dec();
3163 } catch (DatabaseException e) {
3164 Logger.defaultLogError(e);
3168 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3170 assert(subject != null);
3171 assert(procedure != null);
3173 final ListenerBase listener = getListenerBase(procedure);
3174 assert(listener == null);
3176 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3179 public void execute(final ReadGraphImpl graph, IntSet set) {
3180 procedure.execute(graph, set);
3184 public void exception(ReadGraphImpl graph, Throwable t) {
3185 procedure.exception(graph, t);
3190 int sId = querySupport.getId(subject);
3193 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3194 } catch (DatabaseException e) {
3195 Logger.defaultLogError(e);
3201 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3203 assert(subject != null);
3205 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3210 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3212 assert(subject != null);
3214 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
3219 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3221 assert(subject != null);
3222 assert(procedure != null);
3224 final ListenerBase listener = getListenerBase(procedure);
3227 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3229 AtomicBoolean first = new AtomicBoolean(true);
3232 public void execute(final ReadGraphImpl graph, IntSet set) {
3233 // final HashSet<Resource> result = new HashSet<Resource>();
3234 // set.forEach(new TIntProcedure() {
3237 // public boolean execute(int type) {
3238 // result.add(querySupport.getResource(type));
3244 if(first.compareAndSet(true, false)) {
3245 procedure.execute(graph, set);
3246 // impl.state.barrier.dec();
3248 procedure.execute(impl.newRestart(graph), set);
3250 } catch (Throwable t2) {
3251 Logger.defaultLogError(t2);
3256 public void exception(ReadGraphImpl graph, Throwable t) {
3258 if(first.compareAndSet(true, false)) {
3259 procedure.exception(graph, t);
3260 // impl.state.barrier.dec();
3262 procedure.exception(impl.newRestart(graph), t);
3264 } catch (Throwable t2) {
3265 Logger.defaultLogError(t2);
3270 } catch (DatabaseException e) {
3271 Logger.defaultLogError(e);
3277 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3279 assert(subject != null);
3280 assert(procedure != null);
3282 final ListenerBase listener = getListenerBase(procedure);
3284 IntProcedure ip = new IntProcedureAdapter() {
3287 public void execute(final ReadGraphImpl graph, int superRelation) {
3289 procedure.execute(graph, querySupport.getResource(superRelation));
3290 } catch (Throwable t2) {
3291 Logger.defaultLogError(t2);
3296 public void finished(final ReadGraphImpl graph) {
3298 procedure.finished(graph);
3299 } catch (Throwable t2) {
3300 Logger.defaultLogError(t2);
3302 // impl.state.barrier.dec(this);
3307 public void exception(ReadGraphImpl graph, Throwable t) {
3309 procedure.exception(graph, t);
3310 } catch (Throwable t2) {
3311 Logger.defaultLogError(t2);
3313 // impl.state.barrier.dec(this);
3318 int sId = querySupport.getId(subject);
3320 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3321 // else impl.state.barrier.inc(null, null);
3324 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3325 } catch (DatabaseException e) {
3326 Logger.defaultLogError(e);
3329 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3334 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3336 assert(subject != null);
3337 assert(procedure != null);
3339 final ListenerBase listener = getListenerBase(procedure);
3341 // impl.state.barrier.inc();
3343 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3348 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3350 assert(subject != null);
3351 assert(procedure != null);
3353 final ListenerBase listener = getListenerBase(procedure);
3355 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3358 public void execute(final ReadGraphImpl graph, IntSet set) {
3359 // final HashSet<Resource> result = new HashSet<Resource>();
3360 // set.forEach(new TIntProcedure() {
3363 // public boolean execute(int type) {
3364 // result.add(querySupport.getResource(type));
3370 procedure.execute(graph, set);
3371 } catch (Throwable t2) {
3372 Logger.defaultLogError(t2);
3374 // impl.state.barrier.dec(this);
3378 public void exception(ReadGraphImpl graph, Throwable t) {
3380 procedure.exception(graph, t);
3381 } catch (Throwable t2) {
3382 Logger.defaultLogError(t2);
3384 // impl.state.barrier.dec(this);
3389 int sId = querySupport.getId(subject);
3391 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3392 // else impl.state.barrier.inc(null, null);
3395 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3396 } catch (DatabaseException e) {
3397 Logger.defaultLogError(e);
3402 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3403 return getValue(impl, querySupport.getId(subject));
3406 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3407 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3411 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3413 assert(subject != null);
3414 assert(procedure != null);
3416 int sId = querySupport.getId(subject);
3418 // if(procedure != null) {
3420 final ListenerBase listener = getListenerBase(procedure);
3422 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3424 AtomicBoolean first = new AtomicBoolean(true);
3427 public void execute(ReadGraphImpl graph, byte[] result) {
3429 if(first.compareAndSet(true, false)) {
3430 procedure.execute(graph, result);
3431 // impl.state.barrier.dec(this);
3433 procedure.execute(impl.newRestart(graph), result);
3435 } catch (Throwable t2) {
3436 Logger.defaultLogError(t2);
3441 public void exception(ReadGraphImpl graph, Throwable t) {
3443 if(first.compareAndSet(true, false)) {
3444 procedure.exception(graph, t);
3445 // impl.state.barrier.dec(this);
3447 procedure.exception(impl.newRestart(graph), t);
3449 } catch (Throwable t2) {
3450 Logger.defaultLogError(t2);
3456 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3457 // else impl.state.barrier.inc(null, null);
3460 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3461 } catch (DatabaseException e) {
3462 throw new IllegalStateException("Internal error");
3467 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3471 // throw new IllegalStateException("Internal error");
3476 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3478 assert(subject != null);
3479 assert(procedure != null);
3481 final ListenerBase listener = getListenerBase(procedure);
3483 if(impl.parent != null || listener != null) {
3485 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3487 AtomicBoolean first = new AtomicBoolean(true);
3490 public void execute(ReadGraphImpl graph, byte[] result) {
3492 if(first.compareAndSet(true, false)) {
3493 procedure.execute(graph, result);
3494 // impl.state.barrier.dec(this);
3496 procedure.execute(impl.newRestart(graph), result);
3498 } catch (Throwable t2) {
3499 Logger.defaultLogError(t2);
3504 public void exception(ReadGraphImpl graph, Throwable t) {
3506 if(first.compareAndSet(true, false)) {
3507 procedure.exception(graph, t);
3508 // impl.state.barrier.dec(this);
3510 procedure.exception(impl.newRestart(graph), t);
3512 } catch (Throwable t2) {
3513 Logger.defaultLogError(t2);
3519 int sId = querySupport.getId(subject);
3521 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3522 // else impl.state.barrier.inc(null, null);
3525 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3526 } catch (DatabaseException e) {
3527 Logger.defaultLogError(e);
3532 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3535 public void execute(ReadGraphImpl graph, byte[] result) {
3537 procedure.execute(graph, result);
3542 public void exception(ReadGraphImpl graph, Throwable t) {
3544 procedure.exception(graph, t);
3550 int sId = querySupport.getId(subject);
3553 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3554 } catch (DatabaseException e) {
3555 Logger.defaultLogError(e);
3563 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3565 assert(relation != null);
3566 assert(procedure != null);
3568 final ListenerBase listener = getListenerBase(procedure);
3570 IntProcedure ip = new IntProcedure() {
3572 private int result = 0;
3574 final AtomicBoolean found = new AtomicBoolean(false);
3575 final AtomicBoolean done = new AtomicBoolean(false);
3578 public void finished(ReadGraphImpl graph) {
3580 // Shall fire exactly once!
3581 if(done.compareAndSet(false, true)) {
3584 procedure.exception(graph, new NoInverseException(""));
3585 // impl.state.barrier.dec(this);
3587 procedure.execute(graph, querySupport.getResource(result));
3588 // impl.state.barrier.dec(this);
3590 } catch (Throwable t) {
3591 Logger.defaultLogError(t);
3598 public void execute(ReadGraphImpl graph, int i) {
3600 if(found.compareAndSet(false, true)) {
3603 // Shall fire exactly once!
3604 if(done.compareAndSet(false, true)) {
3606 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3607 // impl.state.barrier.dec(this);
3608 } catch (Throwable t) {
3609 Logger.defaultLogError(t);
3617 public void exception(ReadGraphImpl graph, Throwable t) {
3618 // Shall fire exactly once!
3619 if(done.compareAndSet(false, true)) {
3621 procedure.exception(graph, t);
3622 // impl.state.barrier.dec(this);
3623 } catch (Throwable t2) {
3624 Logger.defaultLogError(t2);
3631 int sId = querySupport.getId(relation);
3633 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3634 // else impl.state.barrier.inc(null, null);
3637 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3638 } catch (DatabaseException e) {
3639 Logger.defaultLogError(e);
3645 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3648 assert(procedure != null);
3650 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3653 public void execute(ReadGraphImpl graph, Integer result) {
3655 procedure.execute(graph, querySupport.getResource(result));
3656 } catch (Throwable t2) {
3657 Logger.defaultLogError(t2);
3659 // impl.state.barrier.dec(this);
3663 public void exception(ReadGraphImpl graph, Throwable t) {
3666 procedure.exception(graph, t);
3667 } catch (Throwable t2) {
3668 Logger.defaultLogError(t2);
3670 // impl.state.barrier.dec(this);
3675 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3676 // else impl.state.barrier.inc(null, null);
3678 forResource(impl, id, impl.parent, ip);
3683 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3686 assert(procedure != null);
3688 // impl.state.barrier.inc();
3691 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3694 public void execute(ReadGraphImpl graph, Integer result) {
3696 procedure.execute(graph, querySupport.getResource(result));
3697 } catch (Throwable t2) {
3698 Logger.defaultLogError(t2);
3700 // impl.state.barrier.dec();
3704 public void exception(ReadGraphImpl graph, Throwable t) {
3706 procedure.exception(graph, t);
3707 } catch (Throwable t2) {
3708 Logger.defaultLogError(t2);
3710 // impl.state.barrier.dec();
3714 } catch (DatabaseException e) {
3715 Logger.defaultLogError(e);
3721 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3723 assert(subject != null);
3724 assert(procedure != null);
3726 final ListenerBase listener = getListenerBase(procedure);
3729 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3730 procedure.execute(impl, !result.isEmpty());
3731 } catch (DatabaseException e) {
3732 procedure.exception(impl, e);
3738 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
3740 assert(subject != null);
3741 assert(predicate != null);
3742 assert(procedure != null);
3744 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
3746 boolean found = false;
3749 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3754 synchronized public void finished(AsyncReadGraph graph) {
3756 procedure.execute(graph, found);
3757 } catch (Throwable t2) {
3758 Logger.defaultLogError(t2);
3760 // impl.state.barrier.dec(this);
3764 public void exception(AsyncReadGraph graph, Throwable t) {
3766 procedure.exception(graph, t);
3767 } catch (Throwable t2) {
3768 Logger.defaultLogError(t2);
3770 // impl.state.barrier.dec(this);
3775 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
3776 // else impl.state.barrier.inc(null, null);
3778 forEachObject(impl, subject, predicate, ip);
3783 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
3785 assert(subject != null);
3786 assert(predicate != null);
3787 assert(procedure != null);
3789 // impl.state.barrier.inc();
3791 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
3793 boolean found = false;
3796 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
3797 if(resource.equals(object)) found = true;
3801 synchronized public void finished(AsyncReadGraph graph) {
3803 procedure.execute(graph, found);
3804 } catch (Throwable t2) {
3805 Logger.defaultLogError(t2);
3807 // impl.state.barrier.dec();
3811 public void exception(AsyncReadGraph graph, Throwable t) {
3813 procedure.exception(graph, t);
3814 } catch (Throwable t2) {
3815 Logger.defaultLogError(t2);
3817 // impl.state.barrier.dec();
3825 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3827 assert(subject != null);
3828 assert(procedure != null);
3830 final ListenerBase listener = getListenerBase(procedure);
3832 // impl.state.barrier.inc();
3835 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
3838 public void execute(ReadGraphImpl graph, byte[] object) {
3839 boolean result = object != null;
3841 procedure.execute(graph, result);
3842 } catch (Throwable t2) {
3843 Logger.defaultLogError(t2);
3845 // impl.state.barrier.dec();
3849 public void exception(ReadGraphImpl graph, Throwable t) {
3851 procedure.exception(graph, t);
3852 } catch (Throwable t2) {
3853 Logger.defaultLogError(t2);
3855 // impl.state.barrier.dec();
3859 } catch (DatabaseException e) {
3860 Logger.defaultLogError(e);
3866 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3868 assert(subject != null);
3869 assert(procedure != null);
3871 final ListenerBase listener = getListenerBase(procedure);
3875 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3878 public void exception(ReadGraphImpl graph, Throwable t) {
3880 procedure.exception(graph, t);
3881 } catch (Throwable t2) {
3882 Logger.defaultLogError(t2);
3884 // impl.state.barrier.dec();
3888 public void execute(ReadGraphImpl graph, int i) {
3890 procedure.execute(graph, querySupport.getResource(i));
3891 } catch (Throwable t2) {
3892 Logger.defaultLogError(t2);
3897 public void finished(ReadGraphImpl graph) {
3899 procedure.finished(graph);
3900 } catch (Throwable t2) {
3901 Logger.defaultLogError(t2);
3903 // impl.state.barrier.dec();
3907 } catch (DatabaseException e) {
3908 Logger.defaultLogError(e);
3914 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
3916 // assert(request != null);
3917 // assert(procedure != null);
3919 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
3924 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
3926 // assert(graph != null);
3927 // assert(request != null);
3929 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
3930 // if(entry != null && entry.isReady()) {
3931 // return (T)entry.get(graph, this, null);
3933 // return request.perform(graph);
3938 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
3940 // assert(graph != null);
3941 // assert(request != null);
3943 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
3944 // if(entry != null && entry.isReady()) {
3945 // if(entry.isExcepted()) {
3946 // Throwable t = (Throwable)entry.getResult();
3947 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3948 // else throw new DatabaseException(t);
3950 // return (T)entry.getResult();
3954 // final DataContainer<T> result = new DataContainer<T>();
3955 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3957 // request.register(graph, new Listener<T>() {
3960 // public void exception(Throwable t) {
3961 // exception.set(t);
3965 // public void execute(T t) {
3970 // public boolean isDisposed() {
3976 // Throwable t = exception.get();
3978 // if(t instanceof DatabaseException) throw (DatabaseException)t;
3979 // else throw new DatabaseException(t);
3982 // return result.get();
3989 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
3991 // assert(graph != null);
3992 // assert(request != null);
3994 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
3995 // if(entry != null && entry.isReady()) {
3996 // if(entry.isExcepted()) {
3997 // procedure.exception(graph, (Throwable)entry.getResult());
3999 // procedure.execute(graph, (T)entry.getResult());
4002 // request.perform(graph, procedure);
4008 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4010 assert(request != null);
4011 assert(procedure != null);
4015 queryMultiRead(impl, request, parent, listener, procedure);
4017 } catch (DatabaseException e) {
4019 throw new IllegalStateException(e);
4026 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4028 assert(request != null);
4029 assert(procedure != null);
4031 // impl.state.barrier.inc();
4033 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4035 public void execute(AsyncReadGraph graph, T result) {
4038 procedure.execute(graph, result);
4039 } catch (Throwable t2) {
4040 Logger.defaultLogError(t2);
4045 public void finished(AsyncReadGraph graph) {
4048 procedure.finished(graph);
4049 } catch (Throwable t2) {
4050 Logger.defaultLogError(t2);
4053 // impl.state.barrier.dec();
4058 public String toString() {
4059 return procedure.toString();
4063 public void exception(AsyncReadGraph graph, Throwable t) {
4066 procedure.exception(graph, t);
4067 } catch (Throwable t2) {
4068 Logger.defaultLogError(t2);
4071 // impl.state.barrier.dec();
4080 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4082 // assert(request != null);
4083 // assert(procedure != null);
4087 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4090 // public String toString() {
4091 // return procedure.toString();
4095 // public void execute(AsyncReadGraph graph, T result) {
4097 // procedure.execute(result);
4098 // } catch (Throwable t2) {
4099 // Logger.defaultLogError(t2);
4104 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4106 // procedure.exception(throwable);
4107 // } catch (Throwable t2) {
4108 // Logger.defaultLogError(t2);
4114 // } catch (DatabaseException e) {
4116 // throw new IllegalStateException(e);
4123 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4125 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4130 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4132 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4137 public VirtualGraph getValueProvider(Resource subject) {
4139 return querySupport.getValueProvider(querySupport.getId(subject));
4143 public boolean resumeTasks(ReadGraphImpl graph) {
4145 return querySupport.resume(graph);
4149 public boolean isImmutable(int resourceId) {
4150 return querySupport.isImmutable(resourceId);
4153 public boolean isImmutable(Resource resource) {
4154 ResourceImpl impl = (ResourceImpl)resource;
4155 return isImmutable(impl.id);
4160 public Layer0 getL0(ReadGraph graph) {
4162 L0 = Layer0.getInstance(graph);
4167 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4168 protected Integer initialValue() {