1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.debug.ListenerReport;
48 import org.simantics.db.exception.DatabaseException;
49 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
50 import org.simantics.db.exception.NoInverseException;
51 import org.simantics.db.exception.ResourceNotFoundException;
52 import org.simantics.db.impl.DebugPolicy;
53 import org.simantics.db.impl.ResourceImpl;
54 import org.simantics.db.impl.graph.BarrierTracing;
55 import org.simantics.db.impl.graph.ReadGraphImpl;
56 import org.simantics.db.impl.graph.ReadGraphSupport;
57 import org.simantics.db.impl.graph.WriteGraphImpl;
58 import org.simantics.db.impl.procedure.IntProcedureAdapter;
59 import org.simantics.db.impl.procedure.InternalProcedure;
60 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
61 import org.simantics.db.impl.support.ResourceSupport;
62 import org.simantics.db.procedure.AsyncMultiListener;
63 import org.simantics.db.procedure.AsyncMultiProcedure;
64 import org.simantics.db.procedure.AsyncProcedure;
65 import org.simantics.db.procedure.AsyncSetListener;
66 import org.simantics.db.procedure.ListenerBase;
67 import org.simantics.db.procedure.MultiProcedure;
68 import org.simantics.db.procedure.StatementProcedure;
69 import org.simantics.db.procedure.SyncMultiProcedure;
70 import org.simantics.db.request.AsyncMultiRead;
71 import org.simantics.db.request.ExternalRead;
72 import org.simantics.db.request.MultiRead;
73 import org.simantics.db.request.RequestFlags;
74 import org.simantics.layer0.Layer0;
75 import org.simantics.utils.DataContainer;
76 import org.simantics.utils.Development;
77 import org.simantics.utils.datastructures.Pair;
78 import org.simantics.utils.datastructures.collections.CollectionUtils;
79 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
81 import gnu.trove.procedure.TIntProcedure;
82 import gnu.trove.procedure.TLongProcedure;
83 import gnu.trove.procedure.TObjectProcedure;
84 import gnu.trove.set.hash.THashSet;
85 import gnu.trove.set.hash.TIntHashSet;
87 @SuppressWarnings({"rawtypes", "unchecked"})
88 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
90 public static int indent = 0;
95 public int boundQueries = 0;
98 final private int functionalRelation;
100 final private int superrelationOf;
102 final private int instanceOf;
104 final private int inverseOf;
106 final private int asserts;
108 final private int hasPredicate;
110 final private int hasPredicateInverse;
112 final private int hasObject;
114 final private int inherits;
116 final private int subrelationOf;
118 final private int rootLibrary;
121 * A cache for the root library resource. Initialized in
122 * {@link #getRootLibraryResource()}.
124 private volatile ResourceImpl rootLibraryResource;
126 final private int library;
128 final private int consistsOf;
130 final private int hasName;
132 AtomicInteger sleepers = new AtomicInteger(0);
134 private boolean updating = false;
137 private boolean firingListeners = false;
139 final public QueryCache cache;
140 final public QuerySupport querySupport;
141 final public Session session;
142 final public ResourceSupport resourceSupport;
144 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
146 QueryThread[] executors;
148 // public ArrayList<SessionTask>[] queues;
150 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
154 INIT, RUN, SLEEP, DISPOSED
158 public ThreadState[] threadStates;
159 // public ReentrantLock[] threadLocks;
160 // public Condition[] threadConditions;
162 //public ArrayList<SessionTask>[] ownTasks;
164 //public ArrayList<SessionTask>[] ownSyncTasks;
166 //ArrayList<SessionTask>[] delayQueues;
168 final Object querySupportLock;
170 public Long modificationCounter = 0L;
172 public void close() {
175 public SessionTask getOwnTask(ReadGraphImpl impl) {
176 Set<ReadGraphImpl> ancestors = impl.ancestorSet();
177 synchronized(querySupportLock) {
179 while(index < freeScheduling.size()) {
180 SessionTask task = freeScheduling.get(index);
181 if(task.hasCommonParent(ancestors)) {
182 return freeScheduling.remove(index);
190 public SessionTask getSubTask(ReadGraphImpl impl) {
191 Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
192 synchronized(querySupportLock) {
194 while(index < freeScheduling.size()) {
195 SessionTask task = freeScheduling.get(index);
196 if(task.hasCommonParent(onlyThis)) {
197 return freeScheduling.remove(index);
205 public boolean performPending(ReadGraphImpl graph) {
206 SessionTask task = getOwnTask(graph);
208 task.run(QueryProcessor.thread.get());
215 // final public void scheduleOwn(int caller, SessionTask request) {
216 // ownTasks[caller].add(request);
219 final public void schedule(SessionTask request) {
221 //int performer = request.thread;
223 // if(DebugPolicy.SCHEDULE)
224 // System.out.println("schedule " + request + " " + " -> " + performer);
226 //assert(performer >= 0);
228 assert(request != null);
230 // if(caller == performer) {
231 // request.run(caller);
234 // if(performer == THREADS) {
236 synchronized(querySupportLock) {
238 if(BarrierTracing.BOOKKEEPING) {
239 Exception current = new Exception();
240 Exception previous = BarrierTracing.tasks.put(request, current);
241 if(previous != null) {
242 previous.printStackTrace();
243 current.printStackTrace();
247 freeScheduling.add(request);
249 querySupportLock.notifyAll();
257 // ReentrantLock queueLock = threadLocks[performer];
259 // queues[performer].add(request);
260 // // This thread could have been sleeping
261 // if(queues[performer].size() == 1) {
262 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
263 // threadConditions[performer].signalAll();
265 // queueLock.unlock();
272 final public int THREAD_MASK;
274 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
276 public static abstract class SessionTask {
278 public final ReadGraphImpl graph;
279 private Set<ReadGraphImpl> ancestors;
280 private int counter = 0;
281 private Exception trace;
283 public SessionTask(ReadGraphImpl graph) {
285 if(graph != null) graph.asyncBarrier.inc();
288 public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
289 if(graph == null) return false;
290 if(ancestors == null) ancestors = graph.ancestorSet();
291 return !Collections.disjoint(ancestors, otherAncestors);
294 public abstract void run0(int thread);
296 public final void run(int thread) {
298 if(BarrierTracing.BOOKKEEPING) {
299 trace.printStackTrace();
300 new Exception().printStackTrace();
302 throw new IllegalStateException("Multiple invocations of SessionTask!");
304 if(BarrierTracing.BOOKKEEPING) {
305 trace = new Exception();
308 if(graph != null) graph.asyncBarrier.dec();
312 public String toString() {
313 return "SessionTask[" + graph.parent + "]";
318 public static abstract class SessionRead extends SessionTask {
320 final public Semaphore notify;
321 final public DataContainer<Throwable> throwable;
323 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
325 this.throwable = throwable;
326 this.notify = notify;
331 long waitingTime = 0;
334 static int koss2 = 0;
336 public boolean resume(ReadGraphImpl graph) {
337 return executors[0].runSynchronized();
340 //private WeakReference<GarbageTracker> garbageTracker;
342 private class GarbageTracker {
345 protected void finalize() throws Throwable {
347 // System.err.println("GarbageTracker");
349 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
357 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
358 throws DatabaseException {
360 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
363 THREAD_MASK = threads - 1;
366 cache = new QueryCache(core, threads);
367 session = querySupport.getSession();
368 resourceSupport = querySupport.getSupport();
369 querySupportLock = core.getLock();
371 executors = new QueryThread[THREADS];
372 // queues = new ArrayList[THREADS];
373 // threadLocks = new ReentrantLock[THREADS];
374 // threadConditions = new Condition[THREADS];
375 threadStates = new ThreadState[THREADS];
376 // ownTasks = new ArrayList[THREADS];
377 // ownSyncTasks = new ArrayList[THREADS];
378 // delayQueues = new ArrayList[THREADS * THREADS];
380 // freeSchedule = new AtomicInteger(0);
382 // for (int i = 0; i < THREADS * THREADS; i++) {
383 // delayQueues[i] = new ArrayList<SessionTask>();
386 for (int i = 0; i < THREADS; i++) {
388 // tasks[i] = new ArrayList<Runnable>();
389 // ownTasks[i] = new ArrayList<SessionTask>();
390 // ownSyncTasks[i] = new ArrayList<SessionTask>();
391 // queues[i] = new ArrayList<SessionTask>();
392 // threadLocks[i] = new ReentrantLock();
393 // threadConditions[i] = threadLocks[i].newCondition();
394 // limits[i] = false;
395 threadStates[i] = ThreadState.INIT;
399 for (int i = 0; i < THREADS; i++) {
403 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
405 threadSet.add(executors[i]);
410 for (int i = 0; i < THREADS; i++) {
411 executors[i].start();
414 // Make sure that query threads are up and running
415 while(sleepers.get() != THREADS) {
418 } catch (InterruptedException e) {
423 rootLibrary = core.getBuiltin("http:/");
424 boolean builtinsInstalled = rootLibrary != 0;
426 if (builtinsInstalled) {
427 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
428 assert (functionalRelation != 0);
430 functionalRelation = 0;
432 if (builtinsInstalled) {
433 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
434 assert (instanceOf != 0);
438 if (builtinsInstalled) {
439 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
440 assert (inverseOf != 0);
445 if (builtinsInstalled) {
446 inherits = core.getBuiltin(Layer0.URIs.Inherits);
447 assert (inherits != 0);
451 if (builtinsInstalled) {
452 asserts = core.getBuiltin(Layer0.URIs.Asserts);
453 assert (asserts != 0);
457 if (builtinsInstalled) {
458 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
459 assert (hasPredicate != 0);
463 if (builtinsInstalled) {
464 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
465 assert (hasPredicateInverse != 0);
467 hasPredicateInverse = 0;
469 if (builtinsInstalled) {
470 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
471 assert (hasObject != 0);
475 if (builtinsInstalled) {
476 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
477 assert (subrelationOf != 0);
481 if (builtinsInstalled) {
482 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
483 assert (superrelationOf != 0);
487 if (builtinsInstalled) {
488 library = core.getBuiltin(Layer0.URIs.Library);
489 assert (library != 0);
493 if (builtinsInstalled) {
494 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
495 assert (consistsOf != 0);
499 if (builtinsInstalled) {
500 hasName = core.getBuiltin(Layer0.URIs.HasName);
501 assert (hasName != 0);
507 final public void releaseWrite(ReadGraphImpl graph) {
508 performDirtyUpdates(graph);
509 modificationCounter++;
512 final public int getId(final Resource r) {
513 return querySupport.getId(r);
516 public QuerySupport getCore() {
520 public int getFunctionalRelation() {
521 return functionalRelation;
524 public int getInherits() {
528 public int getInstanceOf() {
532 public int getInverseOf() {
536 public int getSubrelationOf() {
537 return subrelationOf;
540 public int getSuperrelationOf() {
541 return superrelationOf;
544 public int getAsserts() {
548 public int getHasPredicate() {
552 public int getHasPredicateInverse() {
553 return hasPredicateInverse;
556 public int getHasObject() {
560 public int getRootLibrary() {
564 public Resource getRootLibraryResource() {
565 if (rootLibraryResource == null) {
566 // Synchronization is not needed here, it doesn't matter if multiple
567 // threads simultaneously set rootLibraryResource once.
568 int root = getRootLibrary();
570 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
571 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
573 return rootLibraryResource;
576 public int getLibrary() {
580 public int getConsistsOf() {
584 public int getHasName() {
588 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
592 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
595 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
597 if (result != null && result != 0) {
598 procedure.execute(graph, result);
602 // Fall back to using the fixed builtins.
603 // result = querySupport.getBuiltin(id);
604 // if (result != 0) {
605 // procedure.execute(graph, result);
610 // result = querySupport.getRandomAccessReference(id);
611 // } catch (ResourceNotFoundException e) {
612 // procedure.exception(graph, e);
617 procedure.execute(graph, result);
619 procedure.exception(graph, new ResourceNotFoundException(id));
625 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
626 procedure.exception(graph, t);
630 } catch (DatabaseException e) {
634 procedure.exception(graph, e);
636 } catch (DatabaseException e1) {
638 Logger.defaultLogError(e1);
646 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
648 Integer result = querySupport.getBuiltin(id);
650 procedure.execute(graph, result);
652 procedure.exception(graph, new ResourceNotFoundException(id));
657 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
660 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
661 } catch (DatabaseException e) {
662 throw new IllegalStateException(e);
667 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
671 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
672 } catch (DatabaseException e) {
673 throw new IllegalStateException(e);
678 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
679 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
683 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
685 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
689 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
691 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
695 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
697 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
701 boolean isBound(ExternalReadEntry<?> entry) {
702 if(entry.hasParents()) return true;
703 else if(hasListener(entry)) return true;
707 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
709 if (parent != null && !inferred) {
711 if(!child.isImmutable(graph))
712 child.addParent(parent);
713 } catch (DatabaseException e) {
714 Logger.defaultLogError(e);
716 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
719 if (listener != null) {
720 return registerListener(child, listener, procedure);
728 static class Dummy implements InternalProcedure<Object>, IntProcedure {
731 public void execute(ReadGraphImpl graph, int i) {
735 public void finished(ReadGraphImpl graph) {
739 public void execute(ReadGraphImpl graph, Object result) {
743 public void exception(ReadGraphImpl graph, Throwable throwable) {
748 private static final Dummy dummy = new Dummy();
751 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
753 if (DebugPolicy.PERFORM)
754 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
757 assert (!collecting);
759 assert(query.assertNotDiscarded());
761 registerDependencies(graph, query, parent, listener, procedure, false);
763 // FRESH, REFUTED, EXCEPTED go here
764 if (!query.isReady()) {
769 query.computeForEach(graph, this, (Procedure)dummy, true);
770 return query.get(graph, this, null);
776 return query.get(graph, this, procedure);
784 interface QueryCollectorSupport {
785 public CacheCollectionResult allCaches();
786 public Collection<CacheEntry> getRootList();
787 public int getCurrentSize();
788 public int calculateCurrentSize();
789 public CacheEntryBase iterate(int level);
790 public void remove();
791 public void setLevel(CacheEntryBase entry, int level);
792 public boolean start(boolean flush);
795 interface QueryCollector {
797 public void collect(int youngTarget, int allowedTimeInMs);
801 class QueryCollectorSupportImpl implements QueryCollectorSupport {
803 private static final boolean DEBUG = false;
804 private static final double ITERATION_RATIO = 0.2;
806 private CacheCollectionResult iteration = new CacheCollectionResult();
807 private boolean fresh = true;
808 private boolean needDataInStart = true;
810 QueryCollectorSupportImpl() {
814 public CacheCollectionResult allCaches() {
815 CacheCollectionResult result = new CacheCollectionResult();
816 QueryProcessor.this.allCaches(result);
821 public boolean start(boolean flush) {
822 // We need new data from query maps
824 if(needDataInStart || flush) {
825 // Last run ended after processing all queries => refresh data
826 restart(flush ? 0.0 : ITERATION_RATIO);
828 // continue with previous big data
830 // Notify caller about iteration situation
831 return iteration.isAtStart();
834 private void restart(double targetRatio) {
836 needDataInStart = true;
838 long start = System.nanoTime();
841 // We need new data from query maps
843 int iterationSize = iteration.size()+1;
844 int diff = calculateCurrentSize()-iterationSize;
846 double ratio = (double)diff / (double)iterationSize;
847 boolean dirty = Math.abs(ratio) >= targetRatio;
850 iteration = allCaches();
852 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
853 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
854 System.err.print(" " + iteration.levels[i].size());
855 System.err.println("");
862 needDataInStart = false;
864 // We are returning here within the same GC round - reuse the cache table
873 public CacheEntryBase iterate(int level) {
875 CacheEntryBase entry = iteration.next(level);
877 restart(ITERATION_RATIO);
881 while(entry != null && entry.isDiscarded()) {
882 entry = iteration.next(level);
890 public void remove() {
895 public void setLevel(CacheEntryBase entry, int level) {
896 iteration.setLevel(entry, level);
899 public Collection<CacheEntry> getRootList() {
900 return cache.getRootList();
904 public int calculateCurrentSize() {
905 return cache.calculateCurrentSize();
909 public int getCurrentSize() {
914 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
916 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
917 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
919 public int querySize() {
923 public void gc(int youngTarget, int allowedTimeInMs) {
925 collector.collect(youngTarget, allowedTimeInMs);
929 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
931 assert (entry != null);
933 if (base.isDisposed())
936 return addListener(entry, base, procedure);
940 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
941 entry.setLastKnown(result);
944 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
946 assert (entry != null);
947 assert (procedure != null);
949 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
951 list = new ArrayList<ListenerEntry>(1);
952 cache.listeners.put(entry, list);
955 ListenerEntry result = new ListenerEntry(entry, base, procedure);
956 int currentIndex = list.indexOf(result);
957 // There was already a listener
958 if(currentIndex > -1) {
959 ListenerEntry current = list.get(currentIndex);
960 if(!current.base.isDisposed()) return null;
961 list.set(currentIndex, result);
966 if(DebugPolicy.LISTENER) {
967 new Exception().printStackTrace();
968 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
975 private void scheduleListener(ListenerEntry entry) {
976 assert (entry != null);
977 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
978 scheduledListeners.add(entry);
981 private void removeListener(ListenerEntry entry) {
982 assert (entry != null);
983 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
984 if(list == null) return;
985 boolean success = list.remove(entry);
988 cache.listeners.remove(entry.entry);
991 private boolean hasListener(CacheEntry entry) {
992 if(cache.listeners.get(entry) != null) return true;
996 boolean hasListenerAfterDisposing(CacheEntry entry) {
997 if(cache.listeners.get(entry) != null) {
998 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
999 ArrayList<ListenerEntry> list = null;
1000 for (ListenerEntry e : entries) {
1001 if (e.base.isDisposed()) {
1002 if(list == null) list = new ArrayList<ListenerEntry>();
1007 for (ListenerEntry e : list) {
1011 if (entries.isEmpty()) {
1012 cache.listeners.remove(entry);
1020 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1021 hasListenerAfterDisposing(entry);
1022 if(cache.listeners.get(entry) != null)
1023 return cache.listeners.get(entry);
1025 return Collections.emptyList();
1028 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1030 if(!workarea.containsKey(entry)) {
1032 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1033 for(ListenerEntry e : getListenerEntries(entry))
1036 workarea.put(entry, ls);
1038 for(CacheEntry parent : entry.getParents(this)) {
1039 processListenerReport(parent, workarea);
1040 ls.addAll(workarea.get(parent));
1047 public synchronized ListenerReport getListenerReport() throws IOException {
1049 class ListenerReportImpl implements ListenerReport {
1051 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1054 public void print(PrintStream b) {
1055 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1056 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1057 for(ListenerBase l : e.getValue()) {
1058 Integer i = hist.get(l);
1059 hist.put(l, i != null ? i-1 : -1);
1063 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1064 b.print("" + -p.second + " " + p.first + "\n");
1072 ListenerReportImpl result = new ListenerReportImpl();
1074 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1075 for(CacheEntryBase entry : all) {
1076 hasListenerAfterDisposing(entry);
1078 for(CacheEntryBase entry : all) {
1079 processListenerReport(entry, result.workarea);
1086 public synchronized String reportListeners(File file) throws IOException {
1091 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1092 ListenerReport report = getListenerReport();
1095 return "Done reporting listeners.";
1099 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1101 if(entry.isDiscarded()) return;
1102 if(workarea.containsKey(entry)) return;
1104 Iterable<CacheEntry> parents = entry.getParents(this);
1105 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1106 for(CacheEntry e : parents) {
1107 if(e.isDiscarded()) continue;
1109 processParentReport(e, workarea);
1111 workarea.put(entry, ps);
1115 public synchronized String reportQueryActivity(File file) throws IOException {
1117 System.err.println("reportQueries " + file.getAbsolutePath());
1122 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1124 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1125 Collections.reverse(entries);
1127 for(Pair<String,Integer> entry : entries) {
1128 b.println(entry.first + ": " + entry.second);
1133 Development.histogram.clear();
1139 public synchronized String reportQueries(File file) throws IOException {
1141 System.err.println("reportQueries " + file.getAbsolutePath());
1146 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1148 long start = System.nanoTime();
1150 // ArrayList<CacheEntry> all = ;
1152 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1153 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1154 for(CacheEntryBase entry : caches) {
1155 processParentReport(entry, workarea);
1158 // for(CacheEntry e : all) System.err.println("entry: " + e);
1160 long duration = System.nanoTime() - start;
1161 System.err.println("Query root set in " + 1e-9*duration + "s.");
1163 start = System.nanoTime();
1165 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1169 for(CacheEntry entry : workarea.keySet()) {
1170 boolean listener = hasListenerAfterDisposing(entry);
1171 boolean hasParents = entry.getParents(this).iterator().hasNext();
1174 flagMap.put(entry, 0);
1175 } else if (!hasParents) {
1177 flagMap.put(entry, 1);
1180 flagMap.put(entry, 2);
1182 // // Write leaf bit
1183 // entry.flags |= 4;
1186 boolean done = true;
1193 long start2 = System.nanoTime();
1195 int boundCounter = 0;
1196 int unboundCounter = 0;
1197 int unknownCounter = 0;
1199 for(CacheEntry<?> entry : workarea.keySet()) {
1201 //System.err.println("process " + entry);
1203 int flags = flagMap.get(entry);
1204 int bindStatus = flags & 3;
1206 if(bindStatus == 0) boundCounter++;
1207 else if(bindStatus == 1) unboundCounter++;
1208 else if(bindStatus == 2) unknownCounter++;
1210 if(bindStatus < 2) continue;
1213 for(CacheEntry parent : entry.getParents(this)) {
1215 if(parent.isDiscarded()) flagMap.put(parent, 1);
1217 int flags2 = flagMap.get(parent);
1218 int bindStatus2 = flags2 & 3;
1219 // Parent is bound => child is bound
1220 if(bindStatus2 == 0) {
1224 // Parent is unknown => child is unknown
1225 else if (bindStatus2 == 2) {
1232 flagMap.put(entry, newStatus);
1236 duration = System.nanoTime() - start2;
1237 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1238 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1240 } while(!done && loops++ < 20);
1244 for(CacheEntry entry : workarea.keySet()) {
1246 int bindStatus = flagMap.get(entry);
1247 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1253 duration = System.nanoTime() - start;
1254 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1256 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1258 for(CacheEntry entry : workarea.keySet()) {
1259 Class<?> clazz = entry.getClass();
1260 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1261 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1262 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1263 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1264 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1265 Integer c = counts.get(clazz);
1266 if(c == null) counts.put(clazz, -1);
1267 else counts.put(clazz, c-1);
1270 b.print("// Simantics DB client query report file\n");
1271 b.print("// This file contains the following information\n");
1272 b.print("// -The amount of cached query instances per query class\n");
1273 b.print("// -The sizes of retained child sets\n");
1274 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1275 b.print("// -Followed by status, where\n");
1276 b.print("// -0=bound\n");
1277 b.print("// -1=free\n");
1278 b.print("// -2=unknown\n");
1279 b.print("// -L=has listener\n");
1280 b.print("// -List of children for each query (search for 'C <query name>')\n");
1282 b.print("----------------------------------------\n");
1284 b.print("// Queries by class\n");
1285 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1286 b.print(-p.second + " " + p.first.getName() + "\n");
1289 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1290 for(CacheEntry e : workarea.keySet())
1293 boolean changed = true;
1295 while(changed && iter++<50) {
1299 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1300 for(CacheEntry e : workarea.keySet())
1303 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1304 Integer c = hist.get(e.getKey());
1305 for(CacheEntry p : e.getValue()) {
1306 Integer i = newHist.get(p);
1307 newHist.put(p, i+c);
1310 for(CacheEntry e : workarea.keySet()) {
1311 Integer value = newHist.get(e);
1312 Integer old = hist.get(e);
1313 if(!value.equals(old)) {
1315 // System.err.println("hist " + e + ": " + old + " => " + value);
1320 System.err.println("Retained set iteration " + iter);
1324 b.print("// Queries by retained set\n");
1325 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1326 b.print("" + -p.second + " " + p.first + "\n");
1329 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1331 b.print("// Entry parent listing\n");
1332 for(CacheEntry entry : workarea.keySet()) {
1333 int status = flagMap.get(entry);
1334 boolean hasListener = hasListenerAfterDisposing(entry);
1335 b.print("Q " + entry.toString());
1337 b.print(" (L" + status + ")");
1340 b.print(" (" + status + ")");
1343 for(CacheEntry parent : workarea.get(entry)) {
1344 Collection<CacheEntry> inv = inverse.get(parent);
1346 inv = new ArrayList<CacheEntry>();
1347 inverse.put(parent, inv);
1350 b.print(" " + parent.toString());
1355 b.print("// Entry child listing\n");
1356 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1357 b.print("C " + entry.getKey().toString());
1359 for(CacheEntry child : entry.getValue()) {
1360 Integer h = hist.get(child);
1364 b.print(" <no children>");
1366 b.print(" " + child.toString());
1371 b.print("#queries: " + workarea.keySet().size() + "\n");
1372 b.print("#listeners: " + listeners + "\n");
1376 return "Dumped " + workarea.keySet().size() + " queries.";
1382 public CacheEntry caller;
1384 public CacheEntry entry;
1388 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1389 this.caller = caller;
1391 this.indent = indent;
1396 boolean removeQuery(CacheEntry entry) {
1398 // This entry has been removed before. No need to do anything here.
1399 if(entry.isDiscarded()) return false;
1401 assert (!entry.isDiscarded());
1403 Query query = entry.getQuery();
1405 query.removeEntry(this);
1410 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1421 * @return true if this entry is being listened
1423 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1427 CacheEntry entry = e.entry;
1429 //System.err.println("updateQuery " + entry);
1432 * If the dependency graph forms a DAG, some entries are inserted in the
1433 * todo list many times. They only need to be processed once though.
1435 if (entry.isDiscarded()) {
1436 if (Development.DEVELOPMENT) {
1437 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1438 System.out.print("D");
1439 for (int i = 0; i < e.indent; i++)
1440 System.out.print(" ");
1441 System.out.println(entry.getQuery());
1444 // System.err.println(" => DISCARDED");
1448 if (entry.isRefuted()) {
1449 if (Development.DEVELOPMENT) {
1450 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1451 System.out.print("R");
1452 for (int i = 0; i < e.indent; i++)
1453 System.out.print(" ");
1454 System.out.println(entry.getQuery());
1460 if (entry.isExcepted()) {
1461 if (Development.DEVELOPMENT) {
1462 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1463 System.out.print("E");
1468 if (entry.isPending()) {
1469 if (Development.DEVELOPMENT) {
1470 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1471 System.out.print("P");
1478 if (Development.DEVELOPMENT) {
1479 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1480 System.out.print("U ");
1481 for (int i = 0; i < e.indent; i++)
1482 System.out.print(" ");
1483 System.out.print(entry.getQuery());
1487 Query query = entry.getQuery();
1488 int type = query.type();
1490 boolean hasListener = hasListener(entry);
1492 if (Development.DEVELOPMENT) {
1493 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1494 if(hasListener(entry)) {
1495 System.out.println(" (L)");
1497 System.out.println("");
1502 if(entry.isPending() || entry.isExcepted()) {
1505 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1507 immediates.put(entry, entry);
1522 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1524 immediates.put(entry, entry);
1538 // System.err.println(" => FOO " + type);
1541 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1542 if(entries != null) {
1543 for (ListenerEntry le : entries) {
1544 scheduleListener(le);
1549 // If invalid, update parents
1550 if (type == RequestFlags.INVALIDATE) {
1551 updateParents(e.indent, entry, todo);
1558 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1560 Iterable<CacheEntry> oldParents = entry.getParents(this);
1561 for (CacheEntry parent : oldParents) {
1562 // System.err.println("updateParents " + entry + " => " + parent);
1563 if(!parent.isDiscarded())
1564 todo.push(new UpdateEntry(entry, parent, indent + 2));
1569 private boolean pruneListener(ListenerEntry entry) {
1570 if (entry.base.isDisposed()) {
1571 removeListener(entry);
1579 * @param av1 an array (guaranteed)
1580 * @param av2 any object
1581 * @return <code>true</code> if the two arrays are equal
1583 private final boolean arrayEquals(Object av1, Object av2) {
1586 Class<?> c1 = av1.getClass().getComponentType();
1587 Class<?> c2 = av2.getClass().getComponentType();
1588 if (c2 == null || !c1.equals(c2))
1590 boolean p1 = c1.isPrimitive();
1591 boolean p2 = c2.isPrimitive();
1595 return Arrays.equals((Object[]) av1, (Object[]) av2);
1596 if (boolean.class.equals(c1))
1597 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1598 else if (byte.class.equals(c1))
1599 return Arrays.equals((byte[]) av1, (byte[]) av2);
1600 else if (int.class.equals(c1))
1601 return Arrays.equals((int[]) av1, (int[]) av2);
1602 else if (long.class.equals(c1))
1603 return Arrays.equals((long[]) av1, (long[]) av2);
1604 else if (float.class.equals(c1))
1605 return Arrays.equals((float[]) av1, (float[]) av2);
1606 else if (double.class.equals(c1))
1607 return Arrays.equals((double[]) av1, (double[]) av2);
1608 throw new RuntimeException("??? Contact application querySupport.");
1613 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1617 Query query = entry.getQuery();
1619 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1621 entry.prepareRecompute(querySupport);
1623 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1625 query.recompute(parentGraph);
1627 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1629 Object newValue = entry.getResult();
1631 if (ListenerEntry.NO_VALUE == oldValue) {
1632 if(DebugPolicy.CHANGES) {
1633 System.out.println("C " + query);
1634 System.out.println("- " + oldValue);
1635 System.out.println("- " + newValue);
1640 boolean changed = false;
1642 if (newValue != null) {
1643 if (newValue.getClass().isArray()) {
1644 changed = !arrayEquals(newValue, oldValue);
1646 changed = !newValue.equals(oldValue);
1649 changed = (oldValue != null);
1651 if(DebugPolicy.CHANGES && changed) {
1652 System.out.println("C " + query);
1653 System.out.println("- " + oldValue);
1654 System.out.println("- " + newValue);
1657 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1659 } catch (Throwable t) {
1661 Logger.defaultLogError(t);
1663 return ListenerEntry.NO_VALUE;
1669 public boolean hasScheduledUpdates() {
1670 return !scheduledListeners.isEmpty();
1673 public void performScheduledUpdates(WriteGraphImpl graph) {
1676 assert (!cache.collecting);
1677 assert (!firingListeners);
1679 firingListeners = true;
1683 // Performing may cause further events to be scheduled.
1684 while (!scheduledListeners.isEmpty()) {
1687 // graph.state.barrier.inc();
1689 // Clone current events to make new entries possible during
1691 THashSet<ListenerEntry> entries = scheduledListeners;
1692 scheduledListeners = new THashSet<ListenerEntry>();
1694 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1696 for (ListenerEntry listenerEntry : entries) {
1698 if (pruneListener(listenerEntry)) {
1699 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1703 final CacheEntry entry = listenerEntry.entry;
1704 assert (entry != null);
1706 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1708 if (newValue != ListenerEntry.NOT_CHANGED) {
1709 if(DebugPolicy.LISTENER)
1710 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1711 schedule.add(listenerEntry);
1712 listenerEntry.setLastKnown(entry.getResult());
1717 for(ListenerEntry listenerEntry : schedule) {
1718 final CacheEntry entry = listenerEntry.entry;
1719 if(DebugPolicy.LISTENER)
1720 System.out.println("Firing " + listenerEntry.procedure);
1722 if(DebugPolicy.LISTENER)
1723 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1724 entry.performFromCache(graph, listenerEntry.procedure);
1725 } catch (Throwable t) {
1726 t.printStackTrace();
1730 // graph.state.barrier.dec();
1731 // graph.waitAsync(null);
1732 // graph.state.barrier.assertReady();
1737 firingListeners = false;
1744 * @return true if this entry still has listeners
1746 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1748 assert (!cache.collecting);
1752 boolean hadListeners = false;
1753 boolean listenersUnknown = false;
1757 assert(entry != null);
1758 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1759 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1760 todo.add(new UpdateEntry(null, entry, 0));
1764 // Walk the tree and collect immediate updates
1765 while (!todo.isEmpty()) {
1766 UpdateEntry e = todo.pop();
1767 hadListeners |= updateQuery(e, todo, immediates);
1770 if(immediates.isEmpty()) break;
1772 // Evaluate all immediate updates and collect parents to update
1773 for(CacheEntry immediate : immediates.values()) {
1775 if(immediate.isDiscarded()) {
1779 if(immediate.isExcepted()) {
1781 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1782 if (newValue != ListenerEntry.NOT_CHANGED)
1783 updateParents(0, immediate, todo);
1787 Object oldValue = immediate.getResult();
1788 Object newValue = compareTo(graph, immediate, oldValue);
1790 if (newValue != ListenerEntry.NOT_CHANGED) {
1791 updateParents(0, immediate, todo);
1793 // If not changed, keep the old value
1794 immediate.setResult(oldValue);
1795 immediate.setReady();
1796 listenersUnknown = true;
1806 } catch (Throwable t) {
1807 Logger.defaultLogError(t);
1813 return hadListeners | listenersUnknown;
1817 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1818 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1819 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1820 // Maybe use a mutex from util.concurrent?
1821 private Object primitiveUpdateLock = new Object();
1822 private THashSet scheduledPrimitiveUpdates = new THashSet();
1824 public void performDirtyUpdates(final ReadGraphImpl graph) {
1826 cache.dirty = false;
1829 if (Development.DEVELOPMENT) {
1830 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1831 System.err.println("== Query update ==");
1835 // Special case - one statement
1836 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1838 long arg0 = scheduledObjectUpdates.getFirst();
1840 final int subject = (int)(arg0 >>> 32);
1841 final int predicate = (int)(arg0 & 0xffffffff);
1843 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1844 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1845 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1847 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1848 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1849 if(principalTypes != null) update(graph, principalTypes);
1850 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1851 if(types != null) update(graph, types);
1854 if(predicate == subrelationOf) {
1855 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1856 if(superRelations != null) update(graph, superRelations);
1859 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1860 if(dp != null) update(graph, dp);
1861 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1862 if(os != null) update(graph, os);
1864 scheduledObjectUpdates.clear();
1869 // Special case - one value
1870 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1872 int arg0 = scheduledValueUpdates.getFirst();
1874 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1875 if(valueQuery != null) update(graph, valueQuery);
1877 scheduledValueUpdates.clear();
1882 final TIntHashSet predicates = new TIntHashSet();
1883 final TIntHashSet orderedSets = new TIntHashSet();
1885 THashSet primitiveUpdates;
1886 synchronized (primitiveUpdateLock) {
1887 primitiveUpdates = scheduledPrimitiveUpdates;
1888 scheduledPrimitiveUpdates = new THashSet();
1891 primitiveUpdates.forEach(new TObjectProcedure() {
1894 public boolean execute(Object arg0) {
1896 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1897 if (query != null) {
1898 boolean listening = update(graph, query);
1899 if (!listening && !query.hasParents()) {
1900 cache.externalReadEntryMap.remove(arg0);
1909 scheduledValueUpdates.forEach(new TIntProcedure() {
1912 public boolean execute(int arg0) {
1913 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1914 if(valueQuery != null) update(graph, valueQuery);
1920 scheduledInvalidates.forEach(new TIntProcedure() {
1923 public boolean execute(int resource) {
1925 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1926 if(valueQuery != null) update(graph, valueQuery);
1928 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1929 if(principalTypes != null) update(graph, principalTypes);
1930 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1931 if(types != null) update(graph, types);
1933 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1934 if(superRelations != null) update(graph, superRelations);
1936 predicates.add(resource);
1943 scheduledObjectUpdates.forEach(new TLongProcedure() {
1946 public boolean execute(long arg0) {
1948 final int subject = (int)(arg0 >>> 32);
1949 final int predicate = (int)(arg0 & 0xffffffff);
1951 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1952 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1953 if(principalTypes != null) update(graph, principalTypes);
1954 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1955 if(types != null) update(graph, types);
1958 if(predicate == subrelationOf) {
1959 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1960 if(superRelations != null) update(graph, superRelations);
1963 predicates.add(subject);
1964 orderedSets.add(predicate);
1972 predicates.forEach(new TIntProcedure() {
1975 public boolean execute(final int subject) {
1977 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1978 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1979 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1981 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1982 if(entry != null) update(graph, entry);
1990 orderedSets.forEach(new TIntProcedure() {
1993 public boolean execute(int orderedSet) {
1995 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1996 if(entry != null) update(graph, entry);
2004 // for (Integer subject : predicates) {
2005 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2006 // if(entry != null) update(graph, entry);
2010 if (Development.DEVELOPMENT) {
2011 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2012 System.err.println("== Query update ends ==");
2016 scheduledValueUpdates.clear();
2017 scheduledObjectUpdates.clear();
2018 scheduledInvalidates.clear();
2022 public void updateValue(final int resource) {
2023 scheduledValueUpdates.add(resource);
2027 public void updateStatements(final int resource, final int predicate) {
2028 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
2032 private int lastInvalidate = 0;
2034 public void invalidateResource(final int resource) {
2035 if(lastInvalidate == resource) return;
2036 scheduledValueUpdates.add(resource);
2037 lastInvalidate = resource;
2041 public void updatePrimitive(final ExternalRead primitive) {
2043 // External reads may be updated from arbitrary threads.
2044 // Synchronize to prevent race-conditions.
2045 synchronized (primitiveUpdateLock) {
2046 scheduledPrimitiveUpdates.add(primitive);
2048 querySupport.dirtyPrimitives();
2053 public synchronized String toString() {
2054 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
2058 protected void doDispose() {
2060 for(int index = 0; index < THREADS; index++) {
2061 executors[index].dispose();
2065 for(int i=0;i<100;i++) {
2067 boolean alive = false;
2068 for(int index = 0; index < THREADS; index++) {
2069 alive |= executors[index].isAlive();
2074 } catch (InterruptedException e) {
2075 Logger.defaultLogError(e);
2080 // Then start interrupting
2081 for(int i=0;i<100;i++) {
2083 boolean alive = false;
2084 for(int index = 0; index < THREADS; index++) {
2085 alive |= executors[index].isAlive();
2088 for(int index = 0; index < THREADS; index++) {
2089 executors[index].interrupt();
2093 // // Then just destroy
2094 // for(int index = 0; index < THREADS; index++) {
2095 // executors[index].destroy();
2098 for(int index = 0; index < THREADS; index++) {
2100 executors[index].join(5000);
2101 } catch (InterruptedException e) {
2102 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2104 executors[index] = null;
2109 public int getHits() {
2113 public int getMisses() {
2114 return cache.misses;
2117 public int getSize() {
2121 public Set<Long> getReferencedClusters() {
2122 HashSet<Long> result = new HashSet<Long>();
2123 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2124 Objects query = (Objects) entry.getQuery();
2125 result.add(querySupport.getClusterId(query.r1()));
2127 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2128 DirectPredicates query = (DirectPredicates) entry.getQuery();
2129 result.add(querySupport.getClusterId(query.id));
2131 for (CacheEntry entry : cache.valueQueryMap.values()) {
2132 ValueQuery query = (ValueQuery) entry.getQuery();
2133 result.add(querySupport.getClusterId(query.id));
2138 public void assertDone() {
2141 CacheCollectionResult allCaches(CacheCollectionResult result) {
2143 return cache.allCaches(result);
2147 public void printDiagnostics() {
2150 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2151 querySupport.requestCluster(graph, clusterId, runnable);
2154 public int clean() {
2155 collector.collect(0, Integer.MAX_VALUE);
2159 public void clean(final Collection<ExternalRead<?>> requests) {
2160 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2161 Iterator<ExternalRead<?>> iterator = requests.iterator();
2163 public CacheCollectionResult allCaches() {
2164 throw new UnsupportedOperationException();
2167 public CacheEntryBase iterate(int level) {
2168 if(iterator.hasNext()) {
2169 ExternalRead<?> request = iterator.next();
2170 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2171 if (entry != null) return entry;
2172 else return iterate(level);
2174 iterator = requests.iterator();
2179 public void remove() {
2180 throw new UnsupportedOperationException();
2183 public void setLevel(CacheEntryBase entry, int level) {
2184 throw new UnsupportedOperationException();
2187 public Collection<CacheEntry> getRootList() {
2188 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2189 for (ExternalRead<?> request : requests) {
2190 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2197 public int getCurrentSize() {
2201 public int calculateCurrentSize() {
2202 // This tells the collector to attempt collecting everything.
2203 return Integer.MAX_VALUE;
2206 public boolean start(boolean flush) {
2210 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
2213 public void scanPending() {
2215 cache.scanPending();
2219 public ReadGraphImpl graphForVirtualRequest() {
2220 return ReadGraphImpl.createAsync(this);
2224 private HashMap<Resource, Class<?>> builtinValues;
2226 public Class<?> getBuiltinValue(Resource r) {
2227 if(builtinValues == null) initBuiltinValues();
2228 return builtinValues.get(r);
2231 Exception callerException = null;
2233 public interface AsyncBarrier {
2236 // public void inc(String debug);
2237 // public void dec(String debug);
2240 // final public QueryProcessor processor;
2241 // final public QuerySupport support;
2243 // boolean disposed = false;
2245 private void initBuiltinValues() {
2247 Layer0 b = getSession().peekService(Layer0.class);
2248 if(b == null) return;
2250 builtinValues = new HashMap<Resource, Class<?>>();
2252 builtinValues.put(b.String, String.class);
2253 builtinValues.put(b.Double, Double.class);
2254 builtinValues.put(b.Float, Float.class);
2255 builtinValues.put(b.Long, Long.class);
2256 builtinValues.put(b.Integer, Integer.class);
2257 builtinValues.put(b.Byte, Byte.class);
2258 builtinValues.put(b.Boolean, Boolean.class);
2260 builtinValues.put(b.StringArray, String[].class);
2261 builtinValues.put(b.DoubleArray, double[].class);
2262 builtinValues.put(b.FloatArray, float[].class);
2263 builtinValues.put(b.LongArray, long[].class);
2264 builtinValues.put(b.IntegerArray, int[].class);
2265 builtinValues.put(b.ByteArray, byte[].class);
2266 builtinValues.put(b.BooleanArray, boolean[].class);
2270 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2272 // if (null == provider2) {
2273 // this.processor = null;
2277 // this.processor = provider2;
2278 // support = provider2.getCore();
2279 // initBuiltinValues();
2283 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2284 // return new ReadGraphSupportImpl(impl.processor);
2288 final public Session getSession() {
2292 final public ResourceSupport getResourceSupport() {
2293 return resourceSupport;
2297 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2299 throw new UnsupportedOperationException();
2301 // assert(subject != null);
2302 // assert(procedure != null);
2304 // final ListenerBase listener = getListenerBase(procedure);
2306 // IntProcedure ip = new IntProcedure() {
2308 // AtomicBoolean first = new AtomicBoolean(true);
2311 // public void execute(ReadGraphImpl graph, int i) {
2313 // if(first.get()) {
2314 // procedure.execute(graph, querySupport.getResource(i));
2316 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2318 // } catch (Throwable t2) {
2319 // Logger.defaultLogError(t2);
2324 // public void finished(ReadGraphImpl graph) {
2326 // if(first.compareAndSet(true, false)) {
2327 // procedure.finished(graph);
2328 //// impl.state.barrier.dec(this);
2330 // procedure.finished(impl.newRestart(graph));
2333 // } catch (Throwable t2) {
2334 // Logger.defaultLogError(t2);
2339 // public void exception(ReadGraphImpl graph, Throwable t) {
2341 // if(first.compareAndSet(true, false)) {
2342 // procedure.exception(graph, t);
2344 // procedure.exception(impl.newRestart(graph), t);
2346 // } catch (Throwable t2) {
2347 // Logger.defaultLogError(t2);
2353 // int sId = querySupport.getId(subject);
2356 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2357 // } catch (DatabaseException e) {
2358 // Logger.defaultLogError(e);
2364 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2366 throw new UnsupportedOperationException();
2368 // assert(subject != null);
2369 // assert(procedure != null);
2371 // final ListenerBase listener = getListenerBase(procedure);
2374 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2377 // public void execute(ReadGraphImpl graph, int i) {
2379 // procedure.execute(querySupport.getResource(i));
2380 // } catch (Throwable t2) {
2381 // Logger.defaultLogError(t2);
2386 // public void finished(ReadGraphImpl graph) {
2388 // procedure.finished();
2389 // } catch (Throwable t2) {
2390 // Logger.defaultLogError(t2);
2392 //// impl.state.barrier.dec();
2396 // public void exception(ReadGraphImpl graph, Throwable t) {
2398 // procedure.exception(t);
2399 // } catch (Throwable t2) {
2400 // Logger.defaultLogError(t2);
2402 //// impl.state.barrier.dec();
2406 // } catch (DatabaseException e) {
2407 // Logger.defaultLogError(e);
2413 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2414 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2418 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2419 final Resource predicate, final MultiProcedure<Statement> procedure) {
2421 assert(subject != null);
2422 assert(predicate != null);
2423 assert(procedure != null);
2425 final ListenerBase listener = getListenerBase(procedure);
2427 // impl.state.barrier.inc();
2430 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2433 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2435 procedure.execute(querySupport.getStatement(s, p, o));
2436 } catch (Throwable t2) {
2437 Logger.defaultLogError(t2);
2442 public void finished(ReadGraphImpl graph) {
2444 procedure.finished();
2445 } catch (Throwable t2) {
2446 Logger.defaultLogError(t2);
2448 // impl.state.barrier.dec();
2452 public void exception(ReadGraphImpl graph, Throwable t) {
2454 procedure.exception(t);
2455 } catch (Throwable t2) {
2456 Logger.defaultLogError(t2);
2458 // impl.state.barrier.dec();
2462 } catch (DatabaseException e) {
2463 Logger.defaultLogError(e);
2469 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2470 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2472 assert(subject != null);
2473 assert(predicate != null);
2474 assert(procedure != null);
2476 final ListenerBase listener = getListenerBase(procedure);
2478 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2480 boolean first = true;
2483 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2486 procedure.execute(graph, querySupport.getStatement(s, p, o));
2488 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2490 } catch (Throwable t2) {
2491 Logger.defaultLogError(t2);
2496 public void finished(ReadGraphImpl graph) {
2501 procedure.finished(graph);
2502 // impl.state.barrier.dec(this);
2504 procedure.finished(impl.newRestart(graph));
2506 } catch (Throwable t2) {
2507 Logger.defaultLogError(t2);
2513 public void exception(ReadGraphImpl graph, Throwable t) {
2518 procedure.exception(graph, t);
2519 // impl.state.barrier.dec(this);
2521 procedure.exception(impl.newRestart(graph), t);
2523 } catch (Throwable t2) {
2524 Logger.defaultLogError(t2);
2531 int sId = querySupport.getId(subject);
2532 int pId = querySupport.getId(predicate);
2534 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2535 // else impl.state.barrier.inc(null, null);
2538 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2539 } catch (DatabaseException e) {
2540 Logger.defaultLogError(e);
2546 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2547 final Resource predicate, final StatementProcedure procedure) {
2549 assert(subject != null);
2550 assert(predicate != null);
2551 assert(procedure != null);
2553 final ListenerBase listener = getListenerBase(procedure);
2555 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2557 boolean first = true;
2560 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2563 procedure.execute(graph, s, p, o);
2565 procedure.execute(impl.newRestart(graph), s, p, o);
2567 } catch (Throwable t2) {
2568 Logger.defaultLogError(t2);
2573 public void finished(ReadGraphImpl graph) {
2578 procedure.finished(graph);
2579 // impl.state.barrier.dec(this);
2581 procedure.finished(impl.newRestart(graph));
2583 } catch (Throwable t2) {
2584 Logger.defaultLogError(t2);
2590 public void exception(ReadGraphImpl graph, Throwable t) {
2595 procedure.exception(graph, t);
2596 // impl.state.barrier.dec(this);
2598 procedure.exception(impl.newRestart(graph), t);
2600 } catch (Throwable t2) {
2601 Logger.defaultLogError(t2);
2608 int sId = querySupport.getId(subject);
2609 int pId = querySupport.getId(predicate);
2611 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2612 // else impl.state.barrier.inc(null, null);
2615 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2616 } catch (DatabaseException e) {
2617 Logger.defaultLogError(e);
2623 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2625 assert(subject != null);
2626 assert(predicate != null);
2627 assert(procedure != null);
2629 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2631 private Set<Statement> current = null;
2632 private Set<Statement> run = new HashSet<Statement>();
2635 public void execute(AsyncReadGraph graph, Statement result) {
2637 boolean found = false;
2639 if(current != null) {
2641 found = current.remove(result);
2645 if(!found) procedure.add(graph, result);
2652 public void finished(AsyncReadGraph graph) {
2654 if(current != null) {
2655 for(Statement r : current) procedure.remove(graph, r);
2660 run = new HashSet<Statement>();
2665 public void exception(AsyncReadGraph graph, Throwable t) {
2666 procedure.exception(graph, t);
2670 public boolean isDisposed() {
2671 return procedure.isDisposed();
2679 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2680 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2682 assert(subject != null);
2683 assert(predicate != null);
2684 assert(procedure != null);
2686 final ListenerBase listener = getListenerBase(procedure);
2688 // impl.state.barrier.inc();
2691 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2694 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2696 procedure.execute(graph, querySupport.getStatement(s, p, o));
2697 } catch (Throwable t2) {
2698 Logger.defaultLogError(t2);
2703 public void finished(ReadGraphImpl graph) {
2705 procedure.finished(graph);
2706 } catch (Throwable t2) {
2707 Logger.defaultLogError(t2);
2709 // impl.state.barrier.dec();
2713 public void exception(ReadGraphImpl graph, Throwable t) {
2715 procedure.exception(graph, t);
2716 } catch (Throwable t2) {
2717 Logger.defaultLogError(t2);
2719 // impl.state.barrier.dec();
2723 } catch (DatabaseException e) {
2724 Logger.defaultLogError(e);
2729 private static ListenerBase getListenerBase(Object procedure) {
2730 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2735 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2737 assert(subject != null);
2738 assert(predicate != null);
2739 assert(procedure != null);
2741 final ListenerBase listener = getListenerBase(procedure);
2743 // impl.state.barrier.inc();
2746 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2749 public void execute(ReadGraphImpl graph, int i) {
2751 procedure.execute(querySupport.getResource(i));
2752 } catch (Throwable t2) {
2753 Logger.defaultLogError(t2);
2758 public void finished(ReadGraphImpl graph) {
2760 procedure.finished();
2761 } catch (Throwable t2) {
2762 Logger.defaultLogError(t2);
2764 // impl.state.barrier.dec();
2768 public void exception(ReadGraphImpl graph, Throwable t) {
2769 System.out.println("forEachObject exception " + t);
2771 procedure.exception(t);
2772 } catch (Throwable t2) {
2773 Logger.defaultLogError(t2);
2775 // impl.state.barrier.dec();
2779 } catch (DatabaseException e) {
2780 Logger.defaultLogError(e);
2786 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2788 assert(subject != null);
2789 assert(procedure != null);
2791 final ListenerBase listener = getListenerBase(procedure);
2793 int sId = querySupport.getId(subject);
2796 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2799 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2800 procedure.execute(graph, result);
2804 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2805 procedure.exception(graph, throwable);
2809 } catch (DatabaseException e) {
2810 Logger.defaultLogError(e);
2815 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2817 // assert(subject != null);
2818 // assert(procedure != null);
2820 // final ListenerBase listener = getListenerBase(procedure);
2822 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2824 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2829 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2831 // assert(subject != null);
2832 // assert(procedure != null);
2834 // final ListenerBase listener = getListenerBase(procedure);
2836 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2840 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2843 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2845 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2847 private Resource single = null;
2850 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2851 if(single == null) {
2854 single = INVALID_RESOURCE;
2859 public synchronized void finished(AsyncReadGraph graph) {
2860 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2861 else procedure.execute(graph, single);
2865 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2866 procedure.exception(graph, throwable);
2873 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2875 final int sId = querySupport.getId(subject);
2876 final int pId = querySupport.getId(predicate);
2879 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2880 } catch (DatabaseException e) {
2881 Logger.defaultLogError(e);
2886 static class Runner2Procedure implements IntProcedure {
2888 public int single = 0;
2889 public Throwable t = null;
2891 public void clear() {
2897 public void execute(ReadGraphImpl graph, int i) {
2898 if(single == 0) single = i;
2903 public void finished(ReadGraphImpl graph) {
2904 if(single == -1) single = 0;
2908 public void exception(ReadGraphImpl graph, Throwable throwable) {
2913 public int get() throws DatabaseException {
2915 if(t instanceof DatabaseException) throw (DatabaseException)t;
2916 else throw new DatabaseException(t);
2923 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2925 final int sId = querySupport.getId(subject);
2926 final int pId = querySupport.getId(predicate);
2928 Runner2Procedure proc = new Runner2Procedure();
2929 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2934 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2936 assert(subject != null);
2937 assert(predicate != null);
2939 final ListenerBase listener = getListenerBase(procedure);
2941 if(impl.parent != null || listener != null) {
2943 IntProcedure ip = new IntProcedure() {
2945 AtomicBoolean first = new AtomicBoolean(true);
2948 public void execute(ReadGraphImpl graph, int i) {
2951 procedure.execute(impl, querySupport.getResource(i));
2953 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2955 } catch (Throwable t2) {
2956 Logger.defaultLogError(t2);
2962 public void finished(ReadGraphImpl graph) {
2964 if(first.compareAndSet(true, false)) {
2965 procedure.finished(impl);
2966 // impl.state.barrier.dec(this);
2968 procedure.finished(impl.newRestart(graph));
2970 } catch (Throwable t2) {
2971 Logger.defaultLogError(t2);
2976 public void exception(ReadGraphImpl graph, Throwable t) {
2978 procedure.exception(graph, t);
2979 } catch (Throwable t2) {
2980 Logger.defaultLogError(t2);
2982 // impl.state.barrier.dec(this);
2986 public String toString() {
2987 return "forEachObject with " + procedure;
2992 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2993 // else impl.state.barrier.inc(null, null);
2995 forEachObject(impl, subject, predicate, listener, ip);
2999 IntProcedure ip = new IntProcedure() {
3002 public void execute(ReadGraphImpl graph, int i) {
3003 procedure.execute(graph, querySupport.getResource(i));
3007 public void finished(ReadGraphImpl graph) {
3008 procedure.finished(graph);
3012 public void exception(ReadGraphImpl graph, Throwable t) {
3013 procedure.exception(graph, t);
3017 public String toString() {
3018 return "forEachObject with " + procedure;
3023 forEachObject(impl, subject, predicate, listener, ip);
3030 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3032 assert(subject != null);
3033 assert(predicate != null);
3034 assert(procedure != null);
3036 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3038 private Set<Resource> current = null;
3039 private Set<Resource> run = new HashSet<Resource>();
3042 public void execute(AsyncReadGraph graph, Resource result) {
3044 boolean found = false;
3046 if(current != null) {
3048 found = current.remove(result);
3052 if(!found) procedure.add(graph, result);
3059 public void finished(AsyncReadGraph graph) {
3061 if(current != null) {
3062 for(Resource r : current) procedure.remove(graph, r);
3067 run = new HashSet<Resource>();
3072 public boolean isDisposed() {
3073 return procedure.isDisposed();
3077 public void exception(AsyncReadGraph graph, Throwable t) {
3078 procedure.exception(graph, t);
3082 public String toString() {
3083 return "forObjectSet " + procedure;
3091 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3093 assert(subject != null);
3094 assert(procedure != null);
3096 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3098 private Set<Resource> current = null;
3099 private Set<Resource> run = new HashSet<Resource>();
3102 public void execute(AsyncReadGraph graph, Resource result) {
3104 boolean found = false;
3106 if(current != null) {
3108 found = current.remove(result);
3112 if(!found) procedure.add(graph, result);
3119 public void finished(AsyncReadGraph graph) {
3121 if(current != null) {
3122 for(Resource r : current) procedure.remove(graph, r);
3127 run = new HashSet<Resource>();
3132 public boolean isDisposed() {
3133 return procedure.isDisposed();
3137 public void exception(AsyncReadGraph graph, Throwable t) {
3138 procedure.exception(graph, t);
3142 public String toString() {
3143 return "forPredicateSet " + procedure;
3151 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3153 assert(subject != null);
3154 assert(procedure != null);
3156 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3158 private Set<Resource> current = null;
3159 private Set<Resource> run = new HashSet<Resource>();
3162 public void execute(AsyncReadGraph graph, Resource result) {
3164 boolean found = false;
3166 if(current != null) {
3168 found = current.remove(result);
3172 if(!found) procedure.add(graph, result);
3179 public void finished(AsyncReadGraph graph) {
3181 if(current != null) {
3182 for(Resource r : current) procedure.remove(graph, r);
3187 run = new HashSet<Resource>();
3192 public boolean isDisposed() {
3193 return procedure.isDisposed();
3197 public void exception(AsyncReadGraph graph, Throwable t) {
3198 procedure.exception(graph, t);
3202 public String toString() {
3203 return "forPrincipalTypeSet " + procedure;
3211 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3213 assert(subject != null);
3214 assert(predicate != null);
3215 assert(procedure != null);
3217 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3219 private Set<Resource> current = null;
3220 private Set<Resource> run = new HashSet<Resource>();
3223 public void execute(AsyncReadGraph graph, Resource result) {
3225 boolean found = false;
3227 if(current != null) {
3229 found = current.remove(result);
3233 if(!found) procedure.add(graph, result);
3240 public void finished(AsyncReadGraph graph) {
3242 if(current != null) {
3243 for(Resource r : current) procedure.remove(graph, r);
3248 run = new HashSet<Resource>();
3253 public boolean isDisposed() {
3254 return procedure.isDisposed();
3258 public void exception(AsyncReadGraph graph, Throwable t) {
3259 procedure.exception(graph, t);
3263 public String toString() {
3264 return "forObjectSet " + procedure;
3272 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3274 assert(subject != null);
3275 assert(predicate != null);
3276 assert(procedure != null);
3278 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3280 private Set<Statement> current = null;
3281 private Set<Statement> run = new HashSet<Statement>();
3284 public void execute(AsyncReadGraph graph, Statement result) {
3286 boolean found = false;
3288 if(current != null) {
3290 found = current.remove(result);
3294 if(!found) procedure.add(graph, result);
3301 public void finished(AsyncReadGraph graph) {
3303 if(current != null) {
3304 for(Statement s : current) procedure.remove(graph, s);
3309 run = new HashSet<Statement>();
3314 public boolean isDisposed() {
3315 return procedure.isDisposed();
3319 public void exception(AsyncReadGraph graph, Throwable t) {
3320 procedure.exception(graph, t);
3324 public String toString() {
3325 return "forStatementSet " + procedure;
3333 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3334 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3336 assert(subject != null);
3337 assert(predicate != null);
3338 assert(procedure != null);
3340 final ListenerBase listener = getListenerBase(procedure);
3343 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3346 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3348 procedure.execute(graph, querySupport.getResource(o));
3349 } catch (Throwable t2) {
3350 Logger.defaultLogError(t2);
3355 public void finished(ReadGraphImpl graph) {
3357 procedure.finished(graph);
3358 } catch (Throwable t2) {
3359 Logger.defaultLogError(t2);
3361 // impl.state.barrier.dec();
3365 public void exception(ReadGraphImpl graph, Throwable t) {
3367 procedure.exception(graph, t);
3368 } catch (Throwable t2) {
3369 Logger.defaultLogError(t2);
3371 // impl.state.barrier.dec();
3375 } catch (DatabaseException e) {
3376 Logger.defaultLogError(e);
3382 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3384 assert(subject != null);
3385 assert(procedure != null);
3387 final ListenerBase listener = getListenerBase(procedure);
3389 IntProcedure ip = new IntProcedure() {
3392 public void execute(ReadGraphImpl graph, int i) {
3394 procedure.execute(graph, querySupport.getResource(i));
3395 } catch (Throwable t2) {
3396 Logger.defaultLogError(t2);
3401 public void finished(ReadGraphImpl graph) {
3403 procedure.finished(graph);
3404 } catch (Throwable t2) {
3405 Logger.defaultLogError(t2);
3407 // impl.state.barrier.dec(this);
3411 public void exception(ReadGraphImpl graph, Throwable t) {
3413 procedure.exception(graph, t);
3414 } catch (Throwable t2) {
3415 Logger.defaultLogError(t2);
3417 // impl.state.barrier.dec(this);
3422 int sId = querySupport.getId(subject);
3424 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3425 // else impl.state.barrier.inc(null, null);
3428 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3429 } catch (DatabaseException e) {
3430 Logger.defaultLogError(e);
3436 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3438 assert(subject != null);
3439 assert(procedure != null);
3441 final ListenerBase listener = getListenerBase(procedure);
3443 // impl.state.barrier.inc();
3446 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3449 public void execute(ReadGraphImpl graph, int i) {
3451 procedure.execute(querySupport.getResource(i));
3452 } catch (Throwable t2) {
3453 Logger.defaultLogError(t2);
3458 public void finished(ReadGraphImpl graph) {
3460 procedure.finished();
3461 } catch (Throwable t2) {
3462 Logger.defaultLogError(t2);
3464 // impl.state.barrier.dec();
3468 public void exception(ReadGraphImpl graph, Throwable t) {
3470 procedure.exception(t);
3471 } catch (Throwable t2) {
3472 Logger.defaultLogError(t2);
3474 // impl.state.barrier.dec();
3478 } catch (DatabaseException e) {
3479 Logger.defaultLogError(e);
3483 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3485 assert(subject != null);
3486 assert(procedure != null);
3488 final ListenerBase listener = getListenerBase(procedure);
3489 assert(listener == null);
3491 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3494 public void execute(final ReadGraphImpl graph, IntSet set) {
3495 procedure.execute(graph, set);
3499 public void exception(ReadGraphImpl graph, Throwable t) {
3500 procedure.exception(graph, t);
3505 int sId = querySupport.getId(subject);
3508 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3509 } catch (DatabaseException e) {
3510 Logger.defaultLogError(e);
3516 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3518 assert(subject != null);
3520 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3525 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3527 assert(subject != null);
3529 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
3534 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3536 assert(subject != null);
3537 assert(procedure != null);
3539 final ListenerBase listener = getListenerBase(procedure);
3542 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3544 AtomicBoolean first = new AtomicBoolean(true);
3547 public void execute(final ReadGraphImpl graph, IntSet set) {
3548 // final HashSet<Resource> result = new HashSet<Resource>();
3549 // set.forEach(new TIntProcedure() {
3552 // public boolean execute(int type) {
3553 // result.add(querySupport.getResource(type));
3559 if(first.compareAndSet(true, false)) {
3560 procedure.execute(graph, set);
3561 // impl.state.barrier.dec();
3563 procedure.execute(impl.newRestart(graph), set);
3565 } catch (Throwable t2) {
3566 Logger.defaultLogError(t2);
3571 public void exception(ReadGraphImpl graph, Throwable t) {
3573 if(first.compareAndSet(true, false)) {
3574 procedure.exception(graph, t);
3575 // impl.state.barrier.dec();
3577 procedure.exception(impl.newRestart(graph), t);
3579 } catch (Throwable t2) {
3580 Logger.defaultLogError(t2);
3585 } catch (DatabaseException e) {
3586 Logger.defaultLogError(e);
3592 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3594 assert(subject != null);
3595 assert(procedure != null);
3597 final ListenerBase listener = getListenerBase(procedure);
3599 IntProcedure ip = new IntProcedureAdapter() {
3602 public void execute(final ReadGraphImpl graph, int superRelation) {
3604 procedure.execute(graph, querySupport.getResource(superRelation));
3605 } catch (Throwable t2) {
3606 Logger.defaultLogError(t2);
3611 public void finished(final ReadGraphImpl graph) {
3613 procedure.finished(graph);
3614 } catch (Throwable t2) {
3615 Logger.defaultLogError(t2);
3617 // impl.state.barrier.dec(this);
3622 public void exception(ReadGraphImpl graph, Throwable t) {
3624 procedure.exception(graph, t);
3625 } catch (Throwable t2) {
3626 Logger.defaultLogError(t2);
3628 // impl.state.barrier.dec(this);
3633 int sId = querySupport.getId(subject);
3635 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3636 // else impl.state.barrier.inc(null, null);
3639 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3640 } catch (DatabaseException e) {
3641 Logger.defaultLogError(e);
3644 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3649 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3651 assert(subject != null);
3652 assert(procedure != null);
3654 final ListenerBase listener = getListenerBase(procedure);
3656 // impl.state.barrier.inc();
3658 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3663 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3665 assert(subject != null);
3666 assert(procedure != null);
3668 final ListenerBase listener = getListenerBase(procedure);
3670 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3673 public void execute(final ReadGraphImpl graph, IntSet set) {
3674 // final HashSet<Resource> result = new HashSet<Resource>();
3675 // set.forEach(new TIntProcedure() {
3678 // public boolean execute(int type) {
3679 // result.add(querySupport.getResource(type));
3685 procedure.execute(graph, set);
3686 } catch (Throwable t2) {
3687 Logger.defaultLogError(t2);
3689 // impl.state.barrier.dec(this);
3693 public void exception(ReadGraphImpl graph, Throwable t) {
3695 procedure.exception(graph, t);
3696 } catch (Throwable t2) {
3697 Logger.defaultLogError(t2);
3699 // impl.state.barrier.dec(this);
3704 int sId = querySupport.getId(subject);
3706 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3707 // else impl.state.barrier.inc(null, null);
3710 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3711 } catch (DatabaseException e) {
3712 Logger.defaultLogError(e);
3717 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3718 return getValue(impl, querySupport.getId(subject));
3721 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3722 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3726 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3728 assert(subject != null);
3729 assert(procedure != null);
3731 int sId = querySupport.getId(subject);
3733 // if(procedure != null) {
3735 final ListenerBase listener = getListenerBase(procedure);
3737 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3739 AtomicBoolean first = new AtomicBoolean(true);
3742 public void execute(ReadGraphImpl graph, byte[] result) {
3744 if(first.compareAndSet(true, false)) {
3745 procedure.execute(graph, result);
3746 // impl.state.barrier.dec(this);
3748 procedure.execute(impl.newRestart(graph), result);
3750 } catch (Throwable t2) {
3751 Logger.defaultLogError(t2);
3756 public void exception(ReadGraphImpl graph, Throwable t) {
3758 if(first.compareAndSet(true, false)) {
3759 procedure.exception(graph, t);
3760 // impl.state.barrier.dec(this);
3762 procedure.exception(impl.newRestart(graph), t);
3764 } catch (Throwable t2) {
3765 Logger.defaultLogError(t2);
3771 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3772 // else impl.state.barrier.inc(null, null);
3775 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3776 } catch (DatabaseException e) {
3777 throw new IllegalStateException("Internal error");
3782 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3786 // throw new IllegalStateException("Internal error");
3791 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3793 assert(subject != null);
3794 assert(procedure != null);
3796 final ListenerBase listener = getListenerBase(procedure);
3798 if(impl.parent != null || listener != null) {
3800 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3802 AtomicBoolean first = new AtomicBoolean(true);
3805 public void execute(ReadGraphImpl graph, byte[] result) {
3807 if(first.compareAndSet(true, false)) {
3808 procedure.execute(graph, result);
3809 // impl.state.barrier.dec(this);
3811 procedure.execute(impl.newRestart(graph), result);
3813 } catch (Throwable t2) {
3814 Logger.defaultLogError(t2);
3819 public void exception(ReadGraphImpl graph, Throwable t) {
3821 if(first.compareAndSet(true, false)) {
3822 procedure.exception(graph, t);
3823 // impl.state.barrier.dec(this);
3825 procedure.exception(impl.newRestart(graph), t);
3827 } catch (Throwable t2) {
3828 Logger.defaultLogError(t2);
3834 int sId = querySupport.getId(subject);
3836 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3837 // else impl.state.barrier.inc(null, null);
3840 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3841 } catch (DatabaseException e) {
3842 Logger.defaultLogError(e);
3847 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3850 public void execute(ReadGraphImpl graph, byte[] result) {
3852 procedure.execute(graph, result);
3857 public void exception(ReadGraphImpl graph, Throwable t) {
3859 procedure.exception(graph, t);
3865 int sId = querySupport.getId(subject);
3868 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3869 } catch (DatabaseException e) {
3870 Logger.defaultLogError(e);
3878 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3880 assert(relation != null);
3881 assert(procedure != null);
3883 final ListenerBase listener = getListenerBase(procedure);
3885 IntProcedure ip = new IntProcedure() {
3887 private int result = 0;
3889 final AtomicBoolean found = new AtomicBoolean(false);
3890 final AtomicBoolean done = new AtomicBoolean(false);
3893 public void finished(ReadGraphImpl graph) {
3895 // Shall fire exactly once!
3896 if(done.compareAndSet(false, true)) {
3899 procedure.exception(graph, new NoInverseException(""));
3900 // impl.state.barrier.dec(this);
3902 procedure.execute(graph, querySupport.getResource(result));
3903 // impl.state.barrier.dec(this);
3905 } catch (Throwable t) {
3906 Logger.defaultLogError(t);
3913 public void execute(ReadGraphImpl graph, int i) {
3915 if(found.compareAndSet(false, true)) {
3918 // Shall fire exactly once!
3919 if(done.compareAndSet(false, true)) {
3921 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3922 // impl.state.barrier.dec(this);
3923 } catch (Throwable t) {
3924 Logger.defaultLogError(t);
3932 public void exception(ReadGraphImpl graph, Throwable t) {
3933 // Shall fire exactly once!
3934 if(done.compareAndSet(false, true)) {
3936 procedure.exception(graph, t);
3937 // impl.state.barrier.dec(this);
3938 } catch (Throwable t2) {
3939 Logger.defaultLogError(t2);
3946 int sId = querySupport.getId(relation);
3948 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3949 // else impl.state.barrier.inc(null, null);
3952 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3953 } catch (DatabaseException e) {
3954 Logger.defaultLogError(e);
3960 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3963 assert(procedure != null);
3965 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3968 public void execute(ReadGraphImpl graph, Integer result) {
3970 procedure.execute(graph, querySupport.getResource(result));
3971 } catch (Throwable t2) {
3972 Logger.defaultLogError(t2);
3974 // impl.state.barrier.dec(this);
3978 public void exception(ReadGraphImpl graph, Throwable t) {
3981 procedure.exception(graph, t);
3982 } catch (Throwable t2) {
3983 Logger.defaultLogError(t2);
3985 // impl.state.barrier.dec(this);
3990 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3991 // else impl.state.barrier.inc(null, null);
3993 forResource(impl, id, impl.parent, ip);
3998 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4001 assert(procedure != null);
4003 // impl.state.barrier.inc();
4006 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
4009 public void execute(ReadGraphImpl graph, Integer result) {
4011 procedure.execute(graph, querySupport.getResource(result));
4012 } catch (Throwable t2) {
4013 Logger.defaultLogError(t2);
4015 // impl.state.barrier.dec();
4019 public void exception(ReadGraphImpl graph, Throwable t) {
4021 procedure.exception(graph, t);
4022 } catch (Throwable t2) {
4023 Logger.defaultLogError(t2);
4025 // impl.state.barrier.dec();
4029 } catch (DatabaseException e) {
4030 Logger.defaultLogError(e);
4036 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4038 assert(subject != null);
4039 assert(procedure != null);
4041 final ListenerBase listener = getListenerBase(procedure);
4044 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4045 procedure.execute(impl, !result.isEmpty());
4046 } catch (DatabaseException e) {
4047 procedure.exception(impl, e);
4053 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4055 assert(subject != null);
4056 assert(predicate != null);
4057 assert(procedure != null);
4059 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4061 boolean found = false;
4064 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4069 synchronized public void finished(AsyncReadGraph graph) {
4071 procedure.execute(graph, found);
4072 } catch (Throwable t2) {
4073 Logger.defaultLogError(t2);
4075 // impl.state.barrier.dec(this);
4079 public void exception(AsyncReadGraph graph, Throwable t) {
4081 procedure.exception(graph, t);
4082 } catch (Throwable t2) {
4083 Logger.defaultLogError(t2);
4085 // impl.state.barrier.dec(this);
4090 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4091 // else impl.state.barrier.inc(null, null);
4093 forEachObject(impl, subject, predicate, ip);
4098 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4100 assert(subject != null);
4101 assert(predicate != null);
4102 assert(procedure != null);
4104 // impl.state.barrier.inc();
4106 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4108 boolean found = false;
4111 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4112 if(resource.equals(object)) found = true;
4116 synchronized public void finished(AsyncReadGraph graph) {
4118 procedure.execute(graph, found);
4119 } catch (Throwable t2) {
4120 Logger.defaultLogError(t2);
4122 // impl.state.barrier.dec();
4126 public void exception(AsyncReadGraph graph, Throwable t) {
4128 procedure.exception(graph, t);
4129 } catch (Throwable t2) {
4130 Logger.defaultLogError(t2);
4132 // impl.state.barrier.dec();
4140 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4142 assert(subject != null);
4143 assert(procedure != null);
4145 final ListenerBase listener = getListenerBase(procedure);
4147 // impl.state.barrier.inc();
4150 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4153 public void execute(ReadGraphImpl graph, byte[] object) {
4154 boolean result = object != null;
4156 procedure.execute(graph, result);
4157 } catch (Throwable t2) {
4158 Logger.defaultLogError(t2);
4160 // impl.state.barrier.dec();
4164 public void exception(ReadGraphImpl graph, Throwable t) {
4166 procedure.exception(graph, t);
4167 } catch (Throwable t2) {
4168 Logger.defaultLogError(t2);
4170 // impl.state.barrier.dec();
4174 } catch (DatabaseException e) {
4175 Logger.defaultLogError(e);
4181 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4183 assert(subject != null);
4184 assert(procedure != null);
4186 final ListenerBase listener = getListenerBase(procedure);
4190 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4193 public void exception(ReadGraphImpl graph, Throwable t) {
4195 procedure.exception(graph, t);
4196 } catch (Throwable t2) {
4197 Logger.defaultLogError(t2);
4199 // impl.state.barrier.dec();
4203 public void execute(ReadGraphImpl graph, int i) {
4205 procedure.execute(graph, querySupport.getResource(i));
4206 } catch (Throwable t2) {
4207 Logger.defaultLogError(t2);
4212 public void finished(ReadGraphImpl graph) {
4214 procedure.finished(graph);
4215 } catch (Throwable t2) {
4216 Logger.defaultLogError(t2);
4218 // impl.state.barrier.dec();
4222 } catch (DatabaseException e) {
4223 Logger.defaultLogError(e);
4229 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4231 // assert(request != null);
4232 // assert(procedure != null);
4234 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4239 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4241 // assert(graph != null);
4242 // assert(request != null);
4244 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4245 // if(entry != null && entry.isReady()) {
4246 // return (T)entry.get(graph, this, null);
4248 // return request.perform(graph);
4253 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4255 // assert(graph != null);
4256 // assert(request != null);
4258 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4259 // if(entry != null && entry.isReady()) {
4260 // if(entry.isExcepted()) {
4261 // Throwable t = (Throwable)entry.getResult();
4262 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4263 // else throw new DatabaseException(t);
4265 // return (T)entry.getResult();
4269 // final DataContainer<T> result = new DataContainer<T>();
4270 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4272 // request.register(graph, new Listener<T>() {
4275 // public void exception(Throwable t) {
4276 // exception.set(t);
4280 // public void execute(T t) {
4285 // public boolean isDisposed() {
4291 // Throwable t = exception.get();
4293 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4294 // else throw new DatabaseException(t);
4297 // return result.get();
4304 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4306 // assert(graph != null);
4307 // assert(request != null);
4309 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4310 // if(entry != null && entry.isReady()) {
4311 // if(entry.isExcepted()) {
4312 // procedure.exception(graph, (Throwable)entry.getResult());
4314 // procedure.execute(graph, (T)entry.getResult());
4317 // request.perform(graph, procedure);
4323 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4325 assert(request != null);
4326 assert(procedure != null);
4330 queryMultiRead(impl, request, parent, listener, procedure);
4332 } catch (DatabaseException e) {
4334 throw new IllegalStateException(e);
4341 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4343 assert(request != null);
4344 assert(procedure != null);
4346 // impl.state.barrier.inc();
4348 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4350 public void execute(AsyncReadGraph graph, T result) {
4353 procedure.execute(graph, result);
4354 } catch (Throwable t2) {
4355 Logger.defaultLogError(t2);
4360 public void finished(AsyncReadGraph graph) {
4363 procedure.finished(graph);
4364 } catch (Throwable t2) {
4365 Logger.defaultLogError(t2);
4368 // impl.state.barrier.dec();
4373 public String toString() {
4374 return procedure.toString();
4378 public void exception(AsyncReadGraph graph, Throwable t) {
4381 procedure.exception(graph, t);
4382 } catch (Throwable t2) {
4383 Logger.defaultLogError(t2);
4386 // impl.state.barrier.dec();
4395 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4397 // assert(request != null);
4398 // assert(procedure != null);
4402 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4405 // public String toString() {
4406 // return procedure.toString();
4410 // public void execute(AsyncReadGraph graph, T result) {
4412 // procedure.execute(result);
4413 // } catch (Throwable t2) {
4414 // Logger.defaultLogError(t2);
4419 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4421 // procedure.exception(throwable);
4422 // } catch (Throwable t2) {
4423 // Logger.defaultLogError(t2);
4429 // } catch (DatabaseException e) {
4431 // throw new IllegalStateException(e);
4438 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4440 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4445 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4447 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4452 public VirtualGraph getValueProvider(Resource subject) {
4454 return querySupport.getValueProvider(querySupport.getId(subject));
4458 public boolean resumeTasks(ReadGraphImpl graph) {
4460 return querySupport.resume(graph);
4464 public boolean isImmutable(int resourceId) {
4465 return querySupport.isImmutable(resourceId);
4468 public boolean isImmutable(Resource resource) {
4469 ResourceImpl impl = (ResourceImpl)resource;
4470 return isImmutable(impl.id);
4475 public Layer0 getL0(ReadGraph graph) {
4477 L0 = Layer0.getInstance(graph);
4482 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4483 protected Integer initialValue() {