1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.IdentityHashMap;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
38 import org.simantics.databoard.Bindings;
39 import org.simantics.db.AsyncReadGraph;
40 import org.simantics.db.DevelopmentKeys;
41 import org.simantics.db.DirectStatements;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.RelationInfo;
44 import org.simantics.db.Resource;
45 import org.simantics.db.Session;
46 import org.simantics.db.Statement;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.debug.ListenerReport;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
53 import org.simantics.db.exception.NoInverseException;
54 import org.simantics.db.exception.ResourceNotFoundException;
55 import org.simantics.db.impl.DebugPolicy;
56 import org.simantics.db.impl.ResourceImpl;
57 import org.simantics.db.impl.graph.MultiIntProcedure;
58 import org.simantics.db.impl.graph.ReadGraphImpl;
59 import org.simantics.db.impl.graph.ReadGraphSupport;
60 import org.simantics.db.impl.graph.WriteGraphImpl;
61 import org.simantics.db.impl.procedure.IntProcedureAdapter;
62 import org.simantics.db.impl.procedure.InternalProcedure;
63 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
64 import org.simantics.db.impl.support.ResourceSupport;
65 import org.simantics.db.procedure.AsyncMultiListener;
66 import org.simantics.db.procedure.AsyncMultiProcedure;
67 import org.simantics.db.procedure.AsyncProcedure;
68 import org.simantics.db.procedure.AsyncSetListener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.procedure.TIntProcedure;
88 import gnu.trove.procedure.TLongProcedure;
89 import gnu.trove.procedure.TObjectProcedure;
90 import gnu.trove.set.hash.THashSet;
91 import gnu.trove.set.hash.TIntHashSet;
93 @SuppressWarnings({"rawtypes", "unchecked"})
94 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
// Shared static counter; its use is not visible in this part of the file.
96 public static int indent = 0;
// Per-processor counter; NOTE(review): presumably tracks bound queries, the updates are not visible here.
101 public int boundQueries = 0;
// Integer ids of builtin resources. The constructor assigns each of these from
// core.getBuiltin(...) when the builtins are installed (rootLibrary != 0).
104 final private int functionalRelation;
106 final private int superrelationOf;
108 final private int instanceOf;
110 final private int inverseOf;
112 final private int asserts;
114 final private int hasPredicate;
116 final private int hasPredicateInverse;
118 final private int hasObject;
120 final private int inherits;
122 final private int subrelationOf;
// Id returned by core.getBuiltin("http:/"); a zero value is treated as "builtins not installed".
124 final private int rootLibrary;
127 * A cache for the root library resource. Initialized in
128 * {@link #getRootLibraryResource()}.
130 private volatile ResourceImpl rootLibraryResource;
// More builtin ids, resolved in the constructor via core.getBuiltin(...).
132 final private int library;
134 final private int consistsOf;
136 final private int hasName;
// The constructor waits until this counter equals THREADS before continuing.
138 AtomicInteger sleepers = new AtomicInteger(0);
140 private boolean updating = false;
143 private boolean firingListeners = false;
// Collaborators assigned in the constructor.
145 final public QueryCache cache;
146 final public QuerySupport querySupport;
147 final public Session session;
148 final public ResourceSupport resourceSupport;
// Listener entries queued by scheduleListener(...).
150 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
// One QueryThread per slot, created and started in the constructor.
152 QueryThread[] executors;
154 // public ArrayList<SessionTask>[] queues;
156 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
160 INIT, RUN, SLEEP, DISPOSED
164 public ThreadState[] threadStates;
165 public ReentrantLock[] threadLocks;
166 public Condition[] threadConditions;
168 //public ArrayList<SessionTask>[] ownTasks;
170 //public ArrayList<SessionTask>[] ownSyncTasks;
172 //ArrayList<SessionTask>[] delayQueues;
174 final Object querySupportLock;
176 public Long modificationCounter = 0L;
178 public void close() {
181 // final public void scheduleOwn(int caller, SessionTask request) {
182 // ownTasks[caller].add(request);
// Forwards the task to schedule(caller, request). The commented-out lines keep an
// older variant that special-cased caller == performer.
185 final public void scheduleAlways(int caller, SessionTask request) {
187 // int performer = request.thread;
188 // if(caller == performer) {
189 // ownTasks[caller].add(request);
191 // schedule(caller, request);
194 schedule(caller, request);
// Queues a task on the shared freeScheduling list (under querySupportLock) and then
// signals the condition of every query thread.
// NOTE(review): request.thread is read before assert(request != null), so that assert
// can never fire for a null request; the null check belongs before the dereference.
198 final public void schedule(int caller, SessionTask request) {
200 int performer = request.thread;
202 if(DebugPolicy.SCHEDULE)
203 System.out.println("schedule " + request + " " + caller + " -> " + performer);
205 assert(performer >= 0);
207 assert(request != null);
209 // if(caller == performer) {
210 // request.run(caller);
213 // if(performer == THREADS) {
215 synchronized(querySupportLock) {
217 freeScheduling.add(request);
219 //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
// Wake every thread; NOTE(review): the lock()/unlock() calls on queueLock are not visible here — confirm.
221 for(int i=0;i<THREADS;i++) {
222 ReentrantLock queueLock = threadLocks[i];
224 //queues[performer].add(request);
225 //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
226 threadConditions[i].signalAll();
// Older per-performer queueing, kept commented out.
236 // ReentrantLock queueLock = threadLocks[performer];
238 // queues[performer].add(request);
239 // // This thread could have been sleeping
240 // if(queues[performer].size() == 1) {
241 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
242 // threadConditions[performer].signalAll();
244 // queueLock.unlock();
251 final public int THREAD_MASK;
253 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
255 public static abstract class SessionTask {
257 final public int thread;
258 final public int syncCaller;
259 final public Object object;
261 public SessionTask(WriteTraits object, int thread) {
262 this.thread = thread;
263 this.syncCaller = -1;
264 this.object = object;
267 public SessionTask(Object object, int thread, int syncCaller) {
268 this.thread = thread;
269 this.syncCaller = syncCaller;
270 this.object = object;
273 public abstract void run(int thread);
276 public String toString() {
277 return "SessionTask[" + object + "]";
282 public static abstract class SessionRead extends SessionTask {
284 final public Semaphore notify;
285 final public DataContainer<Throwable> throwable;
287 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
288 super(object, thread, thread);
289 this.throwable = throwable;
290 this.notify = notify;
293 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
294 super(object, thread, syncThread);
295 this.throwable = throwable;
296 this.notify = notify;
301 long waitingTime = 0;
304 static int koss2 = 0;
306 public boolean resume(ReadGraphImpl graph) {
307 return executors[0].runSynchronized();
310 //private WeakReference<GarbageTracker> garbageTracker;
// Helper with a finalize() hook. The only visible statements in it are commented out,
// and the field that would hold a tracker is commented out as well.
312 private class GarbageTracker {
315 protected void finalize() throws Throwable {
317 // System.err.println("GarbageTracker");
319 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
// Builds the processor: creates the query cache, the per-thread lock/condition/state
// tables and the QueryThread executors, starts them, waits until all of them have
// reported in through `sleepers`, and finally resolves the builtin resource ids.
// NOTE(review): the assignments of THREADS and querySupport are not visible here although
// both are used below — confirm they precede their first use.
327 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
328 throws DatabaseException {
330 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
// NOTE(review): a mask of threads - 1 presumably assumes a power-of-two thread count — confirm.
333 THREAD_MASK = threads - 1;
336 cache = new QueryCache(core, threads);
337 session = querySupport.getSession();
338 resourceSupport = querySupport.getSupport();
339 querySupportLock = core.getLock();
// Per-thread tables, all sized by THREADS.
341 executors = new QueryThread[THREADS];
342 // queues = new ArrayList[THREADS];
343 threadLocks = new ReentrantLock[THREADS];
344 threadConditions = new Condition[THREADS];
345 threadStates = new ThreadState[THREADS];
346 // ownTasks = new ArrayList[THREADS];
347 // ownSyncTasks = new ArrayList[THREADS];
348 // delayQueues = new ArrayList[THREADS * THREADS];
350 // freeSchedule = new AtomicInteger(0);
352 // for (int i = 0; i < THREADS * THREADS; i++) {
353 // delayQueues[i] = new ArrayList<SessionTask>();
// Every slot gets its own lock, a condition created from that lock, and the INIT state.
356 for (int i = 0; i < THREADS; i++) {
358 // tasks[i] = new ArrayList<Runnable>();
359 // ownTasks[i] = new ArrayList<SessionTask>();
360 // ownSyncTasks[i] = new ArrayList<SessionTask>();
361 // queues[i] = new ArrayList<SessionTask>();
362 threadLocks[i] = new ReentrantLock();
363 threadConditions[i] = threadLocks[i].newCondition();
364 // limits[i] = false;
365 threadStates[i] = ThreadState.INIT;
// Create the executors and publish them to the caller-supplied thread set.
369 for (int i = 0; i < THREADS; i++) {
373 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
375 threadSet.add(executors[i]);
380 for (int i = 0; i < THREADS; i++) {
381 executors[i].start();
384 // Make sure that query threads are up and running
// NOTE(review): the handling inside the InterruptedException catch is not visible;
// verify that the interrupt status is restored rather than swallowed.
385 while(sleepers.get() != THREADS) {
388 } catch (InterruptedException e) {
// A zero id for "http:/" means the builtins have not been installed.
393 rootLibrary = core.getBuiltin("http:/");
394 boolean builtinsInstalled = rootLibrary != 0;
// Each builtin below is looked up only when builtins are installed; the lookups are
// guarded by assertions that the id is non-zero.
396 if (builtinsInstalled) {
397 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
398 assert (functionalRelation != 0);
400 functionalRelation = 0;
402 if (builtinsInstalled) {
403 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
404 assert (instanceOf != 0);
408 if (builtinsInstalled) {
409 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
410 assert (inverseOf != 0);
415 if (builtinsInstalled) {
416 inherits = core.getBuiltin(Layer0.URIs.Inherits);
417 assert (inherits != 0);
421 if (builtinsInstalled) {
422 asserts = core.getBuiltin(Layer0.URIs.Asserts);
423 assert (asserts != 0);
427 if (builtinsInstalled) {
428 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
429 assert (hasPredicate != 0);
433 if (builtinsInstalled) {
434 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
435 assert (hasPredicateInverse != 0);
437 hasPredicateInverse = 0;
439 if (builtinsInstalled) {
440 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
441 assert (hasObject != 0);
445 if (builtinsInstalled) {
446 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
447 assert (subrelationOf != 0);
451 if (builtinsInstalled) {
452 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
453 assert (superrelationOf != 0);
457 if (builtinsInstalled) {
458 library = core.getBuiltin(Layer0.URIs.Library);
459 assert (library != 0);
463 if (builtinsInstalled) {
464 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
465 assert (consistsOf != 0);
469 if (builtinsInstalled) {
470 hasName = core.getBuiltin(Layer0.URIs.HasName);
471 assert (hasName != 0);
477 final public void releaseWrite(ReadGraphImpl graph) {
478 performDirtyUpdates(graph);
479 modificationCounter++;
482 final public int getId(final Resource r) {
483 return querySupport.getId(r);
486 public QuerySupport getCore() {
490 public int getFunctionalRelation() {
491 return functionalRelation;
494 public int getInherits() {
498 public int getInstanceOf() {
502 public int getInverseOf() {
506 public int getSubrelationOf() {
507 return subrelationOf;
510 public int getSuperrelationOf() {
511 return superrelationOf;
514 public int getAsserts() {
518 public int getHasPredicate() {
522 public int getHasPredicateInverse() {
523 return hasPredicateInverse;
526 public int getHasObject() {
530 public int getRootLibrary() {
// Returns the root library resource, creating and caching it in the volatile field
// rootLibraryResource on first use. One path throws UnsupportedOperationException
// ("database is not initialized ..."); NOTE(review): the guarding condition is not
// visible here — presumably root == 0, confirm.
534 public Resource getRootLibraryResource() {
535 if (rootLibraryResource == null) {
536 // Synchronization is not needed here, it doesn't matter if multiple
537 // threads simultaneously set rootLibraryResource once.
538 int root = getRootLibrary();
540 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
541 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
543 return rootLibraryResource;
546 public int getLibrary() {
550 public int getConsistsOf() {
554 public int getHasName() {
// Resolves the URI `id` to a resource id through QueryCache.runnerURIToResource and
// relays the outcome to `procedure`:
//  - a non-null, non-zero result is passed to procedure.execute;
//  - otherwise ResourceNotFoundException(id) is reported through procedure.exception
//    (the exact branch structure between these calls is not fully visible here);
//  - exceptions from the inner query are forwarded unchanged.
// A DatabaseException thrown by the runner call itself is reported to the procedure,
// and a failure while reporting is logged with Logger.defaultLogError.
558 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
562 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
565 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
567 if (result != null && result != 0) {
568 procedure.execute(graph, result);
// Disabled fallback that used the fixed builtins / random access references.
572 // Fall back to using the fixed builtins.
573 // result = querySupport.getBuiltin(id);
574 // if (result != 0) {
575 // procedure.execute(graph, result);
580 // result = querySupport.getRandomAccessReference(id);
581 // } catch (ResourceNotFoundException e) {
582 // procedure.exception(graph, e);
587 procedure.execute(graph, result);
589 procedure.exception(graph, new ResourceNotFoundException(id));
595 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
596 procedure.exception(graph, t);
600 } catch (DatabaseException e) {
604 procedure.exception(graph, e);
606 } catch (DatabaseException e1) {
608 Logger.defaultLogError(e1);
// Looks up the builtin id for `id` via querySupport.getBuiltin and reports it with
// procedure.execute, or reports ResourceNotFoundException(id) via procedure.exception.
// NOTE(review): the condition choosing between the two calls is not visible here —
// presumably a non-zero check on result; confirm.
616 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
618 Integer result = querySupport.getBuiltin(id);
620 procedure.execute(graph, result);
622 procedure.exception(graph, new ResourceNotFoundException(id));
// Runs a MultiRead through QueryCache.runnerMultiReadEntry. A DatabaseException from the
// runner is rethrown wrapped in IllegalStateException (the cause is preserved).
// The `cached` and `provider` parameters are not used by the visible statements.
627 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
630 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
631 } catch (DatabaseException e) {
632 throw new IllegalStateException(e);
// Runs an AsyncMultiRead through QueryCache.runnerAsyncMultiReadEntry. A DatabaseException
// from the runner is rethrown wrapped in IllegalStateException (the cause is preserved).
637 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
641 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
642 } catch (DatabaseException e) {
643 throw new IllegalStateException(e);
648 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
649 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
653 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
655 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
659 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
661 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
665 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
667 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
// An external read entry counts as bound when it has parents or when a listener list is
// registered for it (see hasListener). The final fall-through result is not visible here.
671 boolean isBound(ExternalReadEntry<?> entry) {
672 if(entry.hasParents()) return true;
673 else if(hasListener(entry)) return true;
// Records the dependency child -> parent and, when a listener is supplied, registers it.
//  - The parent link is added only for a non-null parent, a non-inferred request and a
//    child that is not immutable; a DatabaseException from the immutability check is logged.
//  - With a non-null listener the result of registerListener(...) is returned; the return
//    value for a null listener is not visible here.
677 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
679 if (parent != null && !inferred) {
681 if(!child.isImmutable(graph))
682 child.addParent(parent);
683 } catch (DatabaseException e) {
684 Logger.defaultLogError(e);
686 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
689 if (listener != null) {
690 return registerListener(child, listener, procedure);
698 static class Dummy implements InternalProcedure<Object>, IntProcedure {
701 public void execute(ReadGraphImpl graph, int i) {
705 public void finished(ReadGraphImpl graph) {
709 public void execute(ReadGraphImpl graph, Object result) {
713 public void exception(ReadGraphImpl graph, Throwable throwable) {
718 private static final Dummy dummy = new Dummy();
// Registers the dependencies of `query` and returns its value.
//  - When the query is not ready it is first computed with the shared no-op `dummy`
//    procedure and then read with query.get(graph, this, null).
//  - Another visible path reads it with query.get(graph, this, procedure); the branch
//    structure between the two is not fully visible here.
// Note that <Procedure> is a method type parameter, not the Procedure interface.
721 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
723 if (DebugPolicy.PERFORM)
724 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
727 assert (!collecting);
729 assert(query.assertNotDiscarded());
731 registerDependencies(graph, query, parent, listener, procedure, false);
733 // FRESH, REFUTED, EXCEPTED go here
734 if (!query.isReady()) {
739 query.computeForEach(graph, this, (Procedure)dummy, true);
740 return query.get(graph, this, null);
746 return query.get(graph, this, procedure);
754 interface QueryCollectorSupport {
755 public CacheCollectionResult allCaches();
756 public Collection<CacheEntry> getRootList();
757 public int getCurrentSize();
758 public int calculateCurrentSize();
759 public CacheEntryBase iterate(int level);
760 public void remove();
761 public void setLevel(CacheEntryBase entry, int level);
762 public boolean start(boolean flush);
/** Strategy that performs a collection pass over the query cache. */
interface QueryCollector {

	/**
	 * Runs one collection pass.
	 *
	 * @param youngTarget     target figure handed through from {@code gc(...)}
	 * @param allowedTimeInMs time budget for the pass, in milliseconds
	 */
	void collect(int youngTarget, int allowedTimeInMs);

}
// Inner-class implementation of QueryCollectorSupport backed by the enclosing
// processor's cache. It keeps a CacheCollectionResult snapshot (`iteration`) and
// refreshes it from allCaches() in restart(...).
771 class QueryCollectorSupportImpl implements QueryCollectorSupport {
773 private static final boolean DEBUG = false;
// Ratio handed to restart(...) for non-flush restarts; compared against the relative size change.
774 private static final double ITERATION_RATIO = 0.2;
776 private CacheCollectionResult iteration = new CacheCollectionResult();
777 private boolean fresh = true;
778 private boolean needDataInStart = true;
780 QueryCollectorSupportImpl() {
// Fills a new CacheCollectionResult via QueryProcessor.this.allCaches(result).
784 public CacheCollectionResult allCaches() {
785 CacheCollectionResult result = new CacheCollectionResult();
786 QueryProcessor.this.allCaches(result);
// Begins a collection round. A flush uses a target ratio of 0.0, which makes the
// |ratio| >= targetRatio comparison in restart(...) always true.
791 public boolean start(boolean flush) {
792 // We need new data from query maps
794 if(needDataInStart || flush) {
795 // Last run ended after processing all queries => refresh data
796 restart(flush ? 0.0 : ITERATION_RATIO);
798 // continue with previous big data
800 // Notify caller about iteration situation
801 return iteration.isAtStart();
// Compares the current cache size with the size of the previous snapshot and takes a
// new snapshot with allCaches(); NOTE(review): the branch structure around `dirty`
// is not fully visible here.
804 private void restart(double targetRatio) {
806 needDataInStart = true;
808 long start = System.nanoTime();
811 // We need new data from query maps
// The +1 keeps the divisor below from being zero for an empty snapshot.
813 int iterationSize = iteration.size()+1;
814 int diff = calculateCurrentSize()-iterationSize;
816 double ratio = (double)diff / (double)iterationSize;
817 boolean dirty = Math.abs(ratio) >= targetRatio;
820 iteration = allCaches();
822 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
823 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
824 System.err.print(" " + iteration.levels[i].size());
825 System.err.println("");
832 needDataInStart = false;
834 // We are returning here within the same GC round - reuse the cache table
// Returns the next entry at `level`, skipping entries that are already discarded.
843 public CacheEntryBase iterate(int level) {
845 CacheEntryBase entry = iteration.next(level);
847 restart(ITERATION_RATIO);
851 while(entry != null && entry.isDiscarded()) {
852 entry = iteration.next(level);
860 public void remove() {
865 public void setLevel(CacheEntryBase entry, int level) {
866 iteration.setLevel(entry, level);
869 public Collection<CacheEntry> getRootList() {
870 return cache.getRootList();
874 public int calculateCurrentSize() {
875 return cache.calculateCurrentSize();
879 public int getCurrentSize() {
884 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
886 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
887 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
889 public int querySize() {
// Runs a collection pass by handing both arguments to collector.collect(...).
893 public void gc(int youngTarget, int allowedTimeInMs) {
895 collector.collect(youngTarget, allowedTimeInMs);
// Registers `base` as a listener of `entry` through addListener(...). A disposed listener
// takes a separate path whose return statement is not visible here.
// NOTE(review): `base` is dereferenced without a null check; registerDependencies only
// calls this with a non-null listener.
899 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
901 assert (entry != null);
903 if (base.isDisposed())
906 return addListener(entry, base, procedure);
910 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
911 entry.setLastKnown(result);
// Adds a ListenerEntry for (entry, base, procedure) to cache.listeners.
//  - A list is created for the cache entry when none exists yet.
//  - If an equal ListenerEntry is already present and its listener is not disposed, the
//    method returns null; if the existing one is disposed it is replaced in place.
// The handling of the "not yet present" case and the final return are not visible here.
914 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
916 assert (entry != null);
917 assert (procedure != null);
919 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
921 list = new ArrayList<ListenerEntry>(1);
922 cache.listeners.put(entry, list);
925 ListenerEntry result = new ListenerEntry(entry, base, procedure);
// indexOf relies on ListenerEntry.equals to detect an already registered listener.
926 int currentIndex = list.indexOf(result);
927 // There was already a listener
928 if(currentIndex > -1) {
929 ListenerEntry current = list.get(currentIndex);
930 if(!current.base.isDisposed()) return null;
931 list.set(currentIndex, result);
// Debug aid: prints a stack trace showing where the listener was added.
936 if(DebugPolicy.LISTENER) {
937 new Exception().printStackTrace();
938 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
945 private void scheduleListener(ListenerEntry entry) {
946 assert (entry != null);
947 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
948 scheduledListeners.add(entry);
// Removes a listener entry from the list registered for its cache entry; does nothing
// when no list exists. The list itself is removed from cache.listeners on a path whose
// condition is not visible here (presumably when the list becomes empty — confirm).
951 private void removeListener(ListenerEntry entry) {
952 assert (entry != null);
953 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
954 if(list == null) return;
955 boolean success = list.remove(entry);
958 cache.listeners.remove(entry.entry);
// True when cache.listeners holds a list for the entry. Disposed listeners are not
// filtered here; hasListenerAfterDisposing does that.
961 private boolean hasListener(CacheEntry entry) {
962 if(cache.listeners.get(entry) != null) return true;
// Checks the listeners registered for `entry`, first collecting those whose
// ListenerBase reports isDisposed(). When the entry's list ends up empty it is removed
// from cache.listeners. The removal of the collected entries and the returned values
// are on lines not visible here.
966 boolean hasListenerAfterDisposing(CacheEntry entry) {
967 if(cache.listeners.get(entry) != null) {
968 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
// Disposed entries are gathered in a separate list, allocated on first need.
969 ArrayList<ListenerEntry> list = null;
970 for (ListenerEntry e : entries) {
971 if (e.base.isDisposed()) {
972 if(list == null) list = new ArrayList<ListenerEntry>();
977 for (ListenerEntry e : list) {
981 if (entries.isEmpty()) {
982 cache.listeners.remove(entry);
// Returns the listener entries of `entry` after calling hasListenerAfterDisposing on it,
// or an empty list when none are registered. The returned list is the live list held in
// cache.listeners, not a copy.
990 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
991 hasListenerAfterDisposing(entry);
992 if(cache.listeners.get(entry) != null)
993 return cache.listeners.get(entry);
995 return Collections.emptyList();
// Fills `workarea` with, for each cache entry, a set of listeners built from the entry's
// own listener entries plus the sets of all its parents (computed recursively). Entries
// already present in the work area are not recomputed.
998 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1000 if(!workarea.containsKey(entry)) {
1002 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1003 for(ListenerEntry e : getListenerEntries(entry))
// The set is stored before recursing, so a revisited entry is found in the work area.
1006 workarea.put(entry, ls);
1008 for(CacheEntry parent : entry.getParents(this)) {
1009 processListenerReport(parent, workarea);
1010 ls.addAll(workarea.get(parent));
// Builds a ListenerReport over all cache entries: calls hasListenerAfterDisposing on
// every entry, then computes the listener sets with processListenerReport.
// The return statement is on a line not visible here.
1017 public synchronized ListenerReport getListenerReport() throws IOException {
1019 class ListenerReportImpl implements ListenerReport {
1021 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener: the number of work-area entries it appears in, then the listener.
1024 public void print(PrintStream b) {
1025 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1026 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1027 for(ListenerBase l : e.getValue()) {
1028 Integer i = hist.get(l);
// Counts are kept negated and negated back when printed; NOTE(review): presumably so an
// ascending value sort lists the most frequent listener first — confirm.
1029 hist.put(l, i != null ? i-1 : -1);
1033 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1034 b.print("" + -p.second + " " + p.first + "\n");
1042 ListenerReportImpl result = new ListenerReportImpl();
1044 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1045 for(CacheEntryBase entry : all) {
1046 hasListenerAfterDisposing(entry);
1048 for(CacheEntryBase entry : all) {
1049 processListenerReport(entry, result.workarea);
// Opens `file` through a buffered PrintStream, obtains the listener report and returns a
// fixed status string.
// NOTE(review): no flush() or close() of `b` is visible here. PrintStream does not
// auto-flush by default, so buffered output can be lost and the file handle leaked
// unless the stream is closed in a finally block — confirm and fix.
1056 public synchronized String reportListeners(File file) throws IOException {
1061 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1062 ListenerReport report = getListenerReport();
1065 return "Done reporting listeners.";
// Records in `workarea` the set built for `entry` from its parents, recursing into each
// parent that is not discarded. Discarded entries and entries already in the work area
// are skipped.
// NOTE(review): the entry is put into the work area only after the recursion, so a
// cyclic parent chain would not terminate — presumably the parent relation is acyclic.
1069 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1071 if(entry.isDiscarded()) return;
1072 if(workarea.containsKey(entry)) return;
1074 Iterable<CacheEntry> parents = entry.getParents(this);
1075 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1076 for(CacheEntry e : parents) {
1077 if(e.isDiscarded()) continue;
1079 processParentReport(e, workarea);
1081 workarea.put(entry, ps);
// Writes the entries of Development.histogram to `file`, one "name: count" line each,
// in reversed value-sorted order, and clears the histogram afterwards.
// NOTE(review): the log prefix says "reportQueries", not "reportQueryActivity"; the
// close/flush of `b` and the return statement are not visible here.
1085 public synchronized String reportQueryActivity(File file) throws IOException {
1087 System.err.println("reportQueries " + file.getAbsolutePath());
1092 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1094 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1095 Collections.reverse(entries);
1097 for(Pair<String,Integer> entry : entries) {
1098 b.println(entry.first + ": " + entry.second);
1103 Development.histogram.clear();
// Dumps a textual report of the cached queries to `file` and returns a summary string.
// Visible phases:
//  1. collect the parent sets of all cache entries (processParentReport);
//  2. assign each entry a bind status (0=bound, 1=free, 2=unknown) and propagate it from
//     parents in at most 20 passes;
//  3. count entries per request class;
//  4. iterate a "retained set" histogram, at most 50 rounds;
//  5. print parent and child listings plus totals.
// NOTE(review): the close/flush of `b` is not visible here — verify the stream is closed
// on every path.
1109 public synchronized String reportQueries(File file) throws IOException {
1111 System.err.println("reportQueries " + file.getAbsolutePath());
1116 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1118 long start = System.nanoTime();
1120 // ArrayList<CacheEntry> all = ;
// Phase 1: parent sets for every non-discarded entry.
1122 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1123 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1124 for(CacheEntryBase entry : caches) {
1125 processParentReport(entry, workarea);
1128 // for(CacheEntry e : all) System.err.println("entry: " + e);
1130 long duration = System.nanoTime() - start;
1131 System.err.println("Query root set in " + 1e-9*duration + "s.");
1133 start = System.nanoTime();
// Phase 2: initial bind status per entry (the conditions selecting 0/1/2 are only partly visible).
1135 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1139 for(CacheEntry entry : workarea.keySet()) {
1140 boolean listener = hasListenerAfterDisposing(entry);
1141 boolean hasParents = entry.getParents(this).iterator().hasNext();
1144 flagMap.put(entry, 0);
1145 } else if (!hasParents) {
1147 flagMap.put(entry, 1);
1150 flagMap.put(entry, 2);
1152 // // Write leaf bit
1153 // entry.flags |= 4;
1156 boolean done = true;
1163 long start2 = System.nanoTime();
1165 int boundCounter = 0;
1166 int unboundCounter = 0;
1167 int unknownCounter = 0;
1169 for(CacheEntry<?> entry : workarea.keySet()) {
1171 //System.err.println("process " + entry);
1173 int flags = flagMap.get(entry);
1174 int bindStatus = flags & 3;
1176 if(bindStatus == 0) boundCounter++;
1177 else if(bindStatus == 1) unboundCounter++;
1178 else if(bindStatus == 2) unknownCounter++;
// Only entries with unknown status are re-evaluated from their parents.
1180 if(bindStatus < 2) continue;
1183 for(CacheEntry parent : entry.getParents(this)) {
1185 if(parent.isDiscarded()) flagMap.put(parent, 1);
1187 int flags2 = flagMap.get(parent);
1188 int bindStatus2 = flags2 & 3;
1189 // Parent is bound => child is bound
1190 if(bindStatus2 == 0) {
1194 // Parent is unknown => child is unknown
1195 else if (bindStatus2 == 2) {
1202 flagMap.put(entry, newStatus);
1206 duration = System.nanoTime() - start2;
1207 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1208 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
// The propagation is capped at 20 passes.
1210 } while(!done && loops++ < 20);
1214 for(CacheEntry entry : workarea.keySet()) {
1216 int bindStatus = flagMap.get(entry);
1217 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1223 duration = System.nanoTime() - start;
1224 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Phase 3: instance count per class; for request-backed entries the request's class is used.
1226 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1228 for(CacheEntry entry : workarea.keySet()) {
1229 Class<?> clazz = entry.getClass();
1230 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1231 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1232 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1233 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1234 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
// Counts are stored negated and negated back when printed below.
1235 Integer c = counts.get(clazz);
1236 if(c == null) counts.put(clazz, -1);
1237 else counts.put(clazz, c-1);
1240 b.print("// Simantics DB client query report file\n");
1241 b.print("// This file contains the following information\n");
1242 b.print("// -The amount of cached query instances per query class\n");
1243 b.print("// -The sizes of retained child sets\n");
1244 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1245 b.print("// -Followed by status, where\n");
1246 b.print("// -0=bound\n");
1247 b.print("// -1=free\n");
1248 b.print("// -2=unknown\n");
1249 b.print("// -L=has listener\n");
1250 b.print("// -List of children for each query (search for 'C <query name>')\n");
1252 b.print("----------------------------------------\n");
1254 b.print("// Queries by class\n");
1255 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1256 b.print(-p.second + " " + p.first.getName() + "\n");
// Phase 4: retained-set histogram, iterated until stable or 50 rounds.
1259 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1260 for(CacheEntry e : workarea.keySet())
1263 boolean changed = true;
1265 while(changed && iter++<50) {
1269 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1270 for(CacheEntry e : workarea.keySet())
1273 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1274 Integer c = hist.get(e.getKey());
1275 for(CacheEntry p : e.getValue()) {
// NOTE(review): i + c unboxes both values; this throws NullPointerException if `p` is
// missing from newHist — confirm every parent is seeded by the initialisation loop above.
1276 Integer i = newHist.get(p);
1277 newHist.put(p, i+c);
1280 for(CacheEntry e : workarea.keySet()) {
1281 Integer value = newHist.get(e);
1282 Integer old = hist.get(e);
1283 if(!value.equals(old)) {
1285 // System.err.println("hist " + e + ": " + old + " => " + value);
1290 System.err.println("Retained set iteration " + iter);
1294 b.print("// Queries by retained set\n");
1295 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1296 b.print("" + -p.second + " " + p.first + "\n");
// Phase 5: parent listing; the inverse (parent -> children) map is built for the child listing.
1299 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1301 b.print("// Entry parent listing\n");
1302 for(CacheEntry entry : workarea.keySet()) {
1303 int status = flagMap.get(entry);
1304 boolean hasListener = hasListenerAfterDisposing(entry);
1305 b.print("Q " + entry.toString());
1307 b.print(" (L" + status + ")");
1310 b.print(" (" + status + ")");
1313 for(CacheEntry parent : workarea.get(entry)) {
1314 Collection<CacheEntry> inv = inverse.get(parent);
1316 inv = new ArrayList<CacheEntry>();
1317 inverse.put(parent, inv);
1320 b.print(" " + parent.toString());
1325 b.print("// Entry child listing\n");
1326 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1327 b.print("C " + entry.getKey().toString());
1329 for(CacheEntry child : entry.getValue()) {
1330 Integer h = hist.get(child);
1334 b.print(" <no children>");
1336 b.print(" " + child.toString());
1341 b.print("#queries: " + workarea.keySet().size() + "\n");
1342 b.print("#listeners: " + listeners + "\n");
1346 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item of the update walk: update() seeds the todo list with one and
// updateParents() pushes one per live parent.
// 'caller' is the entry that caused this item to be queued (null for the seed item).
1352 public CacheEntry caller;
// The cache entry that updateQuery() examines for this item.
1354 public CacheEntry entry;
// 'indent' is only read by the development-mode trace output in updateQuery().
1358 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1359 this.caller = caller;
1361 this.indent = indent;
// Detaches the query behind 'entry' from this processor.
// Returns false right away when the entry has already been discarded.
1366 boolean removeQuery(CacheEntry entry) {
1368 // This entry has been removed before. No need to do anything here.
1369 if(entry.isDiscarded()) return false;
// Redundant after the guard above; only active when assertions are enabled.
1371 assert (!entry.isDiscarded());
1373 Query query = entry.getQuery();
// The query itself knows which cache structure holds its entry.
1375 query.removeEntry(this);
// Entries carrying the HAS_BEEN_BOUND flag get extra handling (the statement itself is not visible in this excerpt).
1380 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1391 * @return true if this entry is being listened
// Handles one work item of the walk driven by update(): emits development traces,
// collects immediate-update queries into 'immediates', schedules the entry's listeners
// and, for invalidating queries, queues the entry's parents onto 'todo'.
1393 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1397 CacheEntry entry = e.entry;
1399 //System.err.println("updateQuery " + entry);
1402 * If the dependency graph forms a DAG, some entries are inserted in the
1403 * todo list many times. They only need to be processed once though.
1405 if (entry.isDiscarded()) {
// Development trace: "D" marks a discarded entry, indented by the walk depth.
1406 if (Development.DEVELOPMENT) {
1407 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1408 System.out.print("D");
1409 for (int i = 0; i < e.indent; i++)
1410 System.out.print(" ");
1411 System.out.println(entry.getQuery());
1414 // System.err.println(" => DISCARDED");
1418 if (entry.isRefuted()) {
// Development trace: "R" marks a refuted entry.
1419 if (Development.DEVELOPMENT) {
1420 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1421 System.out.print("R");
1422 for (int i = 0; i < e.indent; i++)
1423 System.out.print(" ");
1424 System.out.println(entry.getQuery());
1430 if (entry.isExcepted()) {
// Development trace: "E" marks an entry whose last evaluation ended in an exception.
1431 if (Development.DEVELOPMENT) {
1432 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1433 System.out.print("E");
1438 if (entry.isPending()) {
// Development trace: "P" marks a pending entry.
1439 if (Development.DEVELOPMENT) {
1440 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1441 System.out.print("P");
// Development trace: "U" marks the entry that is being updated now.
1448 if (Development.DEVELOPMENT) {
1449 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1450 System.out.print("U ");
1451 for (int i = 0; i < e.indent; i++)
1452 System.out.print(" ");
1453 System.out.print(entry.getQuery());
1457 Query query = entry.getQuery();
// The request flags of the query decide how the update is applied below.
1458 int type = query.type();
1460 boolean hasListener = hasListener(entry);
1462 if (Development.DEVELOPMENT) {
1463 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1464 if(hasListener(entry)) {
1465 System.out.println(" (L)");
1467 System.out.println("");
// Both the pending/excepted branch and the other branch remember IMMEDIATE_UPDATE
// queries so that update() can re-evaluate them after the walk.
1472 if(entry.isPending() || entry.isExcepted()) {
1475 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1477 immediates.put(entry, entry);
1492 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1494 immediates.put(entry, entry);
1508 // System.err.println(" => FOO " + type);
// Queue every listener registered for this entry; they are fired later by performScheduledUpdates().
1511 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1512 if(entries != null) {
1513 for (ListenerEntry le : entries) {
1514 scheduleListener(le);
1519 // If invalid, update parents
1520 if (type == RequestFlags.INVALIDATE) {
1521 updateParents(e.indent, entry, todo);
1528 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1530 Iterable<CacheEntry> oldParents = entry.getParents(this);
1531 for (CacheEntry parent : oldParents) {
1532 // System.err.println("updateParents " + entry + " => " + parent);
1533 if(!parent.isDiscarded())
1534 todo.push(new UpdateEntry(entry, parent, indent + 2));
1539 private boolean pruneListener(ListenerEntry entry) {
1540 if (entry.base.isDisposed()) {
1541 removeListener(entry);
1549 * @param av1 an array (guaranteed)
1550 * @param av2 any object
1551 * @return <code>true</code> if the two arrays are equal
1553 private final boolean arrayEquals(Object av1, Object av2) {
1556 Class<?> c1 = av1.getClass().getComponentType();
1557 Class<?> c2 = av2.getClass().getComponentType();
1558 if (c2 == null || !c1.equals(c2))
1560 boolean p1 = c1.isPrimitive();
1561 boolean p2 = c2.isPrimitive();
1565 return Arrays.equals((Object[]) av1, (Object[]) av2);
1566 if (boolean.class.equals(c1))
1567 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1568 else if (byte.class.equals(c1))
1569 return Arrays.equals((byte[]) av1, (byte[]) av2);
1570 else if (int.class.equals(c1))
1571 return Arrays.equals((int[]) av1, (int[]) av2);
1572 else if (long.class.equals(c1))
1573 return Arrays.equals((long[]) av1, (long[]) av2);
1574 else if (float.class.equals(c1))
1575 return Arrays.equals((float[]) av1, (float[]) av2);
1576 else if (double.class.equals(c1))
1577 return Arrays.equals((double[]) av1, (double[]) av2);
1578 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query of 'entry' and compares the fresh result with 'oldValue'.
// Visible outcomes: ListenerEntry.NO_VALUE when the entry is excepted or anything is thrown,
// ListenerEntry.NOT_CHANGED when the result equals 'oldValue', otherwise the new result.
1583 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1587 Query query = entry.getQuery();
1589 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1591 entry.prepareRecompute(querySupport);
// Evaluate with 'entry' as the parent so that dependencies of the recomputation attach to it.
1593 ReadGraphImpl parentGraph = graph.withParent(entry);
1595 query.recompute(parentGraph);
1597 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1599 Object newValue = entry.getResult();
// No previous value to compare with (the caller passed NO_VALUE).
1601 if (ListenerEntry.NO_VALUE == oldValue) {
1602 if(DebugPolicy.CHANGES) {
1603 System.out.println("C " + query);
1604 System.out.println("- " + oldValue);
1605 System.out.println("- " + newValue);
1610 boolean changed = false;
// Arrays are compared by content through arrayEquals(), everything else through equals().
1612 if (newValue != null) {
1613 if (newValue.getClass().isArray()) {
1614 changed = !arrayEquals(newValue, oldValue);
1616 changed = !newValue.equals(oldValue);
// A null result counts as changed only if there was a non-null value before.
1619 changed = (oldValue != null);
1621 if(DebugPolicy.CHANGES && changed) {
1622 System.out.println("C " + query);
1623 System.out.println("- " + oldValue);
1624 System.out.println("- " + newValue);
1627 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1629 } catch (Throwable t) {
1631 Logger.defaultLogError(t);
1633 return ListenerEntry.NO_VALUE;
1639 public boolean hasScheduledUpdates() {
1640 return !scheduledListeners.isEmpty();
// Fires the listeners collected in scheduledListeners. In each round a listener is
// dropped if disposed, otherwise its query is recomputed through compareTo(); listeners
// whose value is not NOT_CHANGED are fired through performFromCache().
1643 public void performScheduledUpdates(WriteGraphImpl graph) {
// Must not run while the cache is being collected, and must not be re-entered.
1646 assert (!cache.collecting);
1647 assert (!firingListeners);
1649 firingListeners = true;
1653 // Performing may cause further events to be scheduled.
1654 while (!scheduledListeners.isEmpty()) {
1657 // graph.state.barrier.inc();
// The current set is swapped for a fresh one, so listeners scheduled while firing land in the next round.
1659 // Clone current events to make new entries possible during
1661 THashSet<ListenerEntry> entries = scheduledListeners;
1662 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners that turned out to have a changed value; fired after the whole round is evaluated.
1664 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1666 for (ListenerEntry listenerEntry : entries) {
1668 if (pruneListener(listenerEntry)) {
1669 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1673 final CacheEntry entry = listenerEntry.entry;
1674 assert (entry != null);
// Recompute and compare against the value this listener saw last.
1676 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1678 if (newValue != ListenerEntry.NOT_CHANGED) {
1679 if(DebugPolicy.LISTENER)
1680 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1681 schedule.add(listenerEntry);
// Remember what is about to be delivered so the next comparison starts from it.
1682 listenerEntry.setLastKnown(entry.getResult());
1687 for(ListenerEntry listenerEntry : schedule) {
1688 final CacheEntry entry = listenerEntry.entry;
1689 if(DebugPolicy.LISTENER)
1690 System.out.println("Firing " + listenerEntry.procedure);
1692 if(DebugPolicy.LISTENER)
1693 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1694 entry.performFromCache(graph, listenerEntry.procedure);
1695 } catch (Throwable t) {
// NOTE(review): the other handlers in this file report through Logger.defaultLogError — consider doing the same here.
1696 t.printStackTrace();
1700 // graph.state.barrier.dec();
1701 // graph.waitAsync(null);
1702 // graph.state.barrier.assertReady();
1707 firingListeners = false;
1714 * @return true if this entry still has listeners
// Propagates a change of 'entry' to its dependents: walks the todo stack with
// updateQuery(), then re-evaluates the immediate-update queries collected on the way
// and queues the parents of those whose result changed.
1716 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1718 assert (!cache.collecting);
1722 boolean hadListeners = false;
1723 boolean listenersUnknown = false;
1727 assert(entry != null);
1728 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Identity map: the same entry instance is recorded only once, however often it is reached.
1729 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
// Seed the walk with the changed entry itself (no caller, no trace indentation).
1730 todo.add(new UpdateEntry(null, entry, 0));
1734 // Walk the tree and collect immediate updates
1735 while (!todo.isEmpty()) {
1736 UpdateEntry e = todo.pop();
1737 hadListeners |= updateQuery(e, todo, immediates);
// Nothing left to re-evaluate, so the propagation is complete.
1740 if(immediates.isEmpty()) break;
1742 // Evaluate all immediate updates and collect parents to update
1743 for(CacheEntry immediate : immediates.values()) {
1745 if(immediate.isDiscarded()) {
1749 if(immediate.isExcepted()) {
// An excepted entry has no usable previous result, so NO_VALUE is passed as the old value.
1751 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1752 if (newValue != ListenerEntry.NOT_CHANGED)
1753 updateParents(0, immediate, todo);
1757 Object oldValue = immediate.getResult();
1758 Object newValue = compareTo(graph, immediate, oldValue);
1760 if (newValue != ListenerEntry.NOT_CHANGED) {
1761 updateParents(0, immediate, todo);
1763 // If not changed, keep the old value
1764 immediate.setResult(oldValue);
1765 listenersUnknown = true;
1775 } catch (Throwable t) {
1776 Logger.defaultLogError(t);
// True when a listener was seen during the walk or when an unchanged immediate query left that undetermined.
1782 return hadListeners | listenersUnknown;
// Statement changes queued by updateStatements(); each element packs subject and predicate into one long.
1786 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
// Resource ids queued by updateValue() and invalidateResource().
1787 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
// Consumed by performDirtyUpdates(). NOTE(review): nothing in the visible part of this file adds to it — confirm where it is filled.
1788 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
// The lock below guards scheduledPrimitiveUpdates (see updatePrimitive() and performDirtyUpdates()).
1789 // Maybe use a mutex from util.concurrent?
1790 private Object primitiveUpdateLock = new Object();
// External reads queued by updatePrimitive(); swapped for a fresh set under the lock when processed.
1791 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Applies everything queued by updateValue(), updateStatements(), invalidateResource()
// and updatePrimitive(): looks up the cache entries that depend on each changed resource
// and calls update() on them, then clears the scheduled sets.
1793 public void performDirtyUpdates(final ReadGraphImpl graph) {
1795 cache.dirty = false;
1798 if (Development.DEVELOPMENT) {
1799 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1800 System.err.println("== Query update ==");
1804 // Special case - one statement
1805 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1807 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the key written by updateStatements(): subject in the upper 32 bits, predicate in the lower 32 bits.
1809 final int subject = (int)(arg0 >>> 32);
1810 final int predicate = (int)(arg0 & 0xffffffff);
1812 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1813 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1814 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Changes through instanceOf / inherits / subrelationOf also affect the type queries of the subject.
1816 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1817 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1818 if(principalTypes != null) update(graph, principalTypes);
1819 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1820 if(types != null) update(graph, types);
1823 if(predicate == subrelationOf) {
1824 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1825 if(superRelations != null) update(graph, superRelations);
1828 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1829 if(dp != null) update(graph, dp);
// The ordered-set entry is looked up by the predicate id, not by the subject.
1830 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1831 if(os != null) update(graph, os);
1833 scheduledObjectUpdates.clear();
1838 // Special case - one value
1839 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1841 int arg0 = scheduledValueUpdates.getFirst();
1843 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1844 if(valueQuery != null) update(graph, valueQuery);
1846 scheduledValueUpdates.clear();
// General case. Subjects and predicates are first gathered into int sets so that
// each per-subject and per-ordered-set lookup below runs once per distinct id.
1851 final TIntHashSet predicates = new TIntHashSet();
1852 final TIntHashSet orderedSets = new TIntHashSet();
// Take the queued external reads and install a fresh set while holding the lock;
// the processing below then works on the private copy.
1854 THashSet primitiveUpdates;
1855 synchronized (primitiveUpdateLock) {
1856 primitiveUpdates = scheduledPrimitiveUpdates;
1857 scheduledPrimitiveUpdates = new THashSet();
1860 primitiveUpdates.forEach(new TObjectProcedure() {
1863 public boolean execute(Object arg0) {
1865 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1866 if (query != null) {
1867 boolean listening = update(graph, query);
// An external read without listeners and without parents is dropped from the cache map.
1868 if (!listening && !query.hasParents()) {
1869 cache.externalReadEntryMap.remove(arg0);
1878 scheduledValueUpdates.forEach(new TIntProcedure() {
1881 public boolean execute(int arg0) {
1882 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1883 if(valueQuery != null) update(graph, valueQuery);
// An invalidated resource refreshes its value, type and super-relation queries and is
// also treated as a changed subject further below.
1889 scheduledInvalidates.forEach(new TIntProcedure() {
1892 public boolean execute(int resource) {
1894 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1895 if(valueQuery != null) update(graph, valueQuery);
1897 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1898 if(principalTypes != null) update(graph, principalTypes);
1899 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1900 if(types != null) update(graph, types);
1902 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1903 if(superRelations != null) update(graph, superRelations);
1905 predicates.add(resource);
1912 scheduledObjectUpdates.forEach(new TLongProcedure() {
1915 public boolean execute(long arg0) {
1917 final int subject = (int)(arg0 >>> 32);
1918 final int predicate = (int)(arg0 & 0xffffffff);
1920 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1921 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1922 if(principalTypes != null) update(graph, principalTypes);
1923 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1924 if(types != null) update(graph, types);
1927 if(predicate == subrelationOf) {
1928 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1929 if(superRelations != null) update(graph, superRelations);
// Despite its name, 'predicates' collects the changed subjects; 'orderedSets' collects the predicate ids.
1932 predicates.add(subject);
1933 orderedSets.add(predicate);
1941 predicates.forEach(new TIntProcedure() {
1944 public boolean execute(final int subject) {
1946 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1947 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1948 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1950 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1951 if(entry != null) update(graph, entry);
1959 orderedSets.forEach(new TIntProcedure() {
1962 public boolean execute(int orderedSet) {
1964 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1965 if(entry != null) update(graph, entry);
1973 // for (Integer subject : predicates) {
1974 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1975 // if(entry != null) update(graph, entry);
1979 if (Development.DEVELOPMENT) {
1980 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1981 System.err.println("== Query update ends ==");
// The primitive updates were already detached above; the remaining sets are emptied here.
1985 scheduledValueUpdates.clear();
1986 scheduledObjectUpdates.clear();
1987 scheduledInvalidates.clear();
// Queues a value change of 'resource'; it is applied by the next performDirtyUpdates().
1991 public void updateValue(final int resource) {
1992 scheduledValueUpdates.add(resource);
1996 public void updateStatements(final int resource, final int predicate) {
1997 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
// Id passed to the most recent invalidateResource() call; used to drop immediate repeats.
2001 private int lastInvalidate = 0;
// Queues 'resource' for re-evaluation unless it equals the id of the previous call.
// NOTE(review): the id is added to scheduledValueUpdates, not scheduledInvalidates — confirm this is intended.
// NOTE(review): lastInvalidate is not reset in the visible code after the queues are cleared — verify that a
// repeated invalidation of the same resource after an update round is not lost.
2003 public void invalidateResource(final int resource) {
2004 if(lastInvalidate == resource) return;
2005 scheduledValueUpdates.add(resource);
2006 lastInvalidate = resource;
// Queues a changed external read and tells the query support that primitives are dirty.
2010 public void updatePrimitive(final ExternalRead primitive) {
2012 // External reads may be updated from arbitrary threads.
2013 // Synchronize to prevent race-conditions.
2014 synchronized (primitiveUpdateLock) {
2015 scheduledPrimitiveUpdates.add(primitive);
// Called after the queue has been updated.
2017 querySupport.dirtyPrimitives();
2022 public synchronized String toString() {
2023 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the query executor threads in stages: ask each to dispose, poll whether
// any is still alive, interrupt the survivors, and finally join each with a timeout.
2027 protected void doDispose() {
2029 for(int index = 0; index < THREADS; index++) {
2030 executors[index].dispose();
// First stage: up to 100 polls to see whether all executors have stopped on their own.
2034 for(int i=0;i<100;i++) {
2036 boolean alive = false;
2037 for(int index = 0; index < THREADS; index++) {
2038 alive |= executors[index].isAlive();
2043 } catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged here; the thread's interrupt status is not restored in the visible code.
2044 Logger.defaultLogError(e);
2049 // Then start interrupting
2050 for(int i=0;i<100;i++) {
2052 boolean alive = false;
2053 for(int index = 0; index < THREADS; index++) {
2054 alive |= executors[index].isAlive();
2057 for(int index = 0; index < THREADS; index++) {
2058 executors[index].interrupt();
2062 // // Then just destroy
2063 // for(int index = 0; index < THREADS; index++) {
2064 // executors[index].destroy();
// Last stage: wait at most five seconds per executor, then drop the reference either way.
2067 for(int index = 0; index < THREADS; index++) {
2069 executors[index].join(5000);
2070 } catch (InterruptedException e) {
2071 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2073 executors[index] = null;
// Cache statistics accessor; the body line is not visible in this excerpt
// (presumably mirrors getMisses() for the hit counter — TODO confirm).
2078 public int getHits() {
// Returns the miss counter kept by the query cache.
2082 public int getMisses() {
2083 return cache.misses;
// Cache statistics accessor; the body line is not visible in this excerpt
// (presumably returns the cache size counter — TODO confirm).
2086 public int getSize() {
2090 public Set<Long> getReferencedClusters() {
2091 HashSet<Long> result = new HashSet<Long>();
2092 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2093 Objects query = (Objects) entry.getQuery();
2094 result.add(querySupport.getClusterId(query.r1()));
2096 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2097 DirectPredicates query = (DirectPredicates) entry.getQuery();
2098 result.add(querySupport.getClusterId(query.id));
2100 for (CacheEntry entry : cache.valueQueryMap.values()) {
2101 ValueQuery query = (ValueQuery) entry.getQuery();
2102 result.add(querySupport.getClusterId(query.id));
// No body statements are visible for this method in this excerpt.
2107 public void assertDone() {
// Delegates to the query cache, passing 'result' through and returning what the cache returns.
2110 CacheCollectionResult allCaches(CacheCollectionResult result) {
2112 return cache.allCaches(result);
// No body statements are visible for this method in this excerpt.
2116 public void printDiagnostics() {
// Forwards the cluster request unchanged to the query support.
2119 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2120 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the collector with a target of 0 and Integer.MAX_VALUE as the second argument.
// The returned value is produced by a line not visible in this excerpt.
2123 public int clean() {
2124 collector.collect(0, Integer.MAX_VALUE);
// Runs a one-off collection restricted to the cache entries of the given external reads,
// using a purpose-built QueryCollectorSupport that only exposes those entries.
2128 public void clean(final Collection<ExternalRead<?>> requests) {
2129 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
// Cursor over the requests; iterate() advances it and re-creates it once exhausted.
2130 Iterator<ExternalRead<?>> iterator = requests.iterator();
2132 public CacheCollectionResult allCaches() {
2133 throw new UnsupportedOperationException();
// Returns the next request that still has a cache entry, skipping requests without one.
2136 public CacheEntryBase iterate(int level) {
2137 if(iterator.hasNext()) {
2138 ExternalRead<?> request = iterator.next();
2139 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2140 if (entry != null) return entry;
2141 else return iterate(level);
2143 iterator = requests.iterator();
2148 public void remove() {
2149 throw new UnsupportedOperationException();
2152 public void setLevel(CacheEntryBase entry, int level) {
2153 throw new UnsupportedOperationException();
// Looks up the cache entry of every request to build the root list.
2156 public Collection<CacheEntry> getRootList() {
2157 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2158 for (ExternalRead<?> request : requests) {
2159 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2166 public int getCurrentSize() {
2170 public int calculateCurrentSize() {
2171 // This tells the collector to attempt collecting everything.
2172 return Integer.MAX_VALUE;
2175 public boolean start(boolean flush) {
// A fresh collector is created for this run instead of reusing the 'collector' field used by clean().
2179 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to the query cache's scanPending().
2182 public void scanPending() {
2184 cache.scanPending();
// Creates a new asynchronous read graph bound to this processor.
2188 public ReadGraphImpl graphForVirtualRequest() {
2189 return ReadGraphImpl.createAsync(this);
2193 private HashMap<Resource, Class<?>> builtinValues;
2195 public Class<?> getBuiltinValue(Resource r) {
2196 if(builtinValues == null) initBuiltinValues();
2197 return builtinValues.get(r);
2200 Exception callerException = null;
// Nested interface; its active member declarations are not visible in this excerpt.
// Only the disabled, debug-labelled variants below can be seen.
2202 public interface AsyncBarrier {
2205 // public void inc(String debug);
2206 // public void dec(String debug);
2209 // final public QueryProcessor processor;
2210 // final public QuerySupport support;
2212 // boolean disposed = false;
2214 private void initBuiltinValues() {
2216 Layer0 b = getSession().peekService(Layer0.class);
2217 if(b == null) return;
2219 builtinValues = new HashMap<Resource, Class<?>>();
2221 builtinValues.put(b.String, String.class);
2222 builtinValues.put(b.Double, Double.class);
2223 builtinValues.put(b.Float, Float.class);
2224 builtinValues.put(b.Long, Long.class);
2225 builtinValues.put(b.Integer, Integer.class);
2226 builtinValues.put(b.Byte, Byte.class);
2227 builtinValues.put(b.Boolean, Boolean.class);
2229 builtinValues.put(b.StringArray, String[].class);
2230 builtinValues.put(b.DoubleArray, double[].class);
2231 builtinValues.put(b.FloatArray, float[].class);
2232 builtinValues.put(b.LongArray, long[].class);
2233 builtinValues.put(b.IntegerArray, int[].class);
2234 builtinValues.put(b.ByteArray, byte[].class);
2235 builtinValues.put(b.BooleanArray, boolean[].class);
2239 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2241 // if (null == provider2) {
2242 // this.processor = null;
2246 // this.processor = provider2;
2247 // support = provider2.getCore();
2248 // initBuiltinValues();
2252 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2253 // return new ReadGraphSupportImpl(impl.processor);
// Session accessor; the returning line is not visible in this excerpt.
2257 final public Session getSession() {
// Returns the resource support instance held by this processor.
2261 final public ResourceSupport getResourceSupport() {
2262 return resourceSupport;
// Asynchronous-procedure variant: currently disabled and always throws
// UnsupportedOperationException; the former implementation is kept commented out below.
2266 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2268 throw new UnsupportedOperationException();
2270 // assert(subject != null);
2271 // assert(procedure != null);
2273 // final ListenerBase listener = getListenerBase(procedure);
2275 // IntProcedure ip = new IntProcedure() {
2277 // AtomicBoolean first = new AtomicBoolean(true);
2280 // public void execute(ReadGraphImpl graph, int i) {
2282 // if(first.get()) {
2283 // procedure.execute(graph, querySupport.getResource(i));
2285 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2287 // } catch (Throwable t2) {
2288 // Logger.defaultLogError(t2);
2293 // public void finished(ReadGraphImpl graph) {
2295 // if(first.compareAndSet(true, false)) {
2296 // procedure.finished(graph);
2297 //// impl.state.barrier.dec(this);
2299 // procedure.finished(impl.newRestart(graph));
2302 // } catch (Throwable t2) {
2303 // Logger.defaultLogError(t2);
2308 // public void exception(ReadGraphImpl graph, Throwable t) {
2310 // if(first.compareAndSet(true, false)) {
2311 // procedure.exception(graph, t);
2313 // procedure.exception(impl.newRestart(graph), t);
2315 // } catch (Throwable t2) {
2316 // Logger.defaultLogError(t2);
2322 // int sId = querySupport.getId(subject);
2325 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2326 // } catch (DatabaseException e) {
2327 // Logger.defaultLogError(e);
// Synchronous-procedure variant: currently disabled and always throws
// UnsupportedOperationException; the former implementation is kept commented out below.
2333 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2335 throw new UnsupportedOperationException();
2337 // assert(subject != null);
2338 // assert(procedure != null);
2340 // final ListenerBase listener = getListenerBase(procedure);
2343 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2346 // public void execute(ReadGraphImpl graph, int i) {
2348 // procedure.execute(querySupport.getResource(i));
2349 // } catch (Throwable t2) {
2350 // Logger.defaultLogError(t2);
2355 // public void finished(ReadGraphImpl graph) {
2357 // procedure.finished();
2358 // } catch (Throwable t2) {
2359 // Logger.defaultLogError(t2);
2361 //// impl.state.barrier.dec();
2365 // public void exception(ReadGraphImpl graph, Throwable t) {
2367 // procedure.exception(t);
2368 // } catch (Throwable t2) {
2369 // Logger.defaultLogError(t2);
2371 //// impl.state.barrier.dec();
2375 // } catch (DatabaseException e) {
2376 // Logger.defaultLogError(e);
// Returns the predicate set of 'subject' from the query cache, evaluated under the
// parent of 'impl' and without a listener (null is passed as the last argument).
2382 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2383 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2387 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2388 final Resource predicate, final MultiProcedure<Statement> procedure) {
2390 assert(subject != null);
2391 assert(predicate != null);
2392 assert(procedure != null);
2394 final ListenerBase listener = getListenerBase(procedure);
2396 // impl.state.barrier.inc();
2399 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2402 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2404 procedure.execute(querySupport.getStatement(s, p, o));
2405 } catch (Throwable t2) {
2406 Logger.defaultLogError(t2);
2411 public void finished(ReadGraphImpl graph) {
2413 procedure.finished();
2414 } catch (Throwable t2) {
2415 Logger.defaultLogError(t2);
2417 // impl.state.barrier.dec();
2421 public void exception(ReadGraphImpl graph, Throwable t) {
2423 procedure.exception(t);
2424 } catch (Throwable t2) {
2425 Logger.defaultLogError(t2);
2427 // impl.state.barrier.dec();
2431 } catch (DatabaseException e) {
2432 Logger.defaultLogError(e);
// Asynchronous-procedure variant: feeds every statement of (subject, predicate) to
// 'procedure'. Each callback has two forms, one passing the callback's graph directly
// and one passing impl.newRestart(graph); the 'first' flag declared in the adapter
// presumably selects between them (the selecting lines are not visible in this excerpt).
2438 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2439 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2441 assert(subject != null);
2442 assert(predicate != null);
2443 assert(procedure != null);
// Only procedures that are themselves listeners get registered as such.
2445 final ListenerBase listener = getListenerBase(procedure);
2447 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2449 boolean first = true;
2452 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2455 procedure.execute(graph, querySupport.getStatement(s, p, o));
2457 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
// Failures of the client procedure are logged, never propagated into the query machinery.
2459 } catch (Throwable t2) {
2460 Logger.defaultLogError(t2);
2465 public void finished(ReadGraphImpl graph) {
2470 procedure.finished(graph);
2471 // impl.state.barrier.dec(this);
2473 procedure.finished(impl.newRestart(graph));
2475 } catch (Throwable t2) {
2476 Logger.defaultLogError(t2);
2482 public void exception(ReadGraphImpl graph, Throwable t) {
2487 procedure.exception(graph, t);
2488 // impl.state.barrier.dec(this);
2490 procedure.exception(impl.newRestart(graph), t);
2492 } catch (Throwable t2) {
2493 Logger.defaultLogError(t2);
2500 int sId = querySupport.getId(subject);
2501 int pId = querySupport.getId(predicate);
2503 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2504 // else impl.state.barrier.inc(null, null);
2507 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
// A failing query is logged here; it is not rethrown to the caller.
2508 } catch (DatabaseException e) {
2509 Logger.defaultLogError(e);
// StatementProcedure variant: same flow as the asynchronous variant, but the raw
// (s, p, o) ids are handed to the procedure without building Statement objects.
// The 'first' flag presumably selects between the direct graph and impl.newRestart(graph)
// (the selecting lines are not visible in this excerpt).
2515 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2516 final Resource predicate, final StatementProcedure procedure) {
2518 assert(subject != null);
2519 assert(predicate != null);
2520 assert(procedure != null);
// Only procedures that are themselves listeners get registered as such.
2522 final ListenerBase listener = getListenerBase(procedure);
2524 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2526 boolean first = true;
2529 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2532 procedure.execute(graph, s, p, o);
2534 procedure.execute(impl.newRestart(graph), s, p, o);
// Failures of the client procedure are logged, never propagated into the query machinery.
2536 } catch (Throwable t2) {
2537 Logger.defaultLogError(t2);
2542 public void finished(ReadGraphImpl graph) {
2547 procedure.finished(graph);
2548 // impl.state.barrier.dec(this);
2550 procedure.finished(impl.newRestart(graph));
2552 } catch (Throwable t2) {
2553 Logger.defaultLogError(t2);
2559 public void exception(ReadGraphImpl graph, Throwable t) {
2564 procedure.exception(graph, t);
2565 // impl.state.barrier.dec(this);
2567 procedure.exception(impl.newRestart(graph), t);
2569 } catch (Throwable t2) {
2570 Logger.defaultLogError(t2);
2577 int sId = querySupport.getId(subject);
2578 int pId = querySupport.getId(predicate);
2580 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2581 // else impl.state.barrier.inc(null, null);
2584 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
// A failing query is logged here; it is not rethrown to the caller.
2585 } catch (DatabaseException e) {
2586 Logger.defaultLogError(e);
// Adapts a set listener onto forEachStatement(): statements not present in the previous
// evaluation are reported through procedure.add(), and those left over from the previous
// evaluation through procedure.remove().
2592 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2594 assert(subject != null);
2595 assert(predicate != null);
2596 assert(procedure != null);
2598 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// 'current' starts as null (no previous evaluation); 'run' is the set for the ongoing evaluation.
2600 private Set<Statement> current = null;
2601 private Set<Statement> run = new HashSet<Statement>();
2604 public void execute(AsyncReadGraph graph, Statement result) {
2606 boolean found = false;
2608 if(current != null) {
// Removing a hit means that whatever stays in 'current' was not seen in this evaluation.
2610 found = current.remove(result);
2614 if(!found) procedure.add(graph, result);
2621 public void finished(AsyncReadGraph graph) {
2623 if(current != null) {
2624 for(Statement r : current) procedure.remove(graph, r);
// Start with an empty set for the next evaluation.
2629 run = new HashSet<Statement>();
2634 public void exception(AsyncReadGraph graph, Throwable t) {
2635 procedure.exception(graph, t);
// Disposal follows the wrapped set listener.
2639 public boolean isDisposed() {
2640 return procedure.isDisposed();
/**
 * Runs {@code QueryCache.runnerAssertedStatements} for the ids of {@code subject} and
 * {@code predicate} and forwards every reported (s, p, o) triple to {@code procedure} as a
 * {@link Statement} obtained from {@code querySupport.getStatement}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged the
 * same way instead of being propagated.
 */
2648 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2649 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2651 assert(subject != null);
2652 assert(predicate != null);
2653 assert(procedure != null);
// Non-null only when the procedure is itself a ListenerBase (see getListenerBase).
2655 final ListenerBase listener = getListenerBase(procedure);
2657 // impl.state.barrier.inc();
2660 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2663 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2665 procedure.execute(graph, querySupport.getStatement(s, p, o));
2666 } catch (Throwable t2) {
2667 Logger.defaultLogError(t2);
2672 public void finished(ReadGraphImpl graph) {
2674 procedure.finished(graph);
2675 } catch (Throwable t2) {
2676 Logger.defaultLogError(t2);
2678 // impl.state.barrier.dec();
2682 public void exception(ReadGraphImpl graph, Throwable t) {
2684 procedure.exception(graph, t);
2685 } catch (Throwable t2) {
2686 Logger.defaultLogError(t2);
2688 // impl.state.barrier.dec();
2692 } catch (DatabaseException e) {
2693 Logger.defaultLogError(e);
/**
 * Returns {@code procedure} cast to {@link ListenerBase} when it is an instance of that type.
 * <p>
 * NOTE(review): the result for non-listener procedures is not visible in this excerpt;
 * presumably {@code null} (forTypes asserts {@code listener == null}) -- confirm.
 */
2698 private static ListenerBase getListenerBase(Object procedure) {
2699 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
/**
 * Variant of forEachObject for a graph-less {@link MultiProcedure}: runs
 * {@code QueryCache.runnerObjects} for the ids of {@code subject} and {@code predicate} and
 * forwards each reported id to {@code procedure} as a {@link Resource}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
2704 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2706 assert(subject != null);
2707 assert(predicate != null);
2708 assert(procedure != null);
2710 final ListenerBase listener = getListenerBase(procedure);
2712 // impl.state.barrier.inc();
2715 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2718 public void execute(ReadGraphImpl graph, int i) {
2720 procedure.execute(querySupport.getResource(i));
2721 } catch (Throwable t2) {
2722 Logger.defaultLogError(t2);
2727 public void finished(ReadGraphImpl graph) {
2729 procedure.finished();
2730 } catch (Throwable t2) {
2731 Logger.defaultLogError(t2);
2733 // impl.state.barrier.dec();
2737 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): prints to stdout in addition to notifying the procedure; the sibling
// overloads do not -- looks like leftover debug output, confirm before removing.
2738 System.out.println("forEachObject exception " + t);
2740 procedure.exception(t);
2741 } catch (Throwable t2) {
2742 Logger.defaultLogError(t2);
2744 // impl.state.barrier.dec();
2748 } catch (DatabaseException e) {
2749 Logger.defaultLogError(e);
/**
 * Not supported: unconditionally throws {@link UnsupportedOperationException}.
 * The lines below the throw are a commented-out earlier implementation based on
 * {@code QueryCache.runnerDirectPredicates}.
 */
2755 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2757 throw new UnsupportedOperationException();
2759 // assert(subject != null);
2760 // assert(procedure != null);
2762 // final ListenerBase listener = getListenerBase(procedure);
2764 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
2766 // int sId = querySupport.getId(subject);
2769 // QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
2770 // } catch (DatabaseException e) {
2771 // Logger.defaultLogError(e);
/**
 * Delegates to {@code org.simantics.db.impl.query.DirectStatements.queryEach} with the id of
 * {@code subject}, this processor, {@code impl.parent}, the listener derived from
 * {@code procedure} and the procedure itself.
 */
2777 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
2779 assert(subject != null);
2780 assert(procedure != null);
2782 final ListenerBase listener = getListenerBase(procedure);
// Fully qualified: the simple name DirectStatements is imported from org.simantics.db.
2784 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Asynchronous-procedure variant: delegates to
 * {@code org.simantics.db.impl.query.DirectStatements.queryEach}, additionally passing
 * {@code ignoreVirtual} through unchanged.
 */
2789 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2791 assert(subject != null);
2792 assert(procedure != null);
2794 final ListenerBase listener = getListenerBase(procedure);
2796 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel compared by identity in forPossibleObject below; a result equal to it is
// reported as null.
2800 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Reports a possible object of (subject, predicate) to {@code procedure} by collecting the
 * results of {@code forEachObject} in the {@code single} field.
 * <p>
 * On {@code finished} the procedure receives {@code null} when {@code single} is still
 * {@code null} or equals the {@code INVALID_RESOURCE} sentinel, and {@code single} otherwise.
 * All three callbacks are {@code synchronized} on the anonymous procedure instance.
 * <p>
 * NOTE(review): the statement that stores the first result is not visible in this excerpt;
 * presumably {@code single = result} with the sentinel assigned in an else branch -- confirm.
 */
2803 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2805 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2807 private Resource single = null;
2810 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2811 if(single == null) {
2814 single = INVALID_RESOURCE;
2819 public synchronized void finished(AsyncReadGraph graph) {
2820 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2821 else procedure.execute(graph, single);
2825 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2826 procedure.exception(graph, throwable);
/**
 * Lowest-level forEachObject overload visible here: resolves {@code subject} and
 * {@code predicate} to ids and runs {@code QueryCache.runnerObjects} with the supplied
 * listener and int procedure. A {@link DatabaseException} from the runner is logged through
 * {@code Logger.defaultLogError} rather than propagated.
 */
2833 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2835 final int sId = querySupport.getId(subject);
2836 final int pId = querySupport.getId(predicate);
2839 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2840 } catch (DatabaseException e) {
2841 Logger.defaultLogError(e);
/**
 * {@link IntProcedure} that keeps one int result in {@code single} and a throwable in
 * {@code t}; used by getSingleObject.
 * <p>
 * NOTE(review): several short statements are not visible in this excerpt -- the body of
 * {@code clear()}, where {@code single} becomes -1, the body of {@code exception}, and the
 * guard and normal return of {@code get()}. Confirm against the full file before relying
 * on the summary below.
 */
2846 static class Runner2Procedure implements IntProcedure {
2848 public int single = 0;
2849 public Throwable t = null;
2851 public void clear() {
2857 public void execute(ReadGraphImpl graph, int i) {
// The reported id is stored only while 'single' still holds its initial value 0.
2858 if(single == 0) single = i;
2863 public void finished(ReadGraphImpl graph) {
// -1 is mapped back to 0 once the run has finished.
2864 if(single == -1) single = 0;
2868 public void exception(ReadGraphImpl graph, Throwable throwable) {
2873 public int get() throws DatabaseException {
// A DatabaseException is rethrown as-is; any other throwable is wrapped in one.
2875 if(t instanceof DatabaseException) throw (DatabaseException)t;
2876 else throw new DatabaseException(t);
/**
 * Runs {@code QueryCache.runnerObjects} for (subject, predicate) with a fresh
 * {@link Runner2Procedure} and no listener ({@code null}).
 * <p>
 * NOTE(review): the return statement is not visible in this excerpt; presumably the value
 * of {@code proc.get()} -- confirm.
 *
 * @throws DatabaseException declared by this method; {@code Runner2Procedure.get()} is one
 *         visible source
 */
2883 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2885 final int sId = querySupport.getId(subject);
2886 final int pId = querySupport.getId(predicate);
2888 Runner2Procedure proc = new Runner2Procedure();
2889 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
/**
 * Asynchronous multi-procedure variant of forEachObject. Builds an {@link IntProcedure}
 * that converts reported ids to {@link Resource}s and passes it to the
 * (listener, IntProcedure) overload.
 * <p>
 * Two adapters are visible. The first, built under the
 * {@code impl.parent != null || listener != null} check, follows each procedure call with a
 * {@code catch (Throwable)} that logs, and calls the procedure either with {@code impl} or
 * with {@code impl.newRestart(graph)}. The second forwards the callback's own {@code graph}
 * without any catch.
 * <p>
 * NOTE(review): the condition choosing between the two calls in the first {@code execute}
 * and the else keywords are not visible in this excerpt; presumably they test {@code first}
 * -- confirm.
 */
2894 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2896 assert(subject != null);
2897 assert(predicate != null);
2899 final ListenerBase listener = getListenerBase(procedure);
2901 if(impl.parent != null || listener != null) {
2903 IntProcedure ip = new IntProcedure() {
2905 AtomicBoolean first = new AtomicBoolean(true);
2908 public void execute(ReadGraphImpl graph, int i) {
2911 procedure.execute(impl, querySupport.getResource(i));
2913 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2915 } catch (Throwable t2) {
2916 Logger.defaultLogError(t2);
2922 public void finished(ReadGraphImpl graph) {
// 'first' is flipped to false by the first finished() call.
2924 if(first.compareAndSet(true, false)) {
2925 procedure.finished(impl);
2926 // impl.state.barrier.dec(this);
2928 procedure.finished(impl.newRestart(graph));
2930 } catch (Throwable t2) {
2931 Logger.defaultLogError(t2);
2936 public void exception(ReadGraphImpl graph, Throwable t) {
2938 procedure.exception(graph, t);
2939 } catch (Throwable t2) {
2940 Logger.defaultLogError(t2);
2942 // impl.state.barrier.dec(this);
2946 public String toString() {
2947 return "forEachObject with " + procedure;
2952 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2953 // else impl.state.barrier.inc(null, null);
2955 forEachObject(impl, subject, predicate, listener, ip);
// Second adapter: plain forwarding with the callback's graph, no catch blocks.
2959 IntProcedure ip = new IntProcedure() {
2962 public void execute(ReadGraphImpl graph, int i) {
2963 procedure.execute(graph, querySupport.getResource(i));
2967 public void finished(ReadGraphImpl graph) {
2968 procedure.finished(graph);
2972 public void exception(ReadGraphImpl graph, Throwable t) {
2973 procedure.exception(graph, t);
2977 public String toString() {
2978 return "forEachObject with " + procedure;
2983 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Reports the objects of (subject, predicate) to a set-style listener by wrapping
 * {@code procedure} in an {@link AsyncMultiListener} that is handed to {@code forEachObject}.
 * <p>
 * A reported resource is forwarded to {@code procedure.add} unless it could be removed from
 * {@code current}; when a run finishes, every resource still left in {@code current} is
 * forwarded to {@code procedure.remove}.
 * <p>
 * NOTE(review): no line visible here adds to {@code run} or assigns {@code current};
 * presumably that bookkeeping sits on lines missing from this excerpt -- confirm.
 */
2990 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2992 assert(subject != null);
2993 assert(predicate != null);
2994 assert(procedure != null);
2996 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2998 private Set<Resource> current = null;
2999 private Set<Resource> run = new HashSet<Resource>();
3002 public void execute(AsyncReadGraph graph, Resource result) {
3004 boolean found = false;
3006 if(current != null) {
3008 found = current.remove(result);
// Only resources that were not found in 'current' are announced as additions.
3012 if(!found) procedure.add(graph, result);
3019 public void finished(AsyncReadGraph graph) {
3021 if(current != null) {
3022 for(Resource r : current) procedure.remove(graph, r);
3027 run = new HashSet<Resource>();
3032 public boolean isDisposed() {
3033 return procedure.isDisposed();
3037 public void exception(AsyncReadGraph graph, Throwable t) {
3038 procedure.exception(graph, t);
3042 public String toString() {
3043 return "forObjectSet " + procedure;
/**
 * Reports the predicates of {@code subject} to a set-style listener by wrapping
 * {@code procedure} in an {@link AsyncMultiListener} that is handed to
 * {@code forEachPredicate}.
 * <p>
 * A reported resource is forwarded to {@code procedure.add} unless it could be removed from
 * {@code current}; when a run finishes, every resource still left in {@code current} is
 * forwarded to {@code procedure.remove}.
 * <p>
 * NOTE(review): no line visible here adds to {@code run} or assigns {@code current};
 * presumably that bookkeeping sits on lines missing from this excerpt -- confirm.
 */
3051 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3053 assert(subject != null);
3054 assert(procedure != null);
3056 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3058 private Set<Resource> current = null;
3059 private Set<Resource> run = new HashSet<Resource>();
3062 public void execute(AsyncReadGraph graph, Resource result) {
3064 boolean found = false;
3066 if(current != null) {
3068 found = current.remove(result);
// Only resources that were not found in 'current' are announced as additions.
3072 if(!found) procedure.add(graph, result);
3079 public void finished(AsyncReadGraph graph) {
3081 if(current != null) {
3082 for(Resource r : current) procedure.remove(graph, r);
3087 run = new HashSet<Resource>();
3092 public boolean isDisposed() {
3093 return procedure.isDisposed();
3097 public void exception(AsyncReadGraph graph, Throwable t) {
3098 procedure.exception(graph, t);
3102 public String toString() {
3103 return "forPredicateSet " + procedure;
/**
 * Reports the principal types of {@code subject} to a set-style listener by wrapping
 * {@code procedure} in an {@link AsyncMultiListener} that is handed to
 * {@code forEachPrincipalType}.
 * <p>
 * A reported resource is forwarded to {@code procedure.add} unless it could be removed from
 * {@code current}; when a run finishes, every resource still left in {@code current} is
 * forwarded to {@code procedure.remove}.
 * <p>
 * NOTE(review): no line visible here adds to {@code run} or assigns {@code current};
 * presumably that bookkeeping sits on lines missing from this excerpt -- confirm.
 */
3111 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3113 assert(subject != null);
3114 assert(procedure != null);
3116 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3118 private Set<Resource> current = null;
3119 private Set<Resource> run = new HashSet<Resource>();
3122 public void execute(AsyncReadGraph graph, Resource result) {
3124 boolean found = false;
3126 if(current != null) {
3128 found = current.remove(result);
// Only resources that were not found in 'current' are announced as additions.
3132 if(!found) procedure.add(graph, result);
3139 public void finished(AsyncReadGraph graph) {
3141 if(current != null) {
3142 for(Resource r : current) procedure.remove(graph, r);
3147 run = new HashSet<Resource>();
3152 public boolean isDisposed() {
3153 return procedure.isDisposed();
3157 public void exception(AsyncReadGraph graph, Throwable t) {
3158 procedure.exception(graph, t);
3162 public String toString() {
3163 return "forPrincipalTypeSet " + procedure;
3171 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3173 assert(subject != null);
3174 assert(predicate != null);
3175 assert(procedure != null);
3177 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3179 private Set<Resource> current = null;
3180 private Set<Resource> run = new HashSet<Resource>();
3183 public void execute(AsyncReadGraph graph, Resource result) {
3185 boolean found = false;
3187 if(current != null) {
3189 found = current.remove(result);
3193 if(!found) procedure.add(graph, result);
3200 public void finished(AsyncReadGraph graph) {
3202 if(current != null) {
3203 for(Resource r : current) procedure.remove(graph, r);
3208 run = new HashSet<Resource>();
3213 public boolean isDisposed() {
3214 return procedure.isDisposed();
3218 public void exception(AsyncReadGraph graph, Throwable t) {
3219 procedure.exception(graph, t);
3223 public String toString() {
3224 return "forObjectSet " + procedure;
3232 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3234 assert(subject != null);
3235 assert(predicate != null);
3236 assert(procedure != null);
3238 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3240 private Set<Statement> current = null;
3241 private Set<Statement> run = new HashSet<Statement>();
3244 public void execute(AsyncReadGraph graph, Statement result) {
3246 boolean found = false;
3248 if(current != null) {
3250 found = current.remove(result);
3254 if(!found) procedure.add(graph, result);
3261 public void finished(AsyncReadGraph graph) {
3263 if(current != null) {
3264 for(Statement s : current) procedure.remove(graph, s);
3269 run = new HashSet<Statement>();
3274 public boolean isDisposed() {
3275 return procedure.isDisposed();
3279 public void exception(AsyncReadGraph graph, Throwable t) {
3280 procedure.exception(graph, t);
3284 public String toString() {
3285 return "forStatementSet " + procedure;
/**
 * Runs {@code QueryCache.runnerAssertedStatements} for the ids of {@code subject} and
 * {@code predicate} and forwards only the object id {@code o} of each reported triple to
 * {@code procedure}, converted with {@code querySupport.getResource}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3293 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3294 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3296 assert(subject != null);
3297 assert(predicate != null);
3298 assert(procedure != null);
3300 final ListenerBase listener = getListenerBase(procedure);
3303 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3306 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// Subject and predicate ids of the triple are ignored here.
3308 procedure.execute(graph, querySupport.getResource(o));
3309 } catch (Throwable t2) {
3310 Logger.defaultLogError(t2);
3315 public void finished(ReadGraphImpl graph) {
3317 procedure.finished(graph);
3318 } catch (Throwable t2) {
3319 Logger.defaultLogError(t2);
3321 // impl.state.barrier.dec();
3325 public void exception(ReadGraphImpl graph, Throwable t) {
3327 procedure.exception(graph, t);
3328 } catch (Throwable t2) {
3329 Logger.defaultLogError(t2);
3331 // impl.state.barrier.dec();
3335 } catch (DatabaseException e) {
3336 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerPrincipalTypes} for the id of {@code subject} and forwards
 * each reported id to the asynchronous {@code procedure} as a {@link Resource}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3342 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3344 assert(subject != null);
3345 assert(procedure != null);
3347 final ListenerBase listener = getListenerBase(procedure);
3349 IntProcedure ip = new IntProcedure() {
3352 public void execute(ReadGraphImpl graph, int i) {
3354 procedure.execute(graph, querySupport.getResource(i));
3355 } catch (Throwable t2) {
3356 Logger.defaultLogError(t2);
3361 public void finished(ReadGraphImpl graph) {
3363 procedure.finished(graph);
3364 } catch (Throwable t2) {
3365 Logger.defaultLogError(t2);
3367 // impl.state.barrier.dec(this);
3371 public void exception(ReadGraphImpl graph, Throwable t) {
3373 procedure.exception(graph, t);
3374 } catch (Throwable t2) {
3375 Logger.defaultLogError(t2);
3377 // impl.state.barrier.dec(this);
3382 int sId = querySupport.getId(subject);
3384 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3385 // else impl.state.barrier.inc(null, null);
3388 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3389 } catch (DatabaseException e) {
3390 Logger.defaultLogError(e);
/**
 * Graph-less {@link MultiProcedure} variant: runs {@code QueryCache.runnerPrincipalTypes}
 * for the id of {@code subject} and forwards each reported id to {@code procedure} as a
 * {@link Resource}, without passing a graph.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3396 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3398 assert(subject != null);
3399 assert(procedure != null);
3401 final ListenerBase listener = getListenerBase(procedure);
3403 // impl.state.barrier.inc();
3406 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3409 public void execute(ReadGraphImpl graph, int i) {
3411 procedure.execute(querySupport.getResource(i));
3412 } catch (Throwable t2) {
3413 Logger.defaultLogError(t2);
3418 public void finished(ReadGraphImpl graph) {
3420 procedure.finished();
3421 } catch (Throwable t2) {
3422 Logger.defaultLogError(t2);
3424 // impl.state.barrier.dec();
3428 public void exception(ReadGraphImpl graph, Throwable t) {
3430 procedure.exception(t);
3431 } catch (Throwable t2) {
3432 Logger.defaultLogError(t2);
3434 // impl.state.barrier.dec();
3438 } catch (DatabaseException e) {
3439 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerTypes} for the id of {@code subject} and hands the resulting
 * {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * Listener procedures are not expected here: the derived listener is asserted to be
 * {@code null}. Unlike most siblings, the callbacks visible here are not followed by a
 * catch block. A {@link DatabaseException} from the runner is logged through
 * {@code Logger.defaultLogError}.
 */
3443 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3445 assert(subject != null);
3446 assert(procedure != null);
3448 final ListenerBase listener = getListenerBase(procedure);
3449 assert(listener == null);
3451 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3454 public void execute(final ReadGraphImpl graph, IntSet set) {
3455 procedure.execute(graph, set);
3459 public void exception(ReadGraphImpl graph, Throwable t) {
3460 procedure.exception(graph, t);
3465 int sId = querySupport.getId(subject);
3468 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3469 } catch (DatabaseException e) {
3470 Logger.defaultLogError(e);
/**
 * Synchronous counterpart of forTypes: returns the result of
 * {@code QueryCacheBase.resultTypes} for the id of {@code subject}, with {@code impl.parent}
 * as parent and no listener ({@code null}).
 *
 * @throws Throwable declared by this method; what the underlying query may throw is not
 *         visible here
 */
3476 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3478 assert(subject != null);
3480 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs {@code QueryCache.runnerRelationInfoQuery} for the id of {@code subject} and forwards
 * the resulting {@link RelationInfo} (or the reported throwable) to {@code procedure}.
 * <p>
 * Listener procedures are not expected here: the derived listener is asserted to be
 * {@code null}. A {@link DatabaseException} from the runner is logged through
 * {@code Logger.defaultLogError}.
 */
3485 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
3487 assert(subject != null);
3488 assert(procedure != null);
3490 final ListenerBase listener = getListenerBase(procedure);
3491 assert(listener == null);
3495 QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
3498 public void execute(final ReadGraphImpl graph, RelationInfo set) {
3499 procedure.execute(graph, set);
3503 public void exception(ReadGraphImpl graph, Throwable t) {
3504 procedure.exception(graph, t);
3508 } catch (DatabaseException e) {
3509 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerSuperTypes} for the id of {@code subject} and hands the
 * resulting {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * The {@code first} flag is flipped by the first callback (result or exception). Two
 * procedure calls are visible per callback: one with the callback's {@code graph}, one with
 * {@code impl.newRestart(graph)}.
 * NOTE(review): the else keyword separating the two calls is not visible in this excerpt;
 * presumably the restart graph is used for every callback after the first -- confirm.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3515 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3517 assert(subject != null);
3518 assert(procedure != null);
3520 final ListenerBase listener = getListenerBase(procedure);
3523 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3525 AtomicBoolean first = new AtomicBoolean(true);
3528 public void execute(final ReadGraphImpl graph, IntSet set) {
// Commented-out earlier code that copied the ids into a HashSet<Resource>.
3529 // final HashSet<Resource> result = new HashSet<Resource>();
3530 // set.forEach(new TIntProcedure() {
3533 // public boolean execute(int type) {
3534 // result.add(querySupport.getResource(type));
3540 if(first.compareAndSet(true, false)) {
3541 procedure.execute(graph, set);
3542 // impl.state.barrier.dec();
3544 procedure.execute(impl.newRestart(graph), set);
3546 } catch (Throwable t2) {
3547 Logger.defaultLogError(t2);
3552 public void exception(ReadGraphImpl graph, Throwable t) {
3554 if(first.compareAndSet(true, false)) {
3555 procedure.exception(graph, t);
3556 // impl.state.barrier.dec();
3558 procedure.exception(impl.newRestart(graph), t);
3560 } catch (Throwable t2) {
3561 Logger.defaultLogError(t2);
3566 } catch (DatabaseException e) {
3567 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerDirectSuperRelations} for the id of {@code subject} and
 * forwards each reported super relation id to {@code procedure} as a {@link Resource}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3573 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3575 assert(subject != null);
3576 assert(procedure != null);
3578 final ListenerBase listener = getListenerBase(procedure);
3580 IntProcedure ip = new IntProcedureAdapter() {
3583 public void execute(final ReadGraphImpl graph, int superRelation) {
3585 procedure.execute(graph, querySupport.getResource(superRelation));
3586 } catch (Throwable t2) {
3587 Logger.defaultLogError(t2);
3592 public void finished(final ReadGraphImpl graph) {
3594 procedure.finished(graph);
3595 } catch (Throwable t2) {
3596 Logger.defaultLogError(t2);
3598 // impl.state.barrier.dec(this);
3603 public void exception(ReadGraphImpl graph, Throwable t) {
3605 procedure.exception(graph, t);
3606 } catch (Throwable t2) {
3607 Logger.defaultLogError(t2);
3609 // impl.state.barrier.dec(this);
3614 int sId = querySupport.getId(subject);
3616 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3617 // else impl.state.barrier.inc(null, null);
3620 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3621 } catch (DatabaseException e) {
3622 Logger.defaultLogError(e);
3625 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to {@code PossibleSuperRelation.queryEach} with the id of {@code subject}, this
 * processor, {@code impl.parent}, the listener derived from {@code procedure} and the
 * procedure itself.
 */
3630 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3632 assert(subject != null);
3633 assert(procedure != null);
3635 final ListenerBase listener = getListenerBase(procedure);
3637 // impl.state.barrier.inc();
3639 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs {@code QueryCache.runnerSuperRelations} for the id of {@code subject} and hands the
 * resulting {@code IntSet} to {@code procedure} unchanged.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
3644 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3646 assert(subject != null);
3647 assert(procedure != null);
3649 final ListenerBase listener = getListenerBase(procedure);
3651 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3654 public void execute(final ReadGraphImpl graph, IntSet set) {
// Commented-out earlier code that copied the ids into a HashSet<Resource>.
3655 // final HashSet<Resource> result = new HashSet<Resource>();
3656 // set.forEach(new TIntProcedure() {
3659 // public boolean execute(int type) {
3660 // result.add(querySupport.getResource(type));
3666 procedure.execute(graph, set);
3667 } catch (Throwable t2) {
3668 Logger.defaultLogError(t2);
3670 // impl.state.barrier.dec(this);
3674 public void exception(ReadGraphImpl graph, Throwable t) {
3676 procedure.exception(graph, t);
3677 } catch (Throwable t2) {
3678 Logger.defaultLogError(t2);
3680 // impl.state.barrier.dec(this);
3685 int sId = querySupport.getId(subject);
3687 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3688 // else impl.state.barrier.inc(null, null);
3691 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3692 } catch (DatabaseException e) {
3693 Logger.defaultLogError(e);
/**
 * Resolves {@code subject} to its id and delegates to the int-based {@code getValue}.
 *
 * @throws DatabaseException propagated from the int-based overload
 */
3698 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3699 return getValue(impl, querySupport.getId(subject));
/**
 * Returns the result of {@code QueryCache.resultValueQuery} for the given subject id, with
 * {@code impl.parent} as parent and no listener ({@code null}).
 *
 * @throws DatabaseException propagated from the value query
 */
3702 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3703 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3707 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3709 assert(subject != null);
3710 assert(procedure != null);
3712 int sId = querySupport.getId(subject);
3714 // if(procedure != null) {
3716 final ListenerBase listener = getListenerBase(procedure);
3718 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3720 AtomicBoolean first = new AtomicBoolean(true);
3723 public void execute(ReadGraphImpl graph, byte[] result) {
3725 if(first.compareAndSet(true, false)) {
3726 procedure.execute(graph, result);
3727 // impl.state.barrier.dec(this);
3729 procedure.execute(impl.newRestart(graph), result);
3731 } catch (Throwable t2) {
3732 Logger.defaultLogError(t2);
3737 public void exception(ReadGraphImpl graph, Throwable t) {
3739 if(first.compareAndSet(true, false)) {
3740 procedure.exception(graph, t);
3741 // impl.state.barrier.dec(this);
3743 procedure.exception(impl.newRestart(graph), t);
3745 } catch (Throwable t2) {
3746 Logger.defaultLogError(t2);
3752 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3753 // else impl.state.barrier.inc(null, null);
3756 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3757 } catch (DatabaseException e) {
3758 throw new IllegalStateException("Internal error");
3763 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3767 // throw new IllegalStateException("Internal error");
/**
 * Runs {@code QueryCache.runnerValueQuery} for the id of {@code subject} and forwards the
 * resulting bytes (or the reported throwable) to {@code procedure}.
 * <p>
 * Two adapters are visible. The first, built under the
 * {@code impl.parent != null || listener != null} check, flips {@code first} on the first
 * callback, calls the procedure with {@code graph} or {@code impl.newRestart(graph)}, and
 * logs throwables. The second forwards with the callback's {@code graph} only.
 * NOTE(review): the else keywords are not visible in this excerpt; presumably the restart
 * graph is used for every callback after the first -- confirm.
 * <p>
 * A {@link DatabaseException} from the runner is logged through
 * {@code Logger.defaultLogError} in both paths.
 */
3772 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3774 assert(subject != null);
3775 assert(procedure != null);
3777 final ListenerBase listener = getListenerBase(procedure);
3779 if(impl.parent != null || listener != null) {
3781 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3783 AtomicBoolean first = new AtomicBoolean(true);
3786 public void execute(ReadGraphImpl graph, byte[] result) {
3788 if(first.compareAndSet(true, false)) {
3789 procedure.execute(graph, result);
3790 // impl.state.barrier.dec(this);
3792 procedure.execute(impl.newRestart(graph), result);
3794 } catch (Throwable t2) {
3795 Logger.defaultLogError(t2);
3800 public void exception(ReadGraphImpl graph, Throwable t) {
3802 if(first.compareAndSet(true, false)) {
3803 procedure.exception(graph, t);
3804 // impl.state.barrier.dec(this);
3806 procedure.exception(impl.newRestart(graph), t);
3808 } catch (Throwable t2) {
3809 Logger.defaultLogError(t2);
3815 int sId = querySupport.getId(subject);
3817 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3818 // else impl.state.barrier.inc(null, null);
3821 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3822 } catch (DatabaseException e) {
3823 Logger.defaultLogError(e);
// Second adapter: plain forwarding with the callback's graph.
3828 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3831 public void execute(ReadGraphImpl graph, byte[] result) {
3833 procedure.execute(graph, result);
3838 public void exception(ReadGraphImpl graph, Throwable t) {
3840 procedure.exception(graph, t);
3846 int sId = querySupport.getId(subject);
3849 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3850 } catch (DatabaseException e) {
3851 Logger.defaultLogError(e);
/**
 * Looks up the inverse of {@code relation} by running {@code QueryCache.runnerObjects} on
 * the relation's id with the predicate returned by {@code getInverseOf()}.
 * <p>
 * The {@code done} flag guards every call to {@code procedure} with
 * {@code compareAndSet(false, true)}. Visible outcomes are a {@code NoInverseException}, a
 * {@code ManyObjectsForFunctionalRelationException}, the resource for {@code result}, or
 * the throwable reported by the query.
 * <p>
 * NOTE(review): the condition choosing between {@code NoInverseException} and the
 * successful result, and the statement storing the first reported id into {@code result},
 * are not visible in this excerpt -- confirm against the full file.
 */
3859 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3861 assert(relation != null);
3862 assert(procedure != null);
3864 final ListenerBase listener = getListenerBase(procedure);
3866 IntProcedure ip = new IntProcedure() {
3868 private int result = 0;
3870 final AtomicBoolean found = new AtomicBoolean(false);
3871 final AtomicBoolean done = new AtomicBoolean(false);
3874 public void finished(ReadGraphImpl graph) {
3876 // Shall fire exactly once!
3877 if(done.compareAndSet(false, true)) {
3880 procedure.exception(graph, new NoInverseException(""));
3881 // impl.state.barrier.dec(this);
3883 procedure.execute(graph, querySupport.getResource(result));
3884 // impl.state.barrier.dec(this);
3886 } catch (Throwable t) {
3887 Logger.defaultLogError(t);
3894 public void execute(ReadGraphImpl graph, int i) {
// 'found' is flipped by the first reported id.
3896 if(found.compareAndSet(false, true)) {
3899 // Shall fire exactly once!
3900 if(done.compareAndSet(false, true)) {
// NOTE(review): the parameter is named 'i', so 'this.result' and 'result' below appear to
// be the same field and the message prints one value twice; presumably the second operand
// was meant to be 'i' -- confirm.
3902 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3903 // impl.state.barrier.dec(this);
3904 } catch (Throwable t) {
3905 Logger.defaultLogError(t);
3913 public void exception(ReadGraphImpl graph, Throwable t) {
3914 // Shall fire exactly once!
3915 if(done.compareAndSet(false, true)) {
3917 procedure.exception(graph, t);
3918 // impl.state.barrier.dec(this);
3919 } catch (Throwable t2) {
3920 Logger.defaultLogError(t2);
3927 int sId = querySupport.getId(relation);
3929 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3930 // else impl.state.barrier.inc(null, null);
3933 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3934 } catch (DatabaseException e) {
3935 Logger.defaultLogError(e);
/**
 * Resolves the string {@code id} to a {@link Resource}: wraps {@code procedure} in an
 * {@code InternalProcedure<Integer>} that converts the reported integer with
 * {@code querySupport.getResource}, then delegates to the
 * {@code forResource(impl, id, impl.parent, ip)} overload.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}.
 */
3941 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3944 assert(procedure != null);
3946 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3949 public void execute(ReadGraphImpl graph, Integer result) {
3951 procedure.execute(graph, querySupport.getResource(result));
3952 } catch (Throwable t2) {
3953 Logger.defaultLogError(t2);
3955 // impl.state.barrier.dec(this);
3959 public void exception(ReadGraphImpl graph, Throwable t) {
3962 procedure.exception(graph, t);
3963 } catch (Throwable t2) {
3964 Logger.defaultLogError(t2);
3966 // impl.state.barrier.dec(this);
3971 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3972 // else impl.state.barrier.inc(null, null);
3974 forResource(impl, id, impl.parent, ip);
/**
 * Resolves the builtin named by {@code id} to a {@link Resource}: delegates to the
 * {@code forBuiltin(impl, id, impl.parent, ...)} overload with an
 * {@code InternalProcedure<Integer>} that converts the reported integer with
 * {@code querySupport.getResource}.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the delegate is logged too.
 */
3979 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3982 assert(procedure != null);
3984 // impl.state.barrier.inc();
3987 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3990 public void execute(ReadGraphImpl graph, Integer result) {
3992 procedure.execute(graph, querySupport.getResource(result));
3993 } catch (Throwable t2) {
3994 Logger.defaultLogError(t2);
3996 // impl.state.barrier.dec();
4000 public void exception(ReadGraphImpl graph, Throwable t) {
4002 procedure.exception(graph, t);
4003 } catch (Throwable t2) {
4004 Logger.defaultLogError(t2);
4006 // impl.state.barrier.dec();
4010 } catch (DatabaseException e) {
4011 Logger.defaultLogError(e);
/**
 * Reports whether {@code subject} has any statement: obtains the direct predicates through
 * {@code QueryCache.resultDirectPredicates} and calls {@code procedure.execute} with
 * {@code true} when that set is non-empty.
 * <p>
 * Unlike most siblings, a {@link DatabaseException} is passed to
 * {@code procedure.exception} instead of being logged.
 */
4017 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4019 assert(subject != null);
4020 assert(procedure != null);
4022 final ListenerBase listener = getListenerBase(procedure);
4025 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4026 procedure.execute(impl, !result.isEmpty());
4027 } catch (DatabaseException e) {
4028 procedure.exception(impl, e);
/**
 * Reports whether {@code subject} has a statement with {@code predicate}: passes an
 * {@link AsyncMultiProcedureAdapter} to {@code forEachObject} and reports the value of its
 * {@code found} flag once the query has finished.
 * <p>
 * NOTE(review): the body of {@code execute} is not visible in this excerpt; presumably it
 * sets {@code found = true} for any reported object -- confirm.
 */
4034 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4036 assert(subject != null);
4037 assert(predicate != null);
4038 assert(procedure != null);
4040 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4042 boolean found = false;
4045 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4050 synchronized public void finished(AsyncReadGraph graph) {
4052 procedure.execute(graph, found);
4053 } catch (Throwable t2) {
4054 Logger.defaultLogError(t2);
4056 // impl.state.barrier.dec(this);
4060 public void exception(AsyncReadGraph graph, Throwable t) {
4062 procedure.exception(graph, t);
4063 } catch (Throwable t2) {
4064 Logger.defaultLogError(t2);
4066 // impl.state.barrier.dec(this);
4071 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4072 // else impl.state.barrier.inc(null, null);
4074 forEachObject(impl, subject, predicate, ip);
/**
 * Reports whether the statement (subject, predicate, object) exists: iterates the objects
 * of (subject, predicate) through {@code forEachObject}, sets {@code found} when a reported
 * resource equals {@code object}, and reports the flag once the query has finished.
 * <p>
 * {@code object} is not asserted non-null, unlike the other parameters.
 */
4079 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4081 assert(subject != null);
4082 assert(predicate != null);
4083 assert(procedure != null);
4085 // impl.state.barrier.inc();
4087 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4089 boolean found = false;
4092 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4093 if(resource.equals(object)) found = true;
4097 synchronized public void finished(AsyncReadGraph graph) {
4099 procedure.execute(graph, found);
4100 } catch (Throwable t2) {
4101 Logger.defaultLogError(t2);
4103 // impl.state.barrier.dec();
4107 public void exception(AsyncReadGraph graph, Throwable t) {
4109 procedure.exception(graph, t);
4110 } catch (Throwable t2) {
4111 Logger.defaultLogError(t2);
4113 // impl.state.barrier.dec();
/**
 * Reports whether {@code subject} has a value: runs {@code QueryCache.runnerValueQuery} and
 * calls {@code procedure.execute} with {@code true} when the reported byte array is
 * non-null.
 * <p>
 * Each forwarding call is followed by a {@code catch (Throwable)} that logs through
 * {@code Logger.defaultLogError}; a {@link DatabaseException} from the runner is logged too.
 */
4121 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4123 assert(subject != null);
4124 assert(procedure != null);
4126 final ListenerBase listener = getListenerBase(procedure);
4128 // impl.state.barrier.inc();
4131 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4134 public void execute(ReadGraphImpl graph, byte[] object) {
4135 boolean result = object != null;
4137 procedure.execute(graph, result);
4138 } catch (Throwable t2) {
4139 Logger.defaultLogError(t2);
4141 // impl.state.barrier.dec();
4145 public void exception(ReadGraphImpl graph, Throwable t) {
4147 procedure.exception(graph, t);
4148 } catch (Throwable t2) {
4149 Logger.defaultLogError(t2);
4151 // impl.state.barrier.dec();
4155 } catch (DatabaseException e) {
4156 Logger.defaultLogError(e);
4162 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4164 assert(subject != null);
4165 assert(procedure != null);
4167 final ListenerBase listener = getListenerBase(procedure);
4171 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4174 public void exception(ReadGraphImpl graph, Throwable t) {
4176 procedure.exception(graph, t);
4177 } catch (Throwable t2) {
4178 Logger.defaultLogError(t2);
4180 // impl.state.barrier.dec();
4184 public void execute(ReadGraphImpl graph, int i) {
4186 procedure.execute(graph, querySupport.getResource(i));
4187 } catch (Throwable t2) {
4188 Logger.defaultLogError(t2);
4193 public void finished(ReadGraphImpl graph) {
4195 procedure.finished(graph);
4196 } catch (Throwable t2) {
4197 Logger.defaultLogError(t2);
4199 // impl.state.barrier.dec();
4203 } catch (DatabaseException e) {
4204 Logger.defaultLogError(e);
4210 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4212 // assert(request != null);
4213 // assert(procedure != null);
4215 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4220 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4222 // assert(graph != null);
4223 // assert(request != null);
4225 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4226 // if(entry != null && entry.isReady()) {
4227 // return (T)entry.get(graph, this, null);
4229 // return request.perform(graph);
4234 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4236 // assert(graph != null);
4237 // assert(request != null);
4239 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4240 // if(entry != null && entry.isReady()) {
4241 // if(entry.isExcepted()) {
4242 // Throwable t = (Throwable)entry.getResult();
4243 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4244 // else throw new DatabaseException(t);
4246 // return (T)entry.getResult();
4250 // final DataContainer<T> result = new DataContainer<T>();
4251 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4253 // request.register(graph, new Listener<T>() {
4256 // public void exception(Throwable t) {
4257 // exception.set(t);
4261 // public void execute(T t) {
4266 // public boolean isDisposed() {
4272 // Throwable t = exception.get();
4274 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4275 // else throw new DatabaseException(t);
4278 // return result.get();
4285 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4287 // assert(graph != null);
4288 // assert(request != null);
4290 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4291 // if(entry != null && entry.isReady()) {
4292 // if(entry.isExcepted()) {
4293 // procedure.exception(graph, (Throwable)entry.getResult());
4295 // procedure.execute(graph, (T)entry.getResult());
4298 // request.perform(graph, procedure);
4304 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4306 assert(request != null);
4307 assert(procedure != null);
4311 queryMultiRead(impl, request, parent, listener, procedure);
4313 } catch (DatabaseException e) {
4315 throw new IllegalStateException(e);
4322 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4324 assert(request != null);
4325 assert(procedure != null);
4327 // impl.state.barrier.inc();
4329 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4331 public void execute(AsyncReadGraph graph, T result) {
4334 procedure.execute(graph, result);
4335 } catch (Throwable t2) {
4336 Logger.defaultLogError(t2);
4341 public void finished(AsyncReadGraph graph) {
4344 procedure.finished(graph);
4345 } catch (Throwable t2) {
4346 Logger.defaultLogError(t2);
4349 // impl.state.barrier.dec();
4354 public String toString() {
4355 return procedure.toString();
4359 public void exception(AsyncReadGraph graph, Throwable t) {
4362 procedure.exception(graph, t);
4363 } catch (Throwable t2) {
4364 Logger.defaultLogError(t2);
4367 // impl.state.barrier.dec();
4376 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4378 // assert(request != null);
4379 // assert(procedure != null);
4383 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4386 // public String toString() {
4387 // return procedure.toString();
4391 // public void execute(AsyncReadGraph graph, T result) {
4393 // procedure.execute(result);
4394 // } catch (Throwable t2) {
4395 // Logger.defaultLogError(t2);
4400 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4402 // procedure.exception(throwable);
4403 // } catch (Throwable t2) {
4404 // Logger.defaultLogError(t2);
4410 // } catch (DatabaseException e) {
4412 // throw new IllegalStateException(e);
4419 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4421 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4426 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4428 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4433 public VirtualGraph getValueProvider(Resource subject) {
4435 return querySupport.getValueProvider(querySupport.getId(subject));
4439 public boolean resumeTasks(ReadGraphImpl graph) {
4441 return querySupport.resume(graph);
4445 public boolean isImmutable(int resourceId) {
4446 return querySupport.isImmutable(resourceId);
4449 public boolean isImmutable(Resource resource) {
4450 ResourceImpl impl = (ResourceImpl)resource;
4451 return isImmutable(impl.id);
4456 public Layer0 getL0(ReadGraph graph) {
4458 L0 = Layer0.getInstance(graph);