1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.debug.ListenerReport;
48 import org.simantics.db.exception.DatabaseException;
49 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
50 import org.simantics.db.exception.NoInverseException;
51 import org.simantics.db.exception.ResourceNotFoundException;
52 import org.simantics.db.impl.DebugPolicy;
53 import org.simantics.db.impl.ResourceImpl;
54 import org.simantics.db.impl.graph.ReadGraphImpl;
55 import org.simantics.db.impl.graph.ReadGraphSupport;
56 import org.simantics.db.impl.graph.WriteGraphImpl;
57 import org.simantics.db.impl.procedure.IntProcedureAdapter;
58 import org.simantics.db.impl.procedure.InternalProcedure;
59 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
60 import org.simantics.db.impl.support.ResourceSupport;
61 import org.simantics.db.procedure.AsyncMultiListener;
62 import org.simantics.db.procedure.AsyncMultiProcedure;
63 import org.simantics.db.procedure.AsyncProcedure;
64 import org.simantics.db.procedure.AsyncSetListener;
65 import org.simantics.db.procedure.ListenerBase;
66 import org.simantics.db.procedure.MultiProcedure;
67 import org.simantics.db.procedure.StatementProcedure;
68 import org.simantics.db.procedure.SyncMultiProcedure;
69 import org.simantics.db.request.AsyncMultiRead;
70 import org.simantics.db.request.ExternalRead;
71 import org.simantics.db.request.MultiRead;
72 import org.simantics.db.request.RequestFlags;
73 import org.simantics.layer0.Layer0;
74 import org.simantics.utils.DataContainer;
75 import org.simantics.utils.Development;
76 import org.simantics.utils.datastructures.Pair;
77 import org.simantics.utils.datastructures.collections.CollectionUtils;
78 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
80 import gnu.trove.procedure.TIntProcedure;
81 import gnu.trove.procedure.TLongProcedure;
82 import gnu.trove.procedure.TObjectProcedure;
83 import gnu.trove.set.hash.THashSet;
84 import gnu.trove.set.hash.TIntHashSet;
86 @SuppressWarnings({"rawtypes", "unchecked"})
87 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
89 public static int indent = 0;
94 public int boundQueries = 0;
97 final private int functionalRelation;
99 final private int superrelationOf;
101 final private int instanceOf;
103 final private int inverseOf;
105 final private int asserts;
107 final private int hasPredicate;
109 final private int hasPredicateInverse;
111 final private int hasObject;
113 final private int inherits;
115 final private int subrelationOf;
117 final private int rootLibrary;
120 * A cache for the root library resource. Initialized in
121 * {@link #getRootLibraryResource()}.
123 private volatile ResourceImpl rootLibraryResource;
125 final private int library;
127 final private int consistsOf;
129 final private int hasName;
131 AtomicInteger sleepers = new AtomicInteger(0);
133 private boolean updating = false;
136 private boolean firingListeners = false;
138 final public QueryCache cache;
139 final public QuerySupport querySupport;
140 final public Session session;
141 final public ResourceSupport resourceSupport;
143 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
145 QueryThread[] executors;
147 // public ArrayList<SessionTask>[] queues;
149 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
153 INIT, RUN, SLEEP, DISPOSED
157 public ThreadState[] threadStates;
158 // public ReentrantLock[] threadLocks;
159 // public Condition[] threadConditions;
161 //public ArrayList<SessionTask>[] ownTasks;
163 //public ArrayList<SessionTask>[] ownSyncTasks;
165 //ArrayList<SessionTask>[] delayQueues;
167 final Object querySupportLock;
169 public Long modificationCounter = 0L;
171 public void close() {
174 SessionTask getOwnTask(ReadGraphImpl impl) {
175 Set<ReadGraphImpl> ancestors = impl.ancestorSet();
176 synchronized(querySupportLock) {
178 while(index < freeScheduling.size()) {
179 SessionTask task = freeScheduling.get(index);
180 if(task.hasCommonParent(ancestors)) {
181 return freeScheduling.remove(index);
189 public boolean performPending(ReadGraphImpl graph) {
190 SessionTask task = getOwnTask(graph);
192 task.run(QueryProcessor.thread.get());
199 // final public void scheduleOwn(int caller, SessionTask request) {
200 // ownTasks[caller].add(request);
203 final public void schedule(SessionTask request) {
205 //int performer = request.thread;
207 // if(DebugPolicy.SCHEDULE)
208 // System.out.println("schedule " + request + " " + " -> " + performer);
210 //assert(performer >= 0);
212 assert(request != null);
214 // if(caller == performer) {
215 // request.run(caller);
218 // if(performer == THREADS) {
220 synchronized(querySupportLock) {
222 //new Exception().printStackTrace();
224 freeScheduling.add(request);
226 querySupportLock.notifyAll();
228 //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
230 // for(int i=0;i<THREADS;i++) {
231 // ReentrantLock queueLock = threadLocks[i];
233 // //queues[performer].add(request);
234 // //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
235 // threadConditions[i].signalAll();
236 // queueLock.unlock();
245 // ReentrantLock queueLock = threadLocks[performer];
247 // queues[performer].add(request);
248 // // This thread could have been sleeping
249 // if(queues[performer].size() == 1) {
250 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
251 // threadConditions[performer].signalAll();
253 // queueLock.unlock();
260 final public int THREAD_MASK;
262 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
264 public static abstract class SessionTask {
266 public final ReadGraphImpl graph;
267 private Set<ReadGraphImpl> ancestors;
269 public SessionTask(ReadGraphImpl graph) {
273 public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
274 if(graph == null) return false;
275 if(ancestors == null) ancestors = graph.ancestorSet();
276 return !Collections.disjoint(ancestors, otherAncestors);
279 public abstract void run(int thread);
282 public String toString() {
283 return "SessionTask[" + graph.parent + "]";
288 public static abstract class SessionRead extends SessionTask {
290 final public Semaphore notify;
291 final public DataContainer<Throwable> throwable;
293 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
295 this.throwable = throwable;
296 this.notify = notify;
301 long waitingTime = 0;
304 static int koss2 = 0;
306 public boolean resume(ReadGraphImpl graph) {
307 return executors[0].runSynchronized();
310 //private WeakReference<GarbageTracker> garbageTracker;
312 private class GarbageTracker {
315 protected void finalize() throws Throwable {
317 // System.err.println("GarbageTracker");
319 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
327 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
328 throws DatabaseException {
330 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
333 THREAD_MASK = threads - 1;
336 cache = new QueryCache(core, threads);
337 session = querySupport.getSession();
338 resourceSupport = querySupport.getSupport();
339 querySupportLock = core.getLock();
341 executors = new QueryThread[THREADS];
342 // queues = new ArrayList[THREADS];
343 // threadLocks = new ReentrantLock[THREADS];
344 // threadConditions = new Condition[THREADS];
345 threadStates = new ThreadState[THREADS];
346 // ownTasks = new ArrayList[THREADS];
347 // ownSyncTasks = new ArrayList[THREADS];
348 // delayQueues = new ArrayList[THREADS * THREADS];
350 // freeSchedule = new AtomicInteger(0);
352 // for (int i = 0; i < THREADS * THREADS; i++) {
353 // delayQueues[i] = new ArrayList<SessionTask>();
356 for (int i = 0; i < THREADS; i++) {
358 // tasks[i] = new ArrayList<Runnable>();
359 // ownTasks[i] = new ArrayList<SessionTask>();
360 // ownSyncTasks[i] = new ArrayList<SessionTask>();
361 // queues[i] = new ArrayList<SessionTask>();
362 // threadLocks[i] = new ReentrantLock();
363 // threadConditions[i] = threadLocks[i].newCondition();
364 // limits[i] = false;
365 threadStates[i] = ThreadState.INIT;
369 for (int i = 0; i < THREADS; i++) {
373 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
375 threadSet.add(executors[i]);
380 for (int i = 0; i < THREADS; i++) {
381 executors[i].start();
384 // Make sure that query threads are up and running
385 while(sleepers.get() != THREADS) {
388 } catch (InterruptedException e) {
393 rootLibrary = core.getBuiltin("http:/");
394 boolean builtinsInstalled = rootLibrary != 0;
396 if (builtinsInstalled) {
397 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
398 assert (functionalRelation != 0);
400 functionalRelation = 0;
402 if (builtinsInstalled) {
403 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
404 assert (instanceOf != 0);
408 if (builtinsInstalled) {
409 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
410 assert (inverseOf != 0);
415 if (builtinsInstalled) {
416 inherits = core.getBuiltin(Layer0.URIs.Inherits);
417 assert (inherits != 0);
421 if (builtinsInstalled) {
422 asserts = core.getBuiltin(Layer0.URIs.Asserts);
423 assert (asserts != 0);
427 if (builtinsInstalled) {
428 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
429 assert (hasPredicate != 0);
433 if (builtinsInstalled) {
434 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
435 assert (hasPredicateInverse != 0);
437 hasPredicateInverse = 0;
439 if (builtinsInstalled) {
440 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
441 assert (hasObject != 0);
445 if (builtinsInstalled) {
446 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
447 assert (subrelationOf != 0);
451 if (builtinsInstalled) {
452 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
453 assert (superrelationOf != 0);
457 if (builtinsInstalled) {
458 library = core.getBuiltin(Layer0.URIs.Library);
459 assert (library != 0);
463 if (builtinsInstalled) {
464 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
465 assert (consistsOf != 0);
469 if (builtinsInstalled) {
470 hasName = core.getBuiltin(Layer0.URIs.HasName);
471 assert (hasName != 0);
477 final public void releaseWrite(ReadGraphImpl graph) {
478 performDirtyUpdates(graph);
479 modificationCounter++;
482 final public int getId(final Resource r) {
483 return querySupport.getId(r);
486 public QuerySupport getCore() {
490 public int getFunctionalRelation() {
491 return functionalRelation;
494 public int getInherits() {
498 public int getInstanceOf() {
502 public int getInverseOf() {
506 public int getSubrelationOf() {
507 return subrelationOf;
510 public int getSuperrelationOf() {
511 return superrelationOf;
514 public int getAsserts() {
518 public int getHasPredicate() {
522 public int getHasPredicateInverse() {
523 return hasPredicateInverse;
526 public int getHasObject() {
530 public int getRootLibrary() {
534 public Resource getRootLibraryResource() {
535 if (rootLibraryResource == null) {
536 // Synchronization is not needed here, it doesn't matter if multiple
537 // threads simultaneously set rootLibraryResource once.
538 int root = getRootLibrary();
540 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
541 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
543 return rootLibraryResource;
546 public int getLibrary() {
550 public int getConsistsOf() {
554 public int getHasName() {
558 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
562 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
565 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
567 if (result != null && result != 0) {
568 procedure.execute(graph, result);
572 // Fall back to using the fixed builtins.
573 // result = querySupport.getBuiltin(id);
574 // if (result != 0) {
575 // procedure.execute(graph, result);
580 // result = querySupport.getRandomAccessReference(id);
581 // } catch (ResourceNotFoundException e) {
582 // procedure.exception(graph, e);
587 procedure.execute(graph, result);
589 procedure.exception(graph, new ResourceNotFoundException(id));
595 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
596 procedure.exception(graph, t);
600 } catch (DatabaseException e) {
604 procedure.exception(graph, e);
606 } catch (DatabaseException e1) {
608 Logger.defaultLogError(e1);
616 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
618 Integer result = querySupport.getBuiltin(id);
620 procedure.execute(graph, result);
622 procedure.exception(graph, new ResourceNotFoundException(id));
627 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
630 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
631 } catch (DatabaseException e) {
632 throw new IllegalStateException(e);
637 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
641 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
642 } catch (DatabaseException e) {
643 throw new IllegalStateException(e);
648 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
649 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
653 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
655 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
659 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
661 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
665 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
667 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
671 boolean isBound(ExternalReadEntry<?> entry) {
672 if(entry.hasParents()) return true;
673 else if(hasListener(entry)) return true;
677 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
679 if (parent != null && !inferred) {
681 if(!child.isImmutable(graph))
682 child.addParent(parent);
683 } catch (DatabaseException e) {
684 Logger.defaultLogError(e);
686 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
689 if (listener != null) {
690 return registerListener(child, listener, procedure);
698 static class Dummy implements InternalProcedure<Object>, IntProcedure {
701 public void execute(ReadGraphImpl graph, int i) {
705 public void finished(ReadGraphImpl graph) {
709 public void execute(ReadGraphImpl graph, Object result) {
713 public void exception(ReadGraphImpl graph, Throwable throwable) {
718 private static final Dummy dummy = new Dummy();
721 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
723 if (DebugPolicy.PERFORM)
724 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
727 assert (!collecting);
729 assert(query.assertNotDiscarded());
731 registerDependencies(graph, query, parent, listener, procedure, false);
733 // FRESH, REFUTED, EXCEPTED go here
734 if (!query.isReady()) {
739 query.computeForEach(graph, this, (Procedure)dummy, true);
740 return query.get(graph, this, null);
746 return query.get(graph, this, procedure);
754 interface QueryCollectorSupport {
755 public CacheCollectionResult allCaches();
756 public Collection<CacheEntry> getRootList();
757 public int getCurrentSize();
758 public int calculateCurrentSize();
759 public CacheEntryBase iterate(int level);
760 public void remove();
761 public void setLevel(CacheEntryBase entry, int level);
762 public boolean start(boolean flush);
/**
 * Strategy for collecting query cache entries. Interface members are
 * implicitly public.
 */
interface QueryCollector {

    /**
     * Runs one collection round.
     *
     * @param youngTarget     target value passed by the caller of {@code gc}
     * @param allowedTimeInMs time allowance in milliseconds, as the name states
     */
    void collect(int youngTarget, int allowedTimeInMs);

}
771 class QueryCollectorSupportImpl implements QueryCollectorSupport {
773 private static final boolean DEBUG = false;
774 private static final double ITERATION_RATIO = 0.2;
776 private CacheCollectionResult iteration = new CacheCollectionResult();
777 private boolean fresh = true;
778 private boolean needDataInStart = true;
780 QueryCollectorSupportImpl() {
784 public CacheCollectionResult allCaches() {
785 CacheCollectionResult result = new CacheCollectionResult();
786 QueryProcessor.this.allCaches(result);
791 public boolean start(boolean flush) {
792 // We need new data from query maps
794 if(needDataInStart || flush) {
795 // Last run ended after processing all queries => refresh data
796 restart(flush ? 0.0 : ITERATION_RATIO);
798 // continue with previous big data
800 // Notify caller about iteration situation
801 return iteration.isAtStart();
804 private void restart(double targetRatio) {
806 needDataInStart = true;
808 long start = System.nanoTime();
811 // We need new data from query maps
813 int iterationSize = iteration.size()+1;
814 int diff = calculateCurrentSize()-iterationSize;
816 double ratio = (double)diff / (double)iterationSize;
817 boolean dirty = Math.abs(ratio) >= targetRatio;
820 iteration = allCaches();
822 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
823 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
824 System.err.print(" " + iteration.levels[i].size());
825 System.err.println("");
832 needDataInStart = false;
834 // We are returning here within the same GC round - reuse the cache table
843 public CacheEntryBase iterate(int level) {
845 CacheEntryBase entry = iteration.next(level);
847 restart(ITERATION_RATIO);
851 while(entry != null && entry.isDiscarded()) {
852 entry = iteration.next(level);
860 public void remove() {
865 public void setLevel(CacheEntryBase entry, int level) {
866 iteration.setLevel(entry, level);
869 public Collection<CacheEntry> getRootList() {
870 return cache.getRootList();
874 public int calculateCurrentSize() {
875 return cache.calculateCurrentSize();
879 public int getCurrentSize() {
884 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
886 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
887 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
889 public int querySize() {
893 public void gc(int youngTarget, int allowedTimeInMs) {
895 collector.collect(youngTarget, allowedTimeInMs);
899 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
901 assert (entry != null);
903 if (base.isDisposed())
906 return addListener(entry, base, procedure);
910 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
911 entry.setLastKnown(result);
914 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
916 assert (entry != null);
917 assert (procedure != null);
919 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
921 list = new ArrayList<ListenerEntry>(1);
922 cache.listeners.put(entry, list);
925 ListenerEntry result = new ListenerEntry(entry, base, procedure);
926 int currentIndex = list.indexOf(result);
927 // There was already a listener
928 if(currentIndex > -1) {
929 ListenerEntry current = list.get(currentIndex);
930 if(!current.base.isDisposed()) return null;
931 list.set(currentIndex, result);
936 if(DebugPolicy.LISTENER) {
937 new Exception().printStackTrace();
938 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
945 private void scheduleListener(ListenerEntry entry) {
946 assert (entry != null);
947 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
948 scheduledListeners.add(entry);
951 private void removeListener(ListenerEntry entry) {
952 assert (entry != null);
953 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
954 if(list == null) return;
955 boolean success = list.remove(entry);
958 cache.listeners.remove(entry.entry);
961 private boolean hasListener(CacheEntry entry) {
962 if(cache.listeners.get(entry) != null) return true;
966 boolean hasListenerAfterDisposing(CacheEntry entry) {
967 if(cache.listeners.get(entry) != null) {
968 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
969 ArrayList<ListenerEntry> list = null;
970 for (ListenerEntry e : entries) {
971 if (e.base.isDisposed()) {
972 if(list == null) list = new ArrayList<ListenerEntry>();
977 for (ListenerEntry e : list) {
981 if (entries.isEmpty()) {
982 cache.listeners.remove(entry);
990 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
991 hasListenerAfterDisposing(entry);
992 if(cache.listeners.get(entry) != null)
993 return cache.listeners.get(entry);
995 return Collections.emptyList();
998 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1000 if(!workarea.containsKey(entry)) {
1002 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1003 for(ListenerEntry e : getListenerEntries(entry))
1006 workarea.put(entry, ls);
1008 for(CacheEntry parent : entry.getParents(this)) {
1009 processListenerReport(parent, workarea);
1010 ls.addAll(workarea.get(parent));
1017 public synchronized ListenerReport getListenerReport() throws IOException {
1019 class ListenerReportImpl implements ListenerReport {
1021 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1024 public void print(PrintStream b) {
1025 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1026 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1027 for(ListenerBase l : e.getValue()) {
1028 Integer i = hist.get(l);
1029 hist.put(l, i != null ? i-1 : -1);
1033 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1034 b.print("" + -p.second + " " + p.first + "\n");
1042 ListenerReportImpl result = new ListenerReportImpl();
1044 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1045 for(CacheEntryBase entry : all) {
1046 hasListenerAfterDisposing(entry);
1048 for(CacheEntryBase entry : all) {
1049 processListenerReport(entry, result.workarea);
1056 public synchronized String reportListeners(File file) throws IOException {
1061 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1062 ListenerReport report = getListenerReport();
1065 return "Done reporting listeners.";
1069 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1071 if(entry.isDiscarded()) return;
1072 if(workarea.containsKey(entry)) return;
1074 Iterable<CacheEntry> parents = entry.getParents(this);
1075 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1076 for(CacheEntry e : parents) {
1077 if(e.isDiscarded()) continue;
1079 processParentReport(e, workarea);
1081 workarea.put(entry, ps);
1085 public synchronized String reportQueryActivity(File file) throws IOException {
1087 System.err.println("reportQueries " + file.getAbsolutePath());
1092 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1094 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1095 Collections.reverse(entries);
1097 for(Pair<String,Integer> entry : entries) {
1098 b.println(entry.first + ": " + entry.second);
1103 Development.histogram.clear();
1109 public synchronized String reportQueries(File file) throws IOException {
1111 System.err.println("reportQueries " + file.getAbsolutePath());
1116 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1118 long start = System.nanoTime();
1120 // ArrayList<CacheEntry> all = ;
1122 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1123 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1124 for(CacheEntryBase entry : caches) {
1125 processParentReport(entry, workarea);
1128 // for(CacheEntry e : all) System.err.println("entry: " + e);
1130 long duration = System.nanoTime() - start;
1131 System.err.println("Query root set in " + 1e-9*duration + "s.");
1133 start = System.nanoTime();
1135 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1139 for(CacheEntry entry : workarea.keySet()) {
1140 boolean listener = hasListenerAfterDisposing(entry);
1141 boolean hasParents = entry.getParents(this).iterator().hasNext();
1144 flagMap.put(entry, 0);
1145 } else if (!hasParents) {
1147 flagMap.put(entry, 1);
1150 flagMap.put(entry, 2);
1152 // // Write leaf bit
1153 // entry.flags |= 4;
1156 boolean done = true;
1163 long start2 = System.nanoTime();
1165 int boundCounter = 0;
1166 int unboundCounter = 0;
1167 int unknownCounter = 0;
1169 for(CacheEntry<?> entry : workarea.keySet()) {
1171 //System.err.println("process " + entry);
1173 int flags = flagMap.get(entry);
1174 int bindStatus = flags & 3;
1176 if(bindStatus == 0) boundCounter++;
1177 else if(bindStatus == 1) unboundCounter++;
1178 else if(bindStatus == 2) unknownCounter++;
1180 if(bindStatus < 2) continue;
1183 for(CacheEntry parent : entry.getParents(this)) {
1185 if(parent.isDiscarded()) flagMap.put(parent, 1);
1187 int flags2 = flagMap.get(parent);
1188 int bindStatus2 = flags2 & 3;
1189 // Parent is bound => child is bound
1190 if(bindStatus2 == 0) {
1194 // Parent is unknown => child is unknown
1195 else if (bindStatus2 == 2) {
1202 flagMap.put(entry, newStatus);
1206 duration = System.nanoTime() - start2;
1207 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1208 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1210 } while(!done && loops++ < 20);
1214 for(CacheEntry entry : workarea.keySet()) {
1216 int bindStatus = flagMap.get(entry);
1217 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1223 duration = System.nanoTime() - start;
1224 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1226 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1228 for(CacheEntry entry : workarea.keySet()) {
1229 Class<?> clazz = entry.getClass();
1230 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1231 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1232 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1233 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1234 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1235 Integer c = counts.get(clazz);
1236 if(c == null) counts.put(clazz, -1);
1237 else counts.put(clazz, c-1);
1240 b.print("// Simantics DB client query report file\n");
1241 b.print("// This file contains the following information\n");
1242 b.print("// -The amount of cached query instances per query class\n");
1243 b.print("// -The sizes of retained child sets\n");
1244 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1245 b.print("// -Followed by status, where\n");
1246 b.print("// -0=bound\n");
1247 b.print("// -1=free\n");
1248 b.print("// -2=unknown\n");
1249 b.print("// -L=has listener\n");
1250 b.print("// -List of children for each query (search for 'C <query name>')\n");
1252 b.print("----------------------------------------\n");
1254 b.print("// Queries by class\n");
1255 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1256 b.print(-p.second + " " + p.first.getName() + "\n");
1259 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1260 for(CacheEntry e : workarea.keySet())
1263 boolean changed = true;
1265 while(changed && iter++<50) {
1269 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1270 for(CacheEntry e : workarea.keySet())
1273 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1274 Integer c = hist.get(e.getKey());
1275 for(CacheEntry p : e.getValue()) {
1276 Integer i = newHist.get(p);
1277 newHist.put(p, i+c);
1280 for(CacheEntry e : workarea.keySet()) {
1281 Integer value = newHist.get(e);
1282 Integer old = hist.get(e);
1283 if(!value.equals(old)) {
1285 // System.err.println("hist " + e + ": " + old + " => " + value);
1290 System.err.println("Retained set iteration " + iter);
1294 b.print("// Queries by retained set\n");
1295 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1296 b.print("" + -p.second + " " + p.first + "\n");
1299 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1301 b.print("// Entry parent listing\n");
1302 for(CacheEntry entry : workarea.keySet()) {
1303 int status = flagMap.get(entry);
1304 boolean hasListener = hasListenerAfterDisposing(entry);
1305 b.print("Q " + entry.toString());
1307 b.print(" (L" + status + ")");
1310 b.print(" (" + status + ")");
1313 for(CacheEntry parent : workarea.get(entry)) {
1314 Collection<CacheEntry> inv = inverse.get(parent);
1316 inv = new ArrayList<CacheEntry>();
1317 inverse.put(parent, inv);
1320 b.print(" " + parent.toString());
1325 b.print("// Entry child listing\n");
1326 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1327 b.print("C " + entry.getKey().toString());
1329 for(CacheEntry child : entry.getValue()) {
1330 Integer h = hist.get(child);
1334 b.print(" <no children>");
1336 b.print(" " + child.toString());
1341 b.print("#queries: " + workarea.keySet().size() + "\n");
1342 b.print("#listeners: " + listeners + "\n");
1346 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item used by the update traversal (see updateQuery/updateParents).
// 'caller' is the entry whose change triggered this item (null for the root
// item created in update()).
1352 public CacheEntry caller;
// The cache entry that should be (re)examined.
1354 public CacheEntry entry;
// Stores the traversal bookkeeping for one item; 'indent' is only used to
// indent the development trace output in updateQuery.
1358 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1359 this.caller = caller;
1361 this.indent = indent;
// Removes the query behind the given entry from this processor.
// Returns false immediately when the entry has already been discarded.
// NOTE(review): the tail of this method (after the HAS_BEEN_BOUND test) is not
// visible in this excerpt - confirm the remaining return values there.
1366 boolean removeQuery(CacheEntry entry) {
1368 // This entry has been removed before. No need to do anything here.
1369 if(entry.isDiscarded()) return false;
1371 assert (!entry.isDiscarded());
1373 Query query = entry.getQuery();
// Detach the query's entry from this processor.
1375 query.removeEntry(this);
// Extra handling for entries whose GC status has the HAS_BEEN_BOUND bit set.
1380 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1391 * @return true if this entry is being listened
// Handles a single UpdateEntry popped from the 'todo' list.
// - Entries that are already discarded, refuted, excepted or pending get a
//   one-letter development trace (D/R/E/P) when QUERYPROCESSOR_UPDATE is on.
// - Entries whose request type is IMMEDIATE_UPDATE are collected into
//   'immediates' for later recomputation by update().
// - Listeners registered for the entry are scheduled for firing.
// - Entries whose type is INVALIDATE propagate the update to their parents.
// NOTE(review): several control-flow lines (returns, else branches) are not
// visible in this excerpt; the outline above only covers the visible statements.
1393 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1397 CacheEntry entry = e.entry;
1399 //System.err.println("updateQuery " + entry);
1402 * If the dependency graph forms a DAG, some entries are inserted in the
1403 * todo list many times. They only need to be processed once though.
1405 if (entry.isDiscarded()) {
1406 if (Development.DEVELOPMENT) {
1407 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1408 System.out.print("D");
1409 for (int i = 0; i < e.indent; i++)
1410 System.out.print(" ");
1411 System.out.println(entry.getQuery());
1414 // System.err.println(" => DISCARDED");
1418 if (entry.isRefuted()) {
1419 if (Development.DEVELOPMENT) {
1420 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1421 System.out.print("R");
1422 for (int i = 0; i < e.indent; i++)
1423 System.out.print(" ");
1424 System.out.println(entry.getQuery());
1430 if (entry.isExcepted()) {
1431 if (Development.DEVELOPMENT) {
1432 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1433 System.out.print("E");
1438 if (entry.isPending()) {
1439 if (Development.DEVELOPMENT) {
1440 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1441 System.out.print("P");
// Development trace for an entry that is actually going to be updated.
1448 if (Development.DEVELOPMENT) {
1449 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1450 System.out.print("U ");
1451 for (int i = 0; i < e.indent; i++)
1452 System.out.print(" ");
1453 System.out.print(entry.getQuery());
1457 Query query = entry.getQuery();
1458 int type = query.type();
1460 boolean hasListener = hasListener(entry);
// NOTE(review): the trace below calls hasListener(entry) again instead of
// reusing the local computed just above.
1462 if (Development.DEVELOPMENT) {
1463 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1464 if(hasListener(entry)) {
1465 System.out.println(" (L)");
1467 System.out.println("");
// Pending/excepted entries and regular entries are handled by two separate
// branches; in both, IMMEDIATE_UPDATE requests are recorded in 'immediates'.
1472 if(entry.isPending() || entry.isExcepted()) {
1475 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1477 immediates.put(entry, entry);
1492 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1494 immediates.put(entry, entry);
1508 // System.err.println(" => FOO " + type);
// Schedule every listener attached to this entry.
1511 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1512 if(entries != null) {
1513 for (ListenerEntry le : entries) {
1514 scheduleListener(le);
1519 // If invalid, update parents
1520 if (type == RequestFlags.INVALIDATE) {
1521 updateParents(e.indent, entry, todo);
1528 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1530 Iterable<CacheEntry> oldParents = entry.getParents(this);
1531 for (CacheEntry parent : oldParents) {
1532 // System.err.println("updateParents " + entry + " => " + parent);
1533 if(!parent.isDiscarded())
1534 todo.push(new UpdateEntry(entry, parent, indent + 2));
// Removes the listener entry when its base listener reports being disposed.
// NOTE(review): the return statements are not visible in this excerpt;
// performScheduledUpdates treats a true result as "listener was pruned".
1539 private boolean pruneListener(ListenerEntry entry) {
1540 if (entry.base.isDisposed()) {
1541 removeListener(entry);
1549 * @param av1 an array (guaranteed)
1550 * @param av2 any object
1551 * @return <code>true</code> if the two arrays are equal
1553 private final boolean arrayEquals(Object av1, Object av2) {
1556 Class<?> c1 = av1.getClass().getComponentType();
1557 Class<?> c2 = av2.getClass().getComponentType();
1558 if (c2 == null || !c1.equals(c2))
1560 boolean p1 = c1.isPrimitive();
1561 boolean p2 = c2.isPrimitive();
1565 return Arrays.equals((Object[]) av1, (Object[]) av2);
1566 if (boolean.class.equals(c1))
1567 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1568 else if (byte.class.equals(c1))
1569 return Arrays.equals((byte[]) av1, (byte[]) av2);
1570 else if (int.class.equals(c1))
1571 return Arrays.equals((int[]) av1, (int[]) av2);
1572 else if (long.class.equals(c1))
1573 return Arrays.equals((long[]) av1, (long[]) av2);
1574 else if (float.class.equals(c1))
1575 return Arrays.equals((float[]) av1, (float[]) av2);
1576 else if (double.class.equals(c1))
1577 return Arrays.equals((double[]) av1, (double[]) av2);
1578 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query behind 'entry' and compares the fresh result with
// 'oldValue'.
// Returns:
//  - ListenerEntry.NO_VALUE when the entry ends up excepted, or after a
//    Throwable has been caught and logged,
//  - the new value when it differs from 'oldValue',
//  - ListenerEntry.NOT_CHANGED otherwise.
// Arrays are compared element-wise through arrayEquals, everything else
// through equals().
1583 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1587 Query query = entry.getQuery();
1589 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1591 entry.prepareRecompute(querySupport);
// Recompute using a graph derived from the caller's graph for this entry.
1593 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1595 query.recompute(parentGraph);
1597 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1599 Object newValue = entry.getResult();
// No previous value is known: only the debug trace is visible for this case.
1601 if (ListenerEntry.NO_VALUE == oldValue) {
1602 if(DebugPolicy.CHANGES) {
1603 System.out.println("C " + query);
1604 System.out.println("- " + oldValue);
1605 System.out.println("- " + newValue);
1610 boolean changed = false;
1612 if (newValue != null) {
1613 if (newValue.getClass().isArray()) {
1614 changed = !arrayEquals(newValue, oldValue);
1616 changed = !newValue.equals(oldValue);
// New value is null: changed only if there used to be a value.
1619 changed = (oldValue != null);
1621 if(DebugPolicy.CHANGES && changed) {
1622 System.out.println("C " + query);
1623 System.out.println("- " + oldValue);
1624 System.out.println("- " + newValue);
1627 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1629 } catch (Throwable t) {
1631 Logger.defaultLogError(t);
1633 return ListenerEntry.NO_VALUE;
// Tells whether any listener is currently waiting in scheduledListeners.
1639 public boolean hasScheduledUpdates() {
1640 return !scheduledListeners.isEmpty();
// Fires all scheduled listeners.
// Repeats until scheduledListeners stays empty: each round swaps in a fresh
// set, recomputes the entry of every listener that is not pruned, and fires
// those whose value is not reported as NOT_CHANGED.
// The firingListeners flag is raised for the duration of the call; the method
// asserts that neither cache collection nor another firing is in progress.
1643 public void performScheduledUpdates(WriteGraphImpl graph) {
1646 assert (!cache.collecting);
1647 assert (!firingListeners);
1649 firingListeners = true;
1653 // Performing may cause further events to be scheduled.
1654 while (!scheduledListeners.isEmpty()) {
1657 // graph.state.barrier.inc();
1659 // Clone current events to make new entries possible during
1661 THashSet<ListenerEntry> entries = scheduledListeners;
1662 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners that actually need to be fired in this round.
1664 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1666 for (ListenerEntry listenerEntry : entries) {
1668 if (pruneListener(listenerEntry)) {
1669 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1673 final CacheEntry entry = listenerEntry.entry;
1674 assert (entry != null);
// Recompute against the value the listener saw last.
1676 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1678 if (newValue != ListenerEntry.NOT_CHANGED) {
1679 if(DebugPolicy.LISTENER)
1680 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1681 schedule.add(listenerEntry);
1682 listenerEntry.setLastKnown(entry.getResult());
1687 for(ListenerEntry listenerEntry : schedule) {
1688 final CacheEntry entry = listenerEntry.entry;
1689 if(DebugPolicy.LISTENER)
1690 System.out.println("Firing " + listenerEntry.procedure);
1692 if(DebugPolicy.LISTENER)
1693 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1694 entry.performFromCache(graph, listenerEntry.procedure);
// NOTE(review): failures here go to stderr via printStackTrace, while the rest
// of this file reports through Logger.defaultLogError - consider aligning.
1695 } catch (Throwable t) {
1696 t.printStackTrace();
1700 // graph.state.barrier.dec();
1701 // graph.waitAsync(null);
1702 // graph.state.barrier.assertReady();
1707 firingListeners = false;
1714 * @return true if this entry still has listeners
// Propagates a change starting from 'entry'.
// Alternates between two phases until no immediate updates remain:
//  1. drain the 'todo' list through updateQuery, collecting IMMEDIATE_UPDATE
//     entries into 'immediates';
//  2. recompute each collected entry with compareTo and push its parents back
//     onto 'todo' when the value is not reported as NOT_CHANGED.
// Returns true when a listener was seen during the walk or when listener
// status could not be determined (an unchanged immediate entry was restored).
// Throwables are logged through Logger.defaultLogError.
1716 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1718 assert (!cache.collecting);
1722 boolean hadListeners = false;
1723 boolean listenersUnknown = false;
1727 assert(entry != null);
1728 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1729 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
// Root work item: no caller, no indentation.
1730 todo.add(new UpdateEntry(null, entry, 0));
1734 // Walk the tree and collect immediate updates
1735 while (!todo.isEmpty()) {
1736 UpdateEntry e = todo.pop();
1737 hadListeners |= updateQuery(e, todo, immediates);
1740 if(immediates.isEmpty()) break;
1742 // Evaluate all immediate updates and collect parents to update
1743 for(CacheEntry immediate : immediates.values()) {
1745 if(immediate.isDiscarded()) {
// Excepted entries have no usable old result, so compare against NO_VALUE.
1749 if(immediate.isExcepted()) {
1751 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1752 if (newValue != ListenerEntry.NOT_CHANGED)
1753 updateParents(0, immediate, todo);
1757 Object oldValue = immediate.getResult();
1758 Object newValue = compareTo(graph, immediate, oldValue);
1760 if (newValue != ListenerEntry.NOT_CHANGED) {
1761 updateParents(0, immediate, todo);
1763 // If not changed, keep the old value
1764 immediate.setResult(oldValue);
1765 immediate.setReady();
1766 listenersUnknown = true;
1776 } catch (Throwable t) {
1777 Logger.defaultLogError(t);
1783 return hadListeners | listenersUnknown;
// Pending (subject, predicate) statement changes, packed into one long each
// (see updateStatements).
1787 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
// Pending value changes, keyed by resource id (see updateValue).
1788 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
// Pending whole-resource invalidations, consumed by performDirtyUpdates.
1789 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1790 // Maybe use a mutex from util.concurrent?
// Guards scheduledPrimitiveUpdates (see updatePrimitive / performDirtyUpdates).
1791 private Object primitiveUpdateLock = new Object();
// Pending external read updates; raw THashSet holding ExternalRead instances.
1792 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Applies all scheduled changes to the query cache.
// Clears the cache's dirty flag, then:
//  - fast path 1: exactly one scheduled statement change and nothing else;
//  - fast path 2: exactly one scheduled value change and nothing else;
//  - general path: processes primitive (external read) updates, value updates,
//    invalidates and statement updates, then updates per-subject and
//    per-ordered-set cache entries collected along the way.
// Finally clears the three scheduled sets.
// NOTE(review): control-flow lines after each fast path are not visible in
// this excerpt - presumably each returns early; confirm.
1794 public void performDirtyUpdates(final ReadGraphImpl graph) {
1796 cache.dirty = false;
1799 if (Development.DEVELOPMENT) {
1800 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1801 System.err.println("== Query update ==");
1805 // Special case - one statement
1806 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1808 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack: subject in the high 32 bits, predicate in the low 32 bits.
// NOTE(review): 0xffffffff is an int literal (-1) that widens to -1L, so the
// mask is a no-op; the (int) cast alone performs the truncation.
1810 final int subject = (int)(arg0 >>> 32);
1811 final int predicate = (int)(arg0 & 0xffffffff);
1813 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1814 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1815 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Type-related predicates also affect the subject's type queries.
1817 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1818 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1819 if(principalTypes != null) update(graph, principalTypes);
1820 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1821 if(types != null) update(graph, types);
1824 if(predicate == subrelationOf) {
1825 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1826 if(superRelations != null) update(graph, superRelations);
1829 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1830 if(dp != null) update(graph, dp);
// The ordered-set entry is looked up by the predicate id.
1831 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1832 if(os != null) update(graph, os);
1834 scheduledObjectUpdates.clear();
1839 // Special case - one value
1840 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1842 int arg0 = scheduledValueUpdates.getFirst();
1844 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1845 if(valueQuery != null) update(graph, valueQuery);
1847 scheduledValueUpdates.clear();
// General path: subjects and ordered sets touched by the scheduled changes.
1852 final TIntHashSet predicates = new TIntHashSet();
1853 final TIntHashSet orderedSets = new TIntHashSet();
// Take ownership of the pending primitive updates under the lock so that
// other threads can keep scheduling into a fresh set.
1855 THashSet primitiveUpdates;
1856 synchronized (primitiveUpdateLock) {
1857 primitiveUpdates = scheduledPrimitiveUpdates;
1858 scheduledPrimitiveUpdates = new THashSet();
1861 primitiveUpdates.forEach(new TObjectProcedure() {
1864 public boolean execute(Object arg0) {
1866 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1867 if (query != null) {
1868 boolean listening = update(graph, query);
// Drop external read entries that nobody listens to or depends on.
1869 if (!listening && !query.hasParents()) {
1870 cache.externalReadEntryMap.remove(arg0);
1879 scheduledValueUpdates.forEach(new TIntProcedure() {
1882 public boolean execute(int arg0) {
1883 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1884 if(valueQuery != null) update(graph, valueQuery);
// NOTE(review): nothing in the visible code adds to scheduledInvalidates
// (invalidateResource adds to scheduledValueUpdates) - confirm it is populated.
1890 scheduledInvalidates.forEach(new TIntProcedure() {
1893 public boolean execute(int resource) {
1895 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1896 if(valueQuery != null) update(graph, valueQuery);
1898 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1899 if(principalTypes != null) update(graph, principalTypes);
1900 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1901 if(types != null) update(graph, types);
1903 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1904 if(superRelations != null) update(graph, superRelations);
1906 predicates.add(resource);
1913 scheduledObjectUpdates.forEach(new TLongProcedure() {
1916 public boolean execute(long arg0) {
1918 final int subject = (int)(arg0 >>> 32);
1919 final int predicate = (int)(arg0 & 0xffffffff);
1921 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1922 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1923 if(principalTypes != null) update(graph, principalTypes);
1924 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1925 if(types != null) update(graph, types);
1928 if(predicate == subrelationOf) {
1929 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1930 if(superRelations != null) update(graph, superRelations);
// Despite its name, 'predicates' collects subject ids; 'orderedSets' collects
// predicate ids.
1933 predicates.add(subject);
1934 orderedSets.add(predicate);
1942 predicates.forEach(new TIntProcedure() {
1945 public boolean execute(final int subject) {
1947 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1948 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1949 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1951 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1952 if(entry != null) update(graph, entry);
1960 orderedSets.forEach(new TIntProcedure() {
1963 public boolean execute(int orderedSet) {
1965 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1966 if(entry != null) update(graph, entry);
1974 // for (Integer subject : predicates) {
1975 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1976 // if(entry != null) update(graph, entry);
1980 if (Development.DEVELOPMENT) {
1981 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1982 System.err.println("== Query update ends ==");
1986 scheduledValueUpdates.clear();
1987 scheduledObjectUpdates.clear();
1988 scheduledInvalidates.clear();
// Schedules a value update for the given resource id; it is processed by
// performDirtyUpdates.
1992 public void updateValue(final int resource) {
1993 scheduledValueUpdates.add(resource);
// Schedules a statement update for (resource, predicate), packed into a single
// long with the resource id in the high 32 bits.
// NOTE(review): '+ predicate' sign-extends a negative predicate and would then
// decrement the resource half; '| (predicate & 0xffffffffL)' would be safe for
// all ints. Confirm whether negative ids can reach this method.
1997 public void updateStatements(final int resource, final int predicate) {
1998 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
// Id of the resource most recently passed to invalidateResource; used to skip
// immediate repeats.
2002 private int lastInvalidate = 0;
// Schedules the resource for update unless it equals the previously
// invalidated resource.
// NOTE(review): lastInvalidate is not reset anywhere in the visible code, so a
// repeat invalidation of the same resource after performDirtyUpdates has
// cleared the sets appears to be dropped - confirm.
// NOTE(review): this adds to scheduledValueUpdates, not scheduledInvalidates -
// confirm that is intended.
2004 public void invalidateResource(final int resource) {
2005 if(lastInvalidate == resource) return;
2006 scheduledValueUpdates.add(resource);
2007 lastInvalidate = resource;
// Schedules an external read for update and notifies querySupport that
// primitives are dirty. The set is mutated under primitiveUpdateLock.
2011 public void updatePrimitive(final ExternalRead primitive) {
2013 // External reads may be updated from arbitrary threads.
2014 // Synchronize to prevent race-conditions.
2015 synchronized (primitiveUpdateLock) {
2016 scheduledPrimitiveUpdates.add(primitive);
2018 querySupport.dirtyPrimitives();
// Summarises the cache counters (size, hits, misses, updates) for diagnostics.
2023 public synchronized String toString() {
2024 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the query executor threads in stages:
//  1. ask every executor to dispose itself,
//  2. poll isAlive() for up to 100 rounds,
//  3. poll again for up to 100 rounds while interrupting the executors,
//  4. join each executor for at most 5000 ms and clear its slot.
// NOTE(review): the waits between polling rounds are not visible in this
// excerpt. InterruptedException is logged but the interrupt status does not
// appear to be restored - consider Thread.currentThread().interrupt().
2028 protected void doDispose() {
2030 for(int index = 0; index < THREADS; index++) {
2031 executors[index].dispose();
2035 for(int i=0;i<100;i++) {
2037 boolean alive = false;
2038 for(int index = 0; index < THREADS; index++) {
2039 alive |= executors[index].isAlive();
2044 } catch (InterruptedException e) {
2045 Logger.defaultLogError(e);
2050 // Then start interrupting
2051 for(int i=0;i<100;i++) {
2053 boolean alive = false;
2054 for(int index = 0; index < THREADS; index++) {
2055 alive |= executors[index].isAlive();
2058 for(int index = 0; index < THREADS; index++) {
2059 executors[index].interrupt();
2063 // // Then just destroy
2064 // for(int index = 0; index < THREADS; index++) {
2065 // executors[index].destroy();
2068 for(int index = 0; index < THREADS; index++) {
2070 executors[index].join(5000);
2071 } catch (InterruptedException e) {
2072 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
// Release the reference regardless of whether the thread terminated.
2074 executors[index] = null;
2079 public int getHits() {
// Returns the cache's miss counter.
2083 public int getMisses() {
2084 return cache.misses;
2087 public int getSize() {
2091 public Set<Long> getReferencedClusters() {
2092 HashSet<Long> result = new HashSet<Long>();
2093 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2094 Objects query = (Objects) entry.getQuery();
2095 result.add(querySupport.getClusterId(query.r1()));
2097 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2098 DirectPredicates query = (DirectPredicates) entry.getQuery();
2099 result.add(querySupport.getClusterId(query.id));
2101 for (CacheEntry entry : cache.valueQueryMap.values()) {
2102 ValueQuery query = (ValueQuery) entry.getQuery();
2103 result.add(querySupport.getClusterId(query.id));
2108 public void assertDone() {
// Delegates to the cache to fill 'result' with all cache structures.
2111 CacheCollectionResult allCaches(CacheCollectionResult result) {
2113 return cache.allCaches(result);
2117 public void printDiagnostics() {
// Forwards a cluster load request to querySupport; 'runnable' is handed over
// unchanged.
2120 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2121 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the collector with target 0 and an unbounded (Integer.MAX_VALUE) limit.
// NOTE(review): the returned value is not visible in this excerpt.
2124 public int clean() {
2125 collector.collect(0, Integer.MAX_VALUE);
// Collects cache entries reachable from the given external read requests only.
// Builds an ad-hoc QueryCollectorSupport whose iteration and root list are
// restricted to the ExternalReadEntry instances registered for 'requests',
// then runs a QueryCollectorImpl2 over it with an unbounded limit.
2129 public void clean(final Collection<ExternalRead<?>> requests) {
2130 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2131 Iterator<ExternalRead<?>> iterator = requests.iterator();
2133 public CacheCollectionResult allCaches() {
2134 throw new UnsupportedOperationException();
// Returns the next request that has a cache entry, skipping requests without
// one; once the iterator is exhausted it is re-created for the next pass.
2137 public CacheEntryBase iterate(int level) {
2138 if(iterator.hasNext()) {
2139 ExternalRead<?> request = iterator.next();
2140 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2141 if (entry != null) return entry;
2142 else return iterate(level);
2144 iterator = requests.iterator();
2149 public void remove() {
2150 throw new UnsupportedOperationException();
2153 public void setLevel(CacheEntryBase entry, int level) {
2154 throw new UnsupportedOperationException();
// Root list: the cache entries of the requested external reads.
2157 public Collection<CacheEntry> getRootList() {
2158 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2159 for (ExternalRead<?> request : requests) {
2160 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2167 public int getCurrentSize() {
2171 public int calculateCurrentSize() {
2172 // This tells the collector to attempt collecting everything.
2173 return Integer.MAX_VALUE;
2176 public boolean start(boolean flush) {
2180 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to the cache's pending-entry scan.
2183 public void scanPending() {
2185 cache.scanPending();
// Creates a new asynchronous read graph bound to this processor.
2189 public ReadGraphImpl graphForVirtualRequest() {
2190 return ReadGraphImpl.createAsync(this);
2194 private HashMap<Resource, Class<?>> builtinValues;
2196 public Class<?> getBuiltinValue(Resource r) {
2197 if(builtinValues == null) initBuiltinValues();
2198 return builtinValues.get(r);
2201 Exception callerException = null;
2203 public interface AsyncBarrier {
2206 // public void inc(String debug);
2207 // public void dec(String debug);
2210 // final public QueryProcessor processor;
2211 // final public QuerySupport support;
2213 // boolean disposed = false;
// Builds the table that maps Layer0 literal type resources to Java classes.
// Does nothing (and leaves builtinValues unset) when the session has no Layer0
// service yet.
2215 private void initBuiltinValues() {
2217 Layer0 b = getSession().peekService(Layer0.class);
2218 if(b == null) return;
2220 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar literal types map to the boxed Java classes.
2222 builtinValues.put(b.String, String.class);
2223 builtinValues.put(b.Double, Double.class);
2224 builtinValues.put(b.Float, Float.class);
2225 builtinValues.put(b.Long, Long.class);
2226 builtinValues.put(b.Integer, Integer.class);
2227 builtinValues.put(b.Byte, Byte.class);
2228 builtinValues.put(b.Boolean, Boolean.class);
// Array literal types map to primitive arrays (String[] for strings).
2230 builtinValues.put(b.StringArray, String[].class);
2231 builtinValues.put(b.DoubleArray, double[].class);
2232 builtinValues.put(b.FloatArray, float[].class);
2233 builtinValues.put(b.LongArray, long[].class);
2234 builtinValues.put(b.IntegerArray, int[].class);
2235 builtinValues.put(b.ByteArray, byte[].class);
2236 builtinValues.put(b.BooleanArray, boolean[].class);
2240 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2242 // if (null == provider2) {
2243 // this.processor = null;
2247 // this.processor = provider2;
2248 // support = provider2.getCore();
2249 // initBuiltinValues();
2253 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2254 // return new ReadGraphSupportImpl(impl.processor);
2258 final public Session getSession() {
// Returns the resource support instance held by this processor.
2262 final public ResourceSupport getResourceSupport() {
2263 return resourceSupport;
// Not supported: always throws UnsupportedOperationException. The former
// implementation is kept below as commented-out code.
2267 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2269 throw new UnsupportedOperationException();
2271 // assert(subject != null);
2272 // assert(procedure != null);
2274 // final ListenerBase listener = getListenerBase(procedure);
2276 // IntProcedure ip = new IntProcedure() {
2278 // AtomicBoolean first = new AtomicBoolean(true);
2281 // public void execute(ReadGraphImpl graph, int i) {
2283 // if(first.get()) {
2284 // procedure.execute(graph, querySupport.getResource(i));
2286 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2288 // } catch (Throwable t2) {
2289 // Logger.defaultLogError(t2);
2294 // public void finished(ReadGraphImpl graph) {
2296 // if(first.compareAndSet(true, false)) {
2297 // procedure.finished(graph);
2298 //// impl.state.barrier.dec(this);
2300 // procedure.finished(impl.newRestart(graph));
2303 // } catch (Throwable t2) {
2304 // Logger.defaultLogError(t2);
2309 // public void exception(ReadGraphImpl graph, Throwable t) {
2311 // if(first.compareAndSet(true, false)) {
2312 // procedure.exception(graph, t);
2314 // procedure.exception(impl.newRestart(graph), t);
2316 // } catch (Throwable t2) {
2317 // Logger.defaultLogError(t2);
2323 // int sId = querySupport.getId(subject);
2326 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2327 // } catch (DatabaseException e) {
2328 // Logger.defaultLogError(e);
// Not supported: always throws UnsupportedOperationException. The former
// implementation is kept below as commented-out code.
2334 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2336 throw new UnsupportedOperationException();
2338 // assert(subject != null);
2339 // assert(procedure != null);
2341 // final ListenerBase listener = getListenerBase(procedure);
2344 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2347 // public void execute(ReadGraphImpl graph, int i) {
2349 // procedure.execute(querySupport.getResource(i));
2350 // } catch (Throwable t2) {
2351 // Logger.defaultLogError(t2);
2356 // public void finished(ReadGraphImpl graph) {
2358 // procedure.finished();
2359 // } catch (Throwable t2) {
2360 // Logger.defaultLogError(t2);
2362 //// impl.state.barrier.dec();
2366 // public void exception(ReadGraphImpl graph, Throwable t) {
2368 // procedure.exception(t);
2369 // } catch (Throwable t2) {
2370 // Logger.defaultLogError(t2);
2372 //// impl.state.barrier.dec();
2376 // } catch (DatabaseException e) {
2377 // Logger.defaultLogError(e);
// Returns the predicate set of 'subject' via QueryCacheBase.resultPredicates,
// using the graph's parent entry and no listener.
2383 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2384 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
// Streams the statements (subject, predicate, *) to a synchronous
// MultiProcedure.
// Integer triples delivered by Statements.queryEach are converted to Statement
// objects through querySupport. Anything thrown by the user procedure is
// logged and not propagated; a DatabaseException from the query itself is
// logged as well.
2388 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2389 final Resource predicate, final MultiProcedure<Statement> procedure) {
2391 assert(subject != null);
2392 assert(predicate != null);
2393 assert(procedure != null);
// Non-null only when the procedure is itself a ListenerBase.
2395 final ListenerBase listener = getListenerBase(procedure);
2397 // impl.state.barrier.inc();
2400 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2403 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2405 procedure.execute(querySupport.getStatement(s, p, o));
2406 } catch (Throwable t2) {
2407 Logger.defaultLogError(t2);
2412 public void finished(ReadGraphImpl graph) {
2414 procedure.finished();
2415 } catch (Throwable t2) {
2416 Logger.defaultLogError(t2);
2418 // impl.state.barrier.dec();
2422 public void exception(ReadGraphImpl graph, Throwable t) {
2424 procedure.exception(t);
2425 } catch (Throwable t2) {
2426 Logger.defaultLogError(t2);
2428 // impl.state.barrier.dec();
2432 } catch (DatabaseException e) {
2433 Logger.defaultLogError(e);
// Streams the statements (subject, predicate, *) to an AsyncMultiProcedure.
// The adapter keeps a 'first' flag and calls the procedure either with the
// delivered graph or with impl.newRestart(graph).
// NOTE(review): the conditions selecting between those two calls are not
// visible in this excerpt.
// Anything thrown by the user procedure is logged and not propagated.
2439 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2440 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2442 assert(subject != null);
2443 assert(predicate != null);
2444 assert(procedure != null);
2446 final ListenerBase listener = getListenerBase(procedure);
2448 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2450 boolean first = true;
2453 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2456 procedure.execute(graph, querySupport.getStatement(s, p, o));
2458 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2460 } catch (Throwable t2) {
2461 Logger.defaultLogError(t2);
2466 public void finished(ReadGraphImpl graph) {
2471 procedure.finished(graph);
2472 // impl.state.barrier.dec(this);
2474 procedure.finished(impl.newRestart(graph));
2476 } catch (Throwable t2) {
2477 Logger.defaultLogError(t2);
2483 public void exception(ReadGraphImpl graph, Throwable t) {
2488 procedure.exception(graph, t);
2489 // impl.state.barrier.dec(this);
2491 procedure.exception(impl.newRestart(graph), t);
2493 } catch (Throwable t2) {
2494 Logger.defaultLogError(t2);
// Resolve resource ids once before issuing the query.
2501 int sId = querySupport.getId(subject);
2502 int pId = querySupport.getId(predicate);
2504 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2505 // else impl.state.barrier.inc(null, null);
2508 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2509 } catch (DatabaseException e) {
2510 Logger.defaultLogError(e);
// Streams the statements (subject, predicate, *) to a StatementProcedure,
// passing the raw integer triple through without creating Statement objects.
// The adapter keeps a 'first' flag and calls the procedure either with the
// delivered graph or with impl.newRestart(graph).
// NOTE(review): the conditions selecting between those two calls are not
// visible in this excerpt.
// Anything thrown by the user procedure is logged and not propagated.
2516 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2517 final Resource predicate, final StatementProcedure procedure) {
2519 assert(subject != null);
2520 assert(predicate != null);
2521 assert(procedure != null);
2523 final ListenerBase listener = getListenerBase(procedure);
2525 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2527 boolean first = true;
2530 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2533 procedure.execute(graph, s, p, o);
2535 procedure.execute(impl.newRestart(graph), s, p, o);
2537 } catch (Throwable t2) {
2538 Logger.defaultLogError(t2);
2543 public void finished(ReadGraphImpl graph) {
2548 procedure.finished(graph);
2549 // impl.state.barrier.dec(this);
2551 procedure.finished(impl.newRestart(graph));
2553 } catch (Throwable t2) {
2554 Logger.defaultLogError(t2);
2560 public void exception(ReadGraphImpl graph, Throwable t) {
2565 procedure.exception(graph, t);
2566 // impl.state.barrier.dec(this);
2568 procedure.exception(impl.newRestart(graph), t);
2570 } catch (Throwable t2) {
2571 Logger.defaultLogError(t2);
// Resolve resource ids once before issuing the query.
2578 int sId = querySupport.getId(subject);
2579 int pId = querySupport.getId(predicate);
2581 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2582 // else impl.state.barrier.inc(null, null);
2585 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2586 } catch (DatabaseException e) {
2587 Logger.defaultLogError(e);
// Adapts a set listener onto forEachStatement.
// The inner listener tracks the statements seen so far in two sets ('current'
// and 'run') so that the set listener receives add()/remove() calls rather
// than a full listing.
// NOTE(review): the statements that move data between 'run' and 'current' are
// not visible in this excerpt.
2593 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2595 assert(subject != null);
2596 assert(predicate != null);
2597 assert(procedure != null);
2599 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2601 private Set<Statement> current = null;
2602 private Set<Statement> run = new HashSet<Statement>();
2605 public void execute(AsyncReadGraph graph, Statement result) {
2607 boolean found = false;
2609 if(current != null) {
// A statement already known from the previous round is taken out of
// 'current'.
2611 found = current.remove(result);
// Statements not found in the previous round are reported as additions.
2615 if(!found) procedure.add(graph, result);
2622 public void finished(AsyncReadGraph graph) {
2624 if(current != null) {
// Whatever is still left in 'current' was not re-delivered: report removals.
2625 for(Statement r : current) procedure.remove(graph, r);
2630 run = new HashSet<Statement>();
2635 public void exception(AsyncReadGraph graph, Throwable t) {
2636 procedure.exception(graph, t);
// Disposal follows the wrapped set listener.
2640 public boolean isDisposed() {
2641 return procedure.isDisposed();
// Streams the asserted statements of (subject, predicate) to an
// AsyncMultiProcedure via QueryCache.runnerAssertedStatements.
// Integer triples are converted to Statement objects through querySupport.
// Anything thrown by the user procedure, and a DatabaseException from the
// query itself, is logged and not propagated.
2649 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2650 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2652 assert(subject != null);
2653 assert(predicate != null);
2654 assert(procedure != null);
2656 final ListenerBase listener = getListenerBase(procedure);
2658 // impl.state.barrier.inc();
2661 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2664 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2666 procedure.execute(graph, querySupport.getStatement(s, p, o));
2667 } catch (Throwable t2) {
2668 Logger.defaultLogError(t2);
2673 public void finished(ReadGraphImpl graph) {
2675 procedure.finished(graph);
2676 } catch (Throwable t2) {
2677 Logger.defaultLogError(t2);
2679 // impl.state.barrier.dec();
2683 public void exception(ReadGraphImpl graph, Throwable t) {
2685 procedure.exception(graph, t);
2686 } catch (Throwable t2) {
2687 Logger.defaultLogError(t2);
2689 // impl.state.barrier.dec();
2693 } catch (DatabaseException e) {
2694 Logger.defaultLogError(e);
// Returns the procedure itself when it implements ListenerBase.
// NOTE(review): the fallback return is not visible in this excerpt -
// presumably null.
2699 private static ListenerBase getListenerBase(Object procedure) {
2700 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
// Streams the objects of (subject, predicate) to a graph-less MultiProcedure.
// Runs QueryCache.runnerObjects with the ids of subject and predicate and converts every
// resulting id to a Resource with querySupport.getResource before calling procedure.execute.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
2705 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2707 assert(subject != null);
2708 assert(predicate != null);
2709 assert(procedure != null);
2711 final ListenerBase listener = getListenerBase(procedure);
2713 // impl.state.barrier.inc();
2716 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2719 public void execute(ReadGraphImpl graph, int i) {
2721 procedure.execute(querySupport.getResource(i));
2722 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
2723 Logger.defaultLogError(t2);
2728 public void finished(ReadGraphImpl graph) {
2730 procedure.finished();
2731 } catch (Throwable t2) {
2732 Logger.defaultLogError(t2);
2734 // impl.state.barrier.dec();
2738 public void exception(ReadGraphImpl graph, Throwable t) {
// Report the query failure through the shared Logger instead of printing to stdout,
// then forward it to the client procedure.
2739 Logger.defaultLogError(t);
2741 procedure.exception(t);
2742 } catch (Throwable t2) {
2743 Logger.defaultLogError(t2);
2745 // impl.state.barrier.dec();
2749 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
2750 Logger.defaultLogError(e);
// Queries the direct predicates of subject via QueryCache.runnerDirectPredicates.
// The IntSet produced by the query is handed to procedure.execute unchanged, and query
// exceptions are forwarded to procedure.exception.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
2756 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2758 assert(subject != null);
2759 assert(procedure != null);
2761 final ListenerBase listener = getListenerBase(procedure);
2763 int sId = querySupport.getId(subject);
2766 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2769 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2770 procedure.execute(graph, result);
2774 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2775 procedure.exception(graph, throwable);
2779 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
2780 Logger.defaultLogError(e);
// Returns the direct statements of subject by delegating to querySupport.getStatements,
// passing the subject's id, this processor and the ignoreVirtual flag.
// The commented-out lines below are remnants of an earlier procedure-based variant.
2785 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2787 // assert(subject != null);
2788 // assert(procedure != null);
2790 // final ListenerBase listener = getListenerBase(procedure);
2792 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2794 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2799 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2801 // assert(subject != null);
2802 // assert(procedure != null);
2804 // final ListenerBase listener = getListenerBase(procedure);
2806 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel used by forPossibleObject: it is stored in place of a real result and makes
// finished() report null.
// NOTE(review): presumably set when more than one object is seen — confirm.
2810 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
// Reports at most one object of (subject, predicate) to the procedure.
// Delegates to forEachObject with a collecting AsyncMultiProcedure whose callbacks are
// synchronized; on finished() the procedure receives the collected resource, or null when
// nothing was collected or the INVALID_RESOURCE sentinel was stored.
2813 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2815 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2817 private Resource single = null;
2820 public synchronized void execute(AsyncReadGraph graph, Resource result) {
// NOTE(review): presumably the first result is remembered and any later one replaces it
// with the sentinel — confirm.
2821 if(single == null) {
2824 single = INVALID_RESOURCE;
2829 public synchronized void finished(AsyncReadGraph graph) {
2830 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2831 else procedure.execute(graph, single);
2835 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2836 procedure.exception(graph, throwable);
// Low-level variant: runs QueryCache.runnerObjects for the ids of subject and predicate with an
// explicitly supplied listener and an id-based IntProcedure (no Resource conversion here).
2843 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2845 final int sId = querySupport.getId(subject);
2846 final int pId = querySupport.getId(predicate);
2849 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2850 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner is logged.
2851 Logger.defaultLogError(e);
// IntProcedure that collects a single object id (field 'single') and a possible failure
// (field 't') so that a caller can read the outcome through get().
// 0 is the initial "nothing seen" value of 'single'; -1 is reset to 0 in finished().
// NOTE(review): -1 presumably marks "more than one result" — confirm.
2856 static class Runner2Procedure implements IntProcedure {
2858 public int single = 0;
2859 public Throwable t = null;
// NOTE(review): presumably resets both fields to their initial values — confirm.
2861 public void clear() {
2867 public void execute(ReadGraphImpl graph, int i) {
2868 if(single == 0) single = i;
2873 public void finished(ReadGraphImpl graph) {
2874 if(single == -1) single = 0;
// NOTE(review): presumably records the throwable in 't' for get() — confirm.
2878 public void exception(ReadGraphImpl graph, Throwable throwable) {
// Rethrows a stored failure: DatabaseExceptions as-is, anything else wrapped in a
// DatabaseException.
2883 public int get() throws DatabaseException {
2885 if(t instanceof DatabaseException) throw (DatabaseException)t;
2886 else throw new DatabaseException(t);
// Runs the objects query for (subject, predicate) without a listener, collecting the outcome
// in a fresh Runner2Procedure.
// NOTE(review): presumably returns the collector's get() value — confirm.
2893 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2895 final int sId = querySupport.getId(subject);
2896 final int pId = querySupport.getId(predicate);
2898 Runner2Procedure proc = new Runner2Procedure();
2899 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
// Streams the objects of (subject, predicate) to an AsyncMultiProcedure as Resources.
// Two adapters exist: when the request has a parent entry or the procedure is a listener, the
// adapter tracks the first delivery and wraps later graphs with impl.newRestart(graph),
// logging anything the procedure throws; otherwise results are forwarded directly with the
// graph supplied by the query.
2904 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2906 assert(subject != null);
2907 assert(predicate != null);
2909 final ListenerBase listener = getListenerBase(procedure);
2911 if(impl.parent != null || listener != null) {
2913 IntProcedure ip = new IntProcedure() {
// True until the first finished() call has been delivered (see compareAndSet below).
2915 AtomicBoolean first = new AtomicBoolean(true);
2918 public void execute(ReadGraphImpl graph, int i) {
// NOTE(review): the choice between these two calls presumably depends on 'first' — confirm.
2921 procedure.execute(impl, querySupport.getResource(i));
2923 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2925 } catch (Throwable t2) {
2926 Logger.defaultLogError(t2);
2932 public void finished(ReadGraphImpl graph) {
2934 if(first.compareAndSet(true, false)) {
2935 procedure.finished(impl);
2936 // impl.state.barrier.dec(this);
2938 procedure.finished(impl.newRestart(graph));
2940 } catch (Throwable t2) {
2941 Logger.defaultLogError(t2);
2946 public void exception(ReadGraphImpl graph, Throwable t) {
2948 procedure.exception(graph, t);
2949 } catch (Throwable t2) {
2950 Logger.defaultLogError(t2);
2952 // impl.state.barrier.dec(this);
2956 public String toString() {
2957 return "forEachObject with " + procedure;
2962 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2963 // else impl.state.barrier.inc(null, null);
2965 forEachObject(impl, subject, predicate, listener, ip);
// Plain forwarding adapter used when there is neither a parent entry nor a listener.
2969 IntProcedure ip = new IntProcedure() {
2972 public void execute(ReadGraphImpl graph, int i) {
2973 procedure.execute(graph, querySupport.getResource(i));
2977 public void finished(ReadGraphImpl graph) {
2978 procedure.finished(graph);
2982 public void exception(ReadGraphImpl graph, Throwable t) {
2983 procedure.exception(graph, t);
2987 public String toString() {
2988 return "forEachObject with " + procedure;
2993 forEachObject(impl, subject, predicate, listener, ip);
// Adapts an AsyncSetListener to the objects query of (subject, predicate): results are turned
// into add/remove notifications by comparing each run against the set kept in 'current'.
// Disposal and exceptions are delegated to the wrapped procedure.
3000 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3002 assert(subject != null);
3003 assert(predicate != null);
3004 assert(procedure != null);
3006 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// NOTE(review): 'current' presumably holds the previous run's results and 'run' the ones
// collected during the ongoing run — confirm.
3008 private Set<Resource> current = null;
3009 private Set<Resource> run = new HashSet<Resource>();
3012 public void execute(AsyncReadGraph graph, Resource result) {
3014 boolean found = false;
3016 if(current != null) {
// Removing a hit from 'current' leaves only the entries not seen again in this run.
3018 found = current.remove(result);
3022 if(!found) procedure.add(graph, result);
3029 public void finished(AsyncReadGraph graph) {
3031 if(current != null) {
// Whatever is still in 'current' was not reported in this run and is signalled as removed.
3032 for(Resource r : current) procedure.remove(graph, r);
3037 run = new HashSet<Resource>();
3042 public boolean isDisposed() {
3043 return procedure.isDisposed();
3047 public void exception(AsyncReadGraph graph, Throwable t) {
3048 procedure.exception(graph, t);
3052 public String toString() {
3053 return "forObjectSet " + procedure;
// Adapts an AsyncSetListener to forEachPredicate for subject: results are turned into
// add/remove notifications by comparing each run against the set kept in 'current'.
// Disposal and exceptions are delegated to the wrapped procedure.
3061 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3063 assert(subject != null);
3064 assert(procedure != null);
3066 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// NOTE(review): 'current' presumably holds the previous run's results and 'run' the ones
// collected during the ongoing run — confirm.
3068 private Set<Resource> current = null;
3069 private Set<Resource> run = new HashSet<Resource>();
3072 public void execute(AsyncReadGraph graph, Resource result) {
3074 boolean found = false;
3076 if(current != null) {
// Removing a hit from 'current' leaves only the entries not seen again in this run.
3078 found = current.remove(result);
3082 if(!found) procedure.add(graph, result);
3089 public void finished(AsyncReadGraph graph) {
3091 if(current != null) {
// Whatever is still in 'current' was not reported in this run and is signalled as removed.
3092 for(Resource r : current) procedure.remove(graph, r);
3097 run = new HashSet<Resource>();
3102 public boolean isDisposed() {
3103 return procedure.isDisposed();
3107 public void exception(AsyncReadGraph graph, Throwable t) {
3108 procedure.exception(graph, t);
3112 public String toString() {
3113 return "forPredicateSet " + procedure;
// Adapts an AsyncSetListener to forEachPrincipalType for subject: results are turned into
// add/remove notifications by comparing each run against the set kept in 'current'.
// Disposal and exceptions are delegated to the wrapped procedure.
3121 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3123 assert(subject != null);
3124 assert(procedure != null);
3126 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// NOTE(review): 'current' presumably holds the previous run's results and 'run' the ones
// collected during the ongoing run — confirm.
3128 private Set<Resource> current = null;
3129 private Set<Resource> run = new HashSet<Resource>();
3132 public void execute(AsyncReadGraph graph, Resource result) {
3134 boolean found = false;
3136 if(current != null) {
// Removing a hit from 'current' leaves only the entries not seen again in this run.
3138 found = current.remove(result);
3142 if(!found) procedure.add(graph, result);
3149 public void finished(AsyncReadGraph graph) {
3151 if(current != null) {
// Whatever is still in 'current' was not reported in this run and is signalled as removed.
3152 for(Resource r : current) procedure.remove(graph, r);
3157 run = new HashSet<Resource>();
3162 public boolean isDisposed() {
3163 return procedure.isDisposed();
3167 public void exception(AsyncReadGraph graph, Throwable t) {
3168 procedure.exception(graph, t);
3172 public String toString() {
3173 return "forPrincipalTypeSet " + procedure;
// Adapts an AsyncSetListener to forEachAssertedObject for (subject, predicate): results are
// turned into add/remove notifications by comparing each run against the set kept in 'current'.
// Disposal and exceptions are delegated to the wrapped procedure.
3181 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3183 assert(subject != null);
3184 assert(predicate != null);
3185 assert(procedure != null);
3187 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// NOTE(review): 'current' presumably holds the previous run's results and 'run' the ones
// collected during the ongoing run — confirm.
3189 private Set<Resource> current = null;
3190 private Set<Resource> run = new HashSet<Resource>();
3193 public void execute(AsyncReadGraph graph, Resource result) {
3195 boolean found = false;
3197 if(current != null) {
// Removing a hit from 'current' leaves only the entries not seen again in this run.
3199 found = current.remove(result);
3203 if(!found) procedure.add(graph, result);
3210 public void finished(AsyncReadGraph graph) {
3212 if(current != null) {
// Whatever is still in 'current' was not reported in this run and is signalled as removed.
3213 for(Resource r : current) procedure.remove(graph, r);
3218 run = new HashSet<Resource>();
3223 public boolean isDisposed() {
3224 return procedure.isDisposed();
3228 public void exception(AsyncReadGraph graph, Throwable t) {
3229 procedure.exception(graph, t);
3233 public String toString() {
// Label names this method so the listener is distinguishable from the one in forObjectSet.
3234 return "forAssertedObjectSet " + procedure;
// Adapts an AsyncSetListener to forEachAssertedStatement for (subject, predicate): results are
// turned into add/remove notifications by comparing each run against the set kept in 'current'.
// Disposal and exceptions are delegated to the wrapped procedure.
3242 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3244 assert(subject != null);
3245 assert(predicate != null);
3246 assert(procedure != null);
3248 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// NOTE(review): 'current' presumably holds the previous run's results and 'run' the ones
// collected during the ongoing run — confirm.
3250 private Set<Statement> current = null;
3251 private Set<Statement> run = new HashSet<Statement>();
3254 public void execute(AsyncReadGraph graph, Statement result) {
3256 boolean found = false;
3258 if(current != null) {
// Removing a hit from 'current' leaves only the entries not seen again in this run.
3260 found = current.remove(result);
3264 if(!found) procedure.add(graph, result);
3271 public void finished(AsyncReadGraph graph) {
3273 if(current != null) {
// Whatever is still in 'current' was not reported in this run and is signalled as removed.
3274 for(Statement s : current) procedure.remove(graph, s);
3279 run = new HashSet<Statement>();
3284 public boolean isDisposed() {
3285 return procedure.isDisposed();
3289 public void exception(AsyncReadGraph graph, Throwable t) {
3290 procedure.exception(graph, t);
3294 public String toString() {
// Label names this method rather than the generic statement-set variant.
3295 return "forAssertedStatementSet " + procedure;
// Streams the objects of the asserted statements of (subject, predicate) to the procedure.
// Runs QueryCache.runnerAssertedStatements and, for every (s, p, o) id triple, forwards only
// the object converted with querySupport.getResource(o).
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3303 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3304 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3306 assert(subject != null);
3307 assert(predicate != null);
3308 assert(procedure != null);
3310 final ListenerBase listener = getListenerBase(procedure);
3313 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3316 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3318 procedure.execute(graph, querySupport.getResource(o));
3319 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
3320 Logger.defaultLogError(t2);
3325 public void finished(ReadGraphImpl graph) {
3327 procedure.finished(graph);
3328 } catch (Throwable t2) {
3329 Logger.defaultLogError(t2);
3331 // impl.state.barrier.dec();
3335 public void exception(ReadGraphImpl graph, Throwable t) {
3337 procedure.exception(graph, t);
3338 } catch (Throwable t2) {
3339 Logger.defaultLogError(t2);
3341 // impl.state.barrier.dec();
3345 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3346 Logger.defaultLogError(e);
// Streams the principal types of subject to an AsyncMultiProcedure.
// Runs QueryCache.runnerPrincipalTypes with the subject's id and converts each resulting id
// to a Resource with querySupport.getResource before forwarding it.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3352 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3354 assert(subject != null);
3355 assert(procedure != null);
3357 final ListenerBase listener = getListenerBase(procedure);
3359 IntProcedure ip = new IntProcedure() {
3362 public void execute(ReadGraphImpl graph, int i) {
3364 procedure.execute(graph, querySupport.getResource(i));
3365 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
3366 Logger.defaultLogError(t2);
3371 public void finished(ReadGraphImpl graph) {
3373 procedure.finished(graph);
3374 } catch (Throwable t2) {
3375 Logger.defaultLogError(t2);
3377 // impl.state.barrier.dec(this);
3381 public void exception(ReadGraphImpl graph, Throwable t) {
3383 procedure.exception(graph, t);
3384 } catch (Throwable t2) {
3385 Logger.defaultLogError(t2);
3387 // impl.state.barrier.dec(this);
3392 int sId = querySupport.getId(subject);
3394 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3395 // else impl.state.barrier.inc(null, null);
3398 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3399 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3400 Logger.defaultLogError(e);
// Streams the principal types of subject to a graph-less MultiProcedure.
// Runs QueryCache.runnerPrincipalTypes with the subject's id and converts each resulting id
// to a Resource with querySupport.getResource before forwarding it.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3406 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3408 assert(subject != null);
3409 assert(procedure != null);
3411 final ListenerBase listener = getListenerBase(procedure);
3413 // impl.state.barrier.inc();
3416 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3419 public void execute(ReadGraphImpl graph, int i) {
3421 procedure.execute(querySupport.getResource(i));
3422 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
3423 Logger.defaultLogError(t2);
3428 public void finished(ReadGraphImpl graph) {
3430 procedure.finished();
3431 } catch (Throwable t2) {
3432 Logger.defaultLogError(t2);
3434 // impl.state.barrier.dec();
3438 public void exception(ReadGraphImpl graph, Throwable t) {
3440 procedure.exception(t);
3441 } catch (Throwable t2) {
3442 Logger.defaultLogError(t2);
3444 // impl.state.barrier.dec();
3448 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3449 Logger.defaultLogError(e);
// Queries the types of subject via QueryCache.runnerTypes and hands the resulting IntSet to the
// procedure unchanged; query exceptions are forwarded to procedure.exception.
// Listener procedures are not supported here: the method asserts that none was supplied.
3453 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3455 assert(subject != null);
3456 assert(procedure != null);
3458 final ListenerBase listener = getListenerBase(procedure);
3459 assert(listener == null);
3461 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3464 public void execute(final ReadGraphImpl graph, IntSet set) {
3465 procedure.execute(graph, set);
3469 public void exception(ReadGraphImpl graph, Throwable t) {
3470 procedure.exception(graph, t);
3475 int sId = querySupport.getId(subject);
3478 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3479 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3480 Logger.defaultLogError(e);
// Returns the types of subject as an IntSet by calling QueryCacheBase.resultTypes with the
// subject's id and no listener.
3486 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3488 assert(subject != null);
3490 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
// Returns the RelationInfo of subject by calling QueryCache.resultRelationInfoQuery with the
// subject's id and no listener.
3495 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3497 assert(subject != null);
3499 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
// Queries the supertypes of subject via QueryCache.runnerSuperTypes and hands the resulting
// IntSet to the procedure. The first delivery uses the graph supplied by the query; later
// deliveries wrap it with impl.newRestart(graph).
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3504 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3506 assert(subject != null);
3507 assert(procedure != null);
3509 final ListenerBase listener = getListenerBase(procedure);
3512 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
// True until the first result or exception has been delivered.
3514 AtomicBoolean first = new AtomicBoolean(true);
3517 public void execute(final ReadGraphImpl graph, IntSet set) {
3518 // final HashSet<Resource> result = new HashSet<Resource>();
3519 // set.forEach(new TIntProcedure() {
3522 // public boolean execute(int type) {
3523 // result.add(querySupport.getResource(type));
3529 if(first.compareAndSet(true, false)) {
3530 procedure.execute(graph, set);
3531 // impl.state.barrier.dec();
3533 procedure.execute(impl.newRestart(graph), set);
3535 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
3536 Logger.defaultLogError(t2);
3541 public void exception(ReadGraphImpl graph, Throwable t) {
3543 if(first.compareAndSet(true, false)) {
3544 procedure.exception(graph, t);
3545 // impl.state.barrier.dec();
3547 procedure.exception(impl.newRestart(graph), t);
3549 } catch (Throwable t2) {
3550 Logger.defaultLogError(t2);
3555 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3556 Logger.defaultLogError(e);
// Streams the direct super relations of subject to the procedure.
// Runs QueryCache.runnerDirectSuperRelations with the subject's id and converts each resulting
// id to a Resource with querySupport.getResource before forwarding it.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3562 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3564 assert(subject != null);
3565 assert(procedure != null);
3567 final ListenerBase listener = getListenerBase(procedure);
3569 IntProcedure ip = new IntProcedureAdapter() {
3572 public void execute(final ReadGraphImpl graph, int superRelation) {
3574 procedure.execute(graph, querySupport.getResource(superRelation));
3575 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
3576 Logger.defaultLogError(t2);
3581 public void finished(final ReadGraphImpl graph) {
3583 procedure.finished(graph);
3584 } catch (Throwable t2) {
3585 Logger.defaultLogError(t2);
3587 // impl.state.barrier.dec(this);
3592 public void exception(ReadGraphImpl graph, Throwable t) {
3594 procedure.exception(graph, t);
3595 } catch (Throwable t2) {
3596 Logger.defaultLogError(t2);
3598 // impl.state.barrier.dec(this);
3603 int sId = querySupport.getId(subject);
3605 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3606 // else impl.state.barrier.inc(null, null);
3609 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3610 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3611 Logger.defaultLogError(e);
3614 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
// Delegates to PossibleSuperRelation.queryEach with the subject's id, this processor, the
// parent entry, the listener derived from the procedure, and the procedure itself.
3619 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3621 assert(subject != null);
3622 assert(procedure != null);
3624 final ListenerBase listener = getListenerBase(procedure);
3626 // impl.state.barrier.inc();
3628 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
// Queries the super relations of subject via QueryCache.runnerSuperRelations and hands the
// resulting IntSet to the procedure unchanged; anything the procedure throws is logged.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3633 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3635 assert(subject != null);
3636 assert(procedure != null);
3638 final ListenerBase listener = getListenerBase(procedure);
3640 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3643 public void execute(final ReadGraphImpl graph, IntSet set) {
3644 // final HashSet<Resource> result = new HashSet<Resource>();
3645 // set.forEach(new TIntProcedure() {
3648 // public boolean execute(int type) {
3649 // result.add(querySupport.getResource(type));
3655 procedure.execute(graph, set);
3656 } catch (Throwable t2) {
3657 Logger.defaultLogError(t2);
3659 // impl.state.barrier.dec(this);
3663 public void exception(ReadGraphImpl graph, Throwable t) {
3665 procedure.exception(graph, t);
3666 } catch (Throwable t2) {
3667 Logger.defaultLogError(t2);
3669 // impl.state.barrier.dec(this);
3674 int sId = querySupport.getId(subject);
3676 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3677 // else impl.state.barrier.inc(null, null);
3680 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3681 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3682 Logger.defaultLogError(e);
// Returns the value bytes of subject; resolves the Resource to its id and delegates to the
// id-based overload.
3687 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3688 return getValue(impl, querySupport.getId(subject));
// Returns the value bytes for the given resource id by calling QueryCache.resultValueQuery
// with no listener.
3691 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3692 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
// Queries the value of subject via QueryCache.runnerValueQuery and hands the resulting bytes
// to the procedure. The first delivery uses the graph supplied by the query; later deliveries
// wrap it with impl.newRestart(graph). Anything the procedure throws is logged.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
// Throws IllegalStateException (with the DatabaseException as cause) if the query runner fails.
3696 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3698 assert(subject != null);
3699 assert(procedure != null);
3701 int sId = querySupport.getId(subject);
3703 // if(procedure != null) {
3705 final ListenerBase listener = getListenerBase(procedure);
3707 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// True until the first result or exception has been delivered.
3709 AtomicBoolean first = new AtomicBoolean(true);
3712 public void execute(ReadGraphImpl graph, byte[] result) {
3714 if(first.compareAndSet(true, false)) {
3715 procedure.execute(graph, result);
3716 // impl.state.barrier.dec(this);
3718 procedure.execute(impl.newRestart(graph), result);
3720 } catch (Throwable t2) {
3721 Logger.defaultLogError(t2);
3726 public void exception(ReadGraphImpl graph, Throwable t) {
3728 if(first.compareAndSet(true, false)) {
3729 procedure.exception(graph, t);
3730 // impl.state.barrier.dec(this);
3732 procedure.exception(impl.newRestart(graph), t);
3734 } catch (Throwable t2) {
3735 Logger.defaultLogError(t2);
3741 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3742 // else impl.state.barrier.inc(null, null);
3745 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3746 } catch (DatabaseException e) {
// Keep the original failure as the cause so the stack trace is not lost.
3747 throw new IllegalStateException("Internal error", e);
3752 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3756 // throw new IllegalStateException("Internal error");
// Queries the value of subject via QueryCache.runnerValueQuery and hands the bytes to the
// procedure. Two adapters exist: when the request has a parent entry or the procedure is a
// listener, the adapter tracks the first delivery, wraps later graphs with
// impl.newRestart(graph) and logs anything the procedure throws; otherwise results are
// forwarded directly. A DatabaseException from the query runner is logged in both branches.
3761 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3763 assert(subject != null);
3764 assert(procedure != null);
3766 final ListenerBase listener = getListenerBase(procedure);
3768 if(impl.parent != null || listener != null) {
3770 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// True until the first result or exception has been delivered.
3772 AtomicBoolean first = new AtomicBoolean(true);
3775 public void execute(ReadGraphImpl graph, byte[] result) {
3777 if(first.compareAndSet(true, false)) {
3778 procedure.execute(graph, result);
3779 // impl.state.barrier.dec(this);
3781 procedure.execute(impl.newRestart(graph), result);
3783 } catch (Throwable t2) {
3784 Logger.defaultLogError(t2);
3789 public void exception(ReadGraphImpl graph, Throwable t) {
3791 if(first.compareAndSet(true, false)) {
3792 procedure.exception(graph, t);
3793 // impl.state.barrier.dec(this);
3795 procedure.exception(impl.newRestart(graph), t);
3797 } catch (Throwable t2) {
3798 Logger.defaultLogError(t2);
3804 int sId = querySupport.getId(subject);
3806 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3807 // else impl.state.barrier.inc(null, null);
3810 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3811 } catch (DatabaseException e) {
3812 Logger.defaultLogError(e);
// Plain forwarding adapter used when there is neither a parent entry nor a listener.
3817 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3820 public void execute(ReadGraphImpl graph, byte[] result) {
3822 procedure.execute(graph, result);
3827 public void exception(ReadGraphImpl graph, Throwable t) {
3829 procedure.exception(graph, t);
3835 int sId = querySupport.getId(subject);
3838 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3839 } catch (DatabaseException e) {
3840 Logger.defaultLogError(e);
// Resolves the inverse of a relation by querying the objects of (relation, getInverseOf())
// via QueryCache.runnerObjects. The procedure is notified exactly once, guarded by 'done':
// with the found resource, with a NoInverseException, with a
// ManyObjectsForFunctionalRelationException when a further object id arrives, or with the
// query's own exception. Anything the procedure throws is logged.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
3848 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3850 assert(relation != null);
3851 assert(procedure != null);
3853 final ListenerBase listener = getListenerBase(procedure);
3855 IntProcedure ip = new IntProcedure() {
// Id of the object accepted as the inverse; 0 initially.
3857 private int result = 0;
// 'found' flips on the first object id; 'done' flips when the procedure has been notified.
3859 final AtomicBoolean found = new AtomicBoolean(false);
3860 final AtomicBoolean done = new AtomicBoolean(false);
3863 public void finished(ReadGraphImpl graph) {
3865 // Shall fire exactly once!
3866 if(done.compareAndSet(false, true)) {
// NOTE(review): the exception branch is presumably taken when no object id was recorded — confirm.
3869 procedure.exception(graph, new NoInverseException(""));
3870 // impl.state.barrier.dec(this);
3872 procedure.execute(graph, querySupport.getResource(result));
3873 // impl.state.barrier.dec(this);
3875 } catch (Throwable t) {
3876 Logger.defaultLogError(t);
3883 public void execute(ReadGraphImpl graph, int i) {
3885 if(found.compareAndSet(false, true)) {
3888 // Shall fire exactly once!
3889 if(done.compareAndSet(false, true)) {
// Report the recorded id together with the newly arrived one (parameter i); using the
// field twice would print the same id on both sides of the message.
3891 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + i));
3892 // impl.state.barrier.dec(this);
3893 } catch (Throwable t) {
3894 Logger.defaultLogError(t);
3902 public void exception(ReadGraphImpl graph, Throwable t) {
3903 // Shall fire exactly once!
3904 if(done.compareAndSet(false, true)) {
3906 procedure.exception(graph, t);
3907 // impl.state.barrier.dec(this);
3908 } catch (Throwable t2) {
3909 Logger.defaultLogError(t2);
3916 int sId = querySupport.getId(relation);
3918 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3919 // else impl.state.barrier.inc(null, null);
3922 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3923 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
3924 Logger.defaultLogError(e);
// Resolves the resource identified by the string id: delegates to the internal
// forResource(impl, id, parent, InternalProcedure<Integer>) overload and converts the resulting
// integer id to a Resource with querySupport.getResource before notifying the procedure.
// Anything the procedure throws is logged.
3930 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3933 assert(procedure != null);
3935 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3938 public void execute(ReadGraphImpl graph, Integer result) {
3940 procedure.execute(graph, querySupport.getResource(result));
3941 } catch (Throwable t2) {
3942 Logger.defaultLogError(t2);
3944 // impl.state.barrier.dec(this);
3948 public void exception(ReadGraphImpl graph, Throwable t) {
3951 procedure.exception(graph, t);
3952 } catch (Throwable t2) {
3953 Logger.defaultLogError(t2);
3955 // impl.state.barrier.dec(this);
3960 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3961 // else impl.state.barrier.inc(null, null);
3963 forResource(impl, id, impl.parent, ip);
// Resolves the builtin identified by the string id: delegates to the internal
// forBuiltin(impl, id, parent, InternalProcedure<Integer>) overload and converts the resulting
// integer id to a Resource with querySupport.getResource before notifying the procedure.
// Anything the procedure throws is logged, as is a DatabaseException from the lookup itself.
3968 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3971 assert(procedure != null);
3973 // impl.state.barrier.inc();
3976 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3979 public void execute(ReadGraphImpl graph, Integer result) {
3981 procedure.execute(graph, querySupport.getResource(result));
3982 } catch (Throwable t2) {
3983 Logger.defaultLogError(t2);
3985 // impl.state.barrier.dec();
3989 public void exception(ReadGraphImpl graph, Throwable t) {
3991 procedure.exception(graph, t);
3992 } catch (Throwable t2) {
3993 Logger.defaultLogError(t2);
3995 // impl.state.barrier.dec();
3999 } catch (DatabaseException e) {
4000 Logger.defaultLogError(e);
// Reports whether subject has any statement: fetches the direct predicates of subject with
// QueryCache.resultDirectPredicates and passes "set is not empty" to the procedure.
// A DatabaseException from the query is forwarded to procedure.exception.
4006 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4008 assert(subject != null);
4009 assert(procedure != null);
4011 final ListenerBase listener = getListenerBase(procedure);
4014 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4015 procedure.execute(impl, !result.isEmpty());
4016 } catch (DatabaseException e) {
4017 procedure.exception(impl, e);
// Reports whether subject has a statement with the given predicate: runs forEachObject for
// (subject, predicate) and passes the 'found' flag to the procedure when the query finishes.
// Anything the procedure throws is logged.
4023 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4025 assert(subject != null);
4026 assert(predicate != null);
4027 assert(procedure != null);
4029 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4031 boolean found = false;
// NOTE(review): presumably sets 'found' for every reported object — confirm.
4034 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4039 synchronized public void finished(AsyncReadGraph graph) {
4041 procedure.execute(graph, found);
4042 } catch (Throwable t2) {
4043 Logger.defaultLogError(t2);
4045 // impl.state.barrier.dec(this);
4049 public void exception(AsyncReadGraph graph, Throwable t) {
4051 procedure.exception(graph, t);
4052 } catch (Throwable t2) {
4053 Logger.defaultLogError(t2);
4055 // impl.state.barrier.dec(this);
4060 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4061 // else impl.state.barrier.inc(null, null);
4063 forEachObject(impl, subject, predicate, ip);
// Reports whether the statement (subject, predicate, object) exists: runs forEachObject for
// (subject, predicate), sets 'found' when a reported resource equals object, and passes the
// flag to the procedure when the query finishes. Anything the procedure throws is logged.
4068 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4070 assert(subject != null);
4071 assert(predicate != null);
4072 assert(procedure != null);
4074 // impl.state.barrier.inc();
4076 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4078 boolean found = false;
4081 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4082 if(resource.equals(object)) found = true;
4086 synchronized public void finished(AsyncReadGraph graph) {
4088 procedure.execute(graph, found);
4089 } catch (Throwable t2) {
4090 Logger.defaultLogError(t2);
4092 // impl.state.barrier.dec();
4096 public void exception(AsyncReadGraph graph, Throwable t) {
4098 procedure.exception(graph, t);
4099 } catch (Throwable t2) {
4100 Logger.defaultLogError(t2);
4102 // impl.state.barrier.dec();
// Reports whether subject has a value: runs QueryCache.runnerValueQuery and passes
// "returned byte array is not null" to the procedure. Anything the procedure throws is logged.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
4110 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4112 assert(subject != null);
4113 assert(procedure != null);
4115 final ListenerBase listener = getListenerBase(procedure);
4117 // impl.state.barrier.inc();
4120 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4123 public void execute(ReadGraphImpl graph, byte[] object) {
4124 boolean result = object != null;
4126 procedure.execute(graph, result);
4127 } catch (Throwable t2) {
4128 Logger.defaultLogError(t2);
4130 // impl.state.barrier.dec();
4134 public void exception(ReadGraphImpl graph, Throwable t) {
4136 procedure.exception(graph, t);
4137 } catch (Throwable t2) {
4138 Logger.defaultLogError(t2);
4140 // impl.state.barrier.dec();
4144 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
4145 Logger.defaultLogError(e);
// Streams the elements of the ordered set rooted at subject to the procedure.
// Runs QueryCache.runnerOrderedSet with the subject's id and converts each resulting id to a
// Resource with querySupport.getResource before forwarding it.
// A procedure that is itself a ListenerBase is also passed to the query as its listener.
4151 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4153 assert(subject != null);
4154 assert(procedure != null);
4156 final ListenerBase listener = getListenerBase(procedure);
4160 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4163 public void exception(ReadGraphImpl graph, Throwable t) {
4165 procedure.exception(graph, t);
4166 } catch (Throwable t2) {
// Anything thrown by the client procedure is logged here.
4167 Logger.defaultLogError(t2);
4169 // impl.state.barrier.dec();
4173 public void execute(ReadGraphImpl graph, int i) {
4175 procedure.execute(graph, querySupport.getResource(i));
4176 } catch (Throwable t2) {
4177 Logger.defaultLogError(t2);
4182 public void finished(ReadGraphImpl graph) {
4184 procedure.finished(graph);
4185 } catch (Throwable t2) {
4186 Logger.defaultLogError(t2);
4188 // impl.state.barrier.dec();
4192 } catch (DatabaseException e) {
// A DatabaseException raised by the query runner itself is logged.
4193 Logger.defaultLogError(e);
4199 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4201 // assert(request != null);
4202 // assert(procedure != null);
4204 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4209 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4211 // assert(graph != null);
4212 // assert(request != null);
4214 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4215 // if(entry != null && entry.isReady()) {
4216 // return (T)entry.get(graph, this, null);
4218 // return request.perform(graph);
4223 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4225 // assert(graph != null);
4226 // assert(request != null);
4228 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4229 // if(entry != null && entry.isReady()) {
4230 // if(entry.isExcepted()) {
4231 // Throwable t = (Throwable)entry.getResult();
4232 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4233 // else throw new DatabaseException(t);
4235 // return (T)entry.getResult();
4239 // final DataContainer<T> result = new DataContainer<T>();
4240 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4242 // request.register(graph, new Listener<T>() {
4245 // public void exception(Throwable t) {
4246 // exception.set(t);
4250 // public void execute(T t) {
4255 // public boolean isDisposed() {
4261 // Throwable t = exception.get();
4263 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4264 // else throw new DatabaseException(t);
4267 // return result.get();
4274 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4276 // assert(graph != null);
4277 // assert(request != null);
4279 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4280 // if(entry != null && entry.isReady()) {
4281 // if(entry.isExcepted()) {
4282 // procedure.exception(graph, (Throwable)entry.getResult());
4284 // procedure.execute(graph, (T)entry.getResult());
4287 // request.perform(graph, procedure);
4293 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4295 assert(request != null);
4296 assert(procedure != null);
4300 queryMultiRead(impl, request, parent, listener, procedure);
4302 } catch (DatabaseException e) {
4304 throw new IllegalStateException(e);
4311 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4313 assert(request != null);
4314 assert(procedure != null);
4316 // impl.state.barrier.inc();
4318 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4320 public void execute(AsyncReadGraph graph, T result) {
4323 procedure.execute(graph, result);
4324 } catch (Throwable t2) {
4325 Logger.defaultLogError(t2);
4330 public void finished(AsyncReadGraph graph) {
4333 procedure.finished(graph);
4334 } catch (Throwable t2) {
4335 Logger.defaultLogError(t2);
4338 // impl.state.barrier.dec();
4343 public String toString() {
4344 return procedure.toString();
4348 public void exception(AsyncReadGraph graph, Throwable t) {
4351 procedure.exception(graph, t);
4352 } catch (Throwable t2) {
4353 Logger.defaultLogError(t2);
4356 // impl.state.barrier.dec();
4365 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4367 // assert(request != null);
4368 // assert(procedure != null);
4372 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4375 // public String toString() {
4376 // return procedure.toString();
4380 // public void execute(AsyncReadGraph graph, T result) {
4382 // procedure.execute(result);
4383 // } catch (Throwable t2) {
4384 // Logger.defaultLogError(t2);
4389 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4391 // procedure.exception(throwable);
4392 // } catch (Throwable t2) {
4393 // Logger.defaultLogError(t2);
4399 // } catch (DatabaseException e) {
4401 // throw new IllegalStateException(e);
4408 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4410 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4415 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4417 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4422 public VirtualGraph getValueProvider(Resource subject) {
4424 return querySupport.getValueProvider(querySupport.getId(subject));
4428 public boolean resumeTasks(ReadGraphImpl graph) {
4430 return querySupport.resume(graph);
4434 public boolean isImmutable(int resourceId) {
4435 return querySupport.isImmutable(resourceId);
4438 public boolean isImmutable(Resource resource) {
4439 ResourceImpl impl = (ResourceImpl)resource;
4440 return isImmutable(impl.id);
4445 public Layer0 getL0(ReadGraph graph) {
4447 L0 = Layer0.getInstance(graph);
4452 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4453 protected Integer initialValue() {