1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.debug.ListenerReport;
48 import org.simantics.db.exception.DatabaseException;
49 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
50 import org.simantics.db.exception.NoInverseException;
51 import org.simantics.db.exception.ResourceNotFoundException;
52 import org.simantics.db.impl.ResourceImpl;
53 import org.simantics.db.impl.graph.BarrierTracing;
54 import org.simantics.db.impl.graph.ReadGraphImpl;
55 import org.simantics.db.impl.graph.ReadGraphSupport;
56 import org.simantics.db.impl.graph.WriteGraphImpl;
57 import org.simantics.db.impl.procedure.IntProcedureAdapter;
58 import org.simantics.db.impl.procedure.InternalProcedure;
59 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
60 import org.simantics.db.impl.support.ResourceSupport;
61 import org.simantics.db.procedure.AsyncMultiListener;
62 import org.simantics.db.procedure.AsyncMultiProcedure;
63 import org.simantics.db.procedure.AsyncProcedure;
64 import org.simantics.db.procedure.AsyncSetListener;
65 import org.simantics.db.procedure.ListenerBase;
66 import org.simantics.db.procedure.MultiProcedure;
67 import org.simantics.db.procedure.StatementProcedure;
68 import org.simantics.db.procedure.SyncMultiProcedure;
69 import org.simantics.db.request.AsyncMultiRead;
70 import org.simantics.db.request.ExternalRead;
71 import org.simantics.db.request.MultiRead;
72 import org.simantics.db.request.RequestFlags;
73 import org.simantics.layer0.Layer0;
74 import org.simantics.utils.DataContainer;
75 import org.simantics.utils.Development;
76 import org.simantics.utils.datastructures.Pair;
77 import org.simantics.utils.datastructures.collections.CollectionUtils;
78 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
80 import gnu.trove.procedure.TIntProcedure;
81 import gnu.trove.procedure.TLongProcedure;
82 import gnu.trove.procedure.TObjectProcedure;
83 import gnu.trove.set.hash.THashSet;
84 import gnu.trove.set.hash.TIntHashSet;
86 @SuppressWarnings({"rawtypes", "unchecked"})
87 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
89 public static int indent = 0;
94 public int boundQueries = 0;
// Integer keys of builtin resources. Each one is assigned once in the constructor
// from core.getBuiltin(...) using the matching Layer0.URIs constant.
97 final private int functionalRelation;
99 final private int superrelationOf;
101 final private int instanceOf;
103 final private int inverseOf;
105 final private int asserts;
107 final private int hasPredicate;
109 final private int hasPredicateInverse;
111 final private int hasObject;
113 final private int inherits;
115 final private int subrelationOf;
// Key obtained with core.getBuiltin("http:/"); the constructor treats a value of 0
// as "builtins are not installed".
117 final private int rootLibrary;
120 * A cache for the root library resource. Initialized in
121 * {@link #getRootLibraryResource()}.
123 private volatile ResourceImpl rootLibraryResource;
125 final private int library;
127 final private int consistsOf;
129 final private int hasName;
131 AtomicInteger sleepers = new AtomicInteger(0);
133 private boolean updating = false;
136 private boolean firingListeners = false;
138 final public QueryCache cache;
139 final public QuerySupport querySupport;
140 final public Session session;
141 final public ResourceSupport resourceSupport;
143 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
145 QueryThread[] executors;
147 // public ArrayList<SessionTask>[] queues;
149 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
153 INIT, RUN, SLEEP, DISPOSED
157 public ThreadState[] threadStates;
158 // public ReentrantLock[] threadLocks;
159 // public Condition[] threadConditions;
161 //public ArrayList<SessionTask>[] ownTasks;
163 //public ArrayList<SessionTask>[] ownSyncTasks;
165 //ArrayList<SessionTask>[] delayQueues;
167 final Object querySupportLock;
169 public Long modificationCounter = 0L;
171 public void close() {
/**
 * Scans {@code freeScheduling} by index and removes and returns a task whose
 * ancestor graphs intersect the ancestor set of {@code impl}.
 * <p>
 * NOTE(review): the index initialisation/advance and the result returned when
 * nothing matches are not visible in this excerpt.
 *
 * @param impl graph whose {@code ancestorSet()} is compared against each queued task
 * @return the task removed from {@code freeScheduling} when a match is found
 */
174 public SessionTask getOwnTask(ReadGraphImpl impl) {
175 Set<ReadGraphImpl> ancestors = impl.ancestorSet();
// The queue is inspected and modified while holding querySupportLock.
176 synchronized(querySupportLock) {
178 while(index < freeScheduling.size()) {
179 SessionTask task = freeScheduling.get(index);
// hasCommonParent reports whether the task's ancestor set and the given set overlap.
180 if(task.hasCommonParent(ancestors)) {
181 return freeScheduling.remove(index);
/**
 * Scans {@code freeScheduling} by index and removes and returns a task that has
 * {@code impl} itself among its ancestor graphs.
 * <p>
 * Unlike {@link #getOwnTask(ReadGraphImpl)}, the comparison set contains only
 * {@code impl}, not its ancestors.
 * NOTE(review): the index handling and the no-match result are not visible in this excerpt.
 *
 * @param impl the single graph to look for in each task's ancestor set
 * @return the task removed from {@code freeScheduling} when a match is found
 */
189 public SessionTask getSubTask(ReadGraphImpl impl) {
190 Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
// The queue is inspected and modified while holding querySupportLock.
191 synchronized(querySupportLock) {
193 while(index < freeScheduling.size()) {
194 SessionTask task = freeScheduling.get(index);
195 if(task.hasCommonParent(onlyThis)) {
196 return freeScheduling.remove(index);
/**
 * Takes a queued task related to {@code graph} via {@link #getOwnTask(ReadGraphImpl)}
 * and runs it in the calling thread.
 * <p>
 * NOTE(review): the handling of a missing task and the returned boolean values are
 * not visible in this excerpt.
 *
 * @param graph graph whose ancestors select the task to run
 */
204 public boolean performPending(ReadGraphImpl graph) {
205 SessionTask task = getOwnTask(graph);
// The thread argument is whatever QueryProcessor.thread.get() yields for the caller.
207 task.run(QueryProcessor.thread.get());
214 // final public void scheduleOwn(int caller, SessionTask request) {
215 // ownTasks[caller].add(request);
/**
 * Queues {@code request} into {@code freeScheduling} and wakes every thread
 * waiting on {@code querySupportLock}.
 * <p>
 * With {@code BarrierTracing.BOOKKEEPING} enabled, the scheduling stack trace is
 * stored in {@code BarrierTracing.tasks}; if the same task already had a recorded
 * trace, both the previous and the current trace are printed.
 *
 * @param request task to queue; asserted to be non-null
 */
218 final public void schedule(SessionTask request) {
220 //int performer = request.thread;
222 // if(DebugPolicy.SCHEDULE)
223 // System.out.println("schedule " + request + " " + " -> " + performer);
225 //assert(performer >= 0);
227 assert(request != null);
229 // if(caller == performer) {
230 // request.run(caller);
233 // if(performer == THREADS) {
// Queue mutation and notification both happen under querySupportLock.
235 synchronized(querySupportLock) {
237 if(BarrierTracing.BOOKKEEPING) {
238 Exception current = new Exception();
239 Exception previous = BarrierTracing.tasks.put(request, current);
// A non-null previous value means this task object was scheduled before.
240 if(previous != null) {
241 previous.printStackTrace();
242 current.printStackTrace();
246 freeScheduling.add(request);
248 querySupportLock.notifyAll();
256 // ReentrantLock queueLock = threadLocks[performer];
258 // queues[performer].add(request);
259 // // This thread could have been sleeping
260 // if(queues[performer].size() == 1) {
261 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
262 // threadConditions[performer].signalAll();
264 // queueLock.unlock();
271 final public int THREAD_MASK;
273 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
/**
 * Unit of work that is queued in {@code freeScheduling} and executed through
 * {@link #run(int)}.
 * <p>
 * A task may be bound to a graph; when it is, the graph's {@code asyncBarrier} is
 * incremented on construction and decremented in {@code run}.
 */
275 public static abstract class SessionTask {
// Graph this task belongs to; null is tolerated by the constructor, hasCommonParent and run.
277 public final ReadGraphImpl graph;
// Lazily filled from graph.ancestorSet() on the first hasCommonParent call.
278 private Set<ReadGraphImpl> ancestors;
279 private int counter = 0;
// Stack trace captured in run when BarrierTracing.BOOKKEEPING is enabled.
280 private Exception trace;
282 public SessionTask(ReadGraphImpl graph) {
284 if(graph != null) graph.asyncBarrier.inc();
// Returns true when this task's ancestor graphs and otherAncestors share at least one element;
// always false for a task without a graph.
287 public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
288 if(graph == null) return false;
289 if(ancestors == null) ancestors = graph.ancestorSet();
290 return !Collections.disjoint(ancestors, otherAncestors);
// The actual work, supplied by subclasses.
293 public abstract void run0(int thread);
// Entry point used by the scheduler. Throws IllegalStateException on a repeated invocation.
// NOTE(review): the guard condition and the call to run0 are not visible in this excerpt.
295 public final void run(int thread) {
297 if(BarrierTracing.BOOKKEEPING) {
298 trace.printStackTrace();
299 new Exception().printStackTrace();
301 throw new IllegalStateException("Multiple invocations of SessionTask!");
303 if(BarrierTracing.BOOKKEEPING) {
304 trace = new Exception();
307 if(graph != null) graph.asyncBarrier.dec();
// NOTE(review): graph is dereferenced here without a null check although the other
// members accept a null graph.
311 public String toString() {
312 return "SessionTask[" + graph.parent + "]";
/**
 * A {@link SessionTask} that additionally carries a semaphore and a container
 * for a {@link Throwable}.
 * NOTE(review): how the two fields are used is not visible in this excerpt;
 * presumably they report completion and failure to a waiting caller.
 */
317 public static abstract class SessionRead extends SessionTask {
319 final public Semaphore notify;
320 final public DataContainer<Throwable> throwable;
// NOTE(review): the call to the superclass constructor is not visible in this excerpt.
322 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
324 this.throwable = throwable;
325 this.notify = notify;
330 long waitingTime = 0;
333 static int koss2 = 0;
/**
 * Delegates to {@code runSynchronized()} of the first executor thread.
 *
 * @param graph not used by the visible body
 * @return the result of {@code executors[0].runSynchronized()}
 */
335 public boolean resume(ReadGraphImpl graph) {
336 return executors[0].runSynchronized();
339 //private WeakReference<GarbageTracker> garbageTracker;
341 private class GarbageTracker {
344 protected void finalize() throws Throwable {
346 // System.err.println("GarbageTracker");
348 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
/**
 * Creates the processor: builds the query cache, creates and starts the query
 * threads, waits for them to come up and resolves the builtin resource keys.
 *
 * @param threads passed to the {@code QueryCache} and used to derive {@code THREAD_MASK}
 *        as {@code threads - 1}; presumably the number of query threads (the
 *        assignment of {@code THREADS} is not visible in this excerpt)
 * @param core query support providing the lock object and the builtin lookups
 * @param threadSet receives every created {@code QueryThread}
 * @throws DatabaseException declared by the constructor; the throwing call cannot be
 *         identified from this excerpt
 */
356 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
357 throws DatabaseException {
359 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
362 THREAD_MASK = threads - 1;
365 cache = new QueryCache(core, threads);
366 session = querySupport.getSession();
367 resourceSupport = querySupport.getSupport();
// The lock object is owned by the query support and shared with this processor.
368 querySupportLock = core.getLock();
370 executors = new QueryThread[THREADS];
371 // queues = new ArrayList[THREADS];
372 // threadLocks = new ReentrantLock[THREADS];
373 // threadConditions = new Condition[THREADS];
374 threadStates = new ThreadState[THREADS];
375 // ownTasks = new ArrayList[THREADS];
376 // ownSyncTasks = new ArrayList[THREADS];
377 // delayQueues = new ArrayList[THREADS * THREADS];
379 // freeSchedule = new AtomicInteger(0);
381 // for (int i = 0; i < THREADS * THREADS; i++) {
382 // delayQueues[i] = new ArrayList<SessionTask>();
// Every thread slot starts in the INIT state.
385 for (int i = 0; i < THREADS; i++) {
387 // tasks[i] = new ArrayList<Runnable>();
388 // ownTasks[i] = new ArrayList<SessionTask>();
389 // ownSyncTasks[i] = new ArrayList<SessionTask>();
390 // queues[i] = new ArrayList<SessionTask>();
391 // threadLocks[i] = new ReentrantLock();
392 // threadConditions[i] = threadLocks[i].newCondition();
393 // limits[i] = false;
394 threadStates[i] = ThreadState.INIT;
// Create the query threads and publish them to the caller-provided set.
398 for (int i = 0; i < THREADS; i++) {
402 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
404 threadSet.add(executors[i]);
409 for (int i = 0; i < THREADS; i++) {
410 executors[i].start();
413 // Make sure that query threads are up and running
414 while(sleepers.get() != THREADS) {
417 } catch (InterruptedException e) {
// A non-zero root library key is used as the indicator that builtins are installed;
// each key below is looked up (and asserted non-zero) only in that case.
// NOTE(review): the else branches are only partly visible; functionalRelation and
// hasPredicateInverse are set to 0 there.
422 rootLibrary = core.getBuiltin("http:/");
423 boolean builtinsInstalled = rootLibrary != 0;
425 if (builtinsInstalled) {
426 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
427 assert (functionalRelation != 0);
429 functionalRelation = 0;
431 if (builtinsInstalled) {
432 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
433 assert (instanceOf != 0);
437 if (builtinsInstalled) {
438 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
439 assert (inverseOf != 0);
444 if (builtinsInstalled) {
445 inherits = core.getBuiltin(Layer0.URIs.Inherits);
446 assert (inherits != 0);
450 if (builtinsInstalled) {
451 asserts = core.getBuiltin(Layer0.URIs.Asserts);
452 assert (asserts != 0);
456 if (builtinsInstalled) {
457 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
458 assert (hasPredicate != 0);
462 if (builtinsInstalled) {
463 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
464 assert (hasPredicateInverse != 0);
466 hasPredicateInverse = 0;
468 if (builtinsInstalled) {
469 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
470 assert (hasObject != 0);
474 if (builtinsInstalled) {
475 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
476 assert (subrelationOf != 0);
480 if (builtinsInstalled) {
481 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
482 assert (superrelationOf != 0);
486 if (builtinsInstalled) {
487 library = core.getBuiltin(Layer0.URIs.Library);
488 assert (library != 0);
492 if (builtinsInstalled) {
493 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
494 assert (consistsOf != 0);
498 if (builtinsInstalled) {
499 hasName = core.getBuiltin(Layer0.URIs.HasName);
500 assert (hasName != 0);
/**
 * Runs {@code performDirtyUpdates(graph)} and then advances {@code modificationCounter}.
 *
 * @param graph graph handed to {@code performDirtyUpdates}
 */
506 final public void releaseWrite(ReadGraphImpl graph) {
507 performDirtyUpdates(graph);
// modificationCounter is a boxed Long, so this unboxes, increments and re-boxes;
// NOTE(review): the increment is not atomic.
508 modificationCounter++;
/**
 * Returns the integer id of {@code r} as reported by {@code querySupport.getId(r)}.
 *
 * @param r resource to look up
 * @return the id given by the query support
 */
511 final public int getId(final Resource r) {
512 return querySupport.getId(r);
515 public QuerySupport getCore() {
/**
 * @return the builtin key stored in {@code functionalRelation}; 0 when builtins
 *         were not installed at construction time
 */
519 public int getFunctionalRelation() {
520 return functionalRelation;
523 public int getInherits() {
527 public int getInstanceOf() {
531 public int getInverseOf() {
/**
 * @return the builtin key stored in {@code subrelationOf}
 */
535 public int getSubrelationOf() {
536 return subrelationOf;
/**
 * @return the builtin key stored in {@code superrelationOf}
 */
539 public int getSuperrelationOf() {
540 return superrelationOf;
543 public int getAsserts() {
547 public int getHasPredicate() {
/**
 * @return the builtin key stored in {@code hasPredicateInverse}; 0 when builtins
 *         were not installed at construction time
 */
551 public int getHasPredicateInverse() {
552 return hasPredicateInverse;
555 public int getHasObject() {
559 public int getRootLibrary() {
/**
 * Returns the root library as a {@link Resource}, creating and caching the
 * {@code ResourceImpl} on first use.
 *
 * @return the cached root library resource
 * @throws UnsupportedOperationException when the database is not initialized
 *         (NOTE(review): the guarding condition is not visible in this excerpt;
 *         presumably a root key of 0)
 */
563 public Resource getRootLibraryResource() {
564 if (rootLibraryResource == null) {
565 // No synchronization is needed here: if several threads race, each one
566 // stores its own ResourceImpl created from the same root key.
567 int root = getRootLibrary();
569 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
570 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
572 return rootLibraryResource;
575 public int getLibrary() {
579 public int getConsistsOf() {
583 public int getHasName() {
/**
 * Resolves the resource key for {@code id} through
 * {@code QueryCache.runnerURIToResource} and forwards the outcome to {@code procedure}.
 * <p>
 * A non-null, non-zero result is passed to {@code procedure.execute}; failures are
 * reported through {@code procedure.exception}, including a
 * {@link ResourceNotFoundException} created from {@code id}. If reporting a
 * {@link DatabaseException} itself fails, the secondary exception is logged.
 * NOTE(review): several lines of the result handling are missing from this excerpt,
 * so the exact condition for the not-found case cannot be confirmed.
 *
 * @param graph graph used for the query
 * @param id identifier of the resource to resolve
 * @param parent cache entry passed on as the parent of the query
 * @param procedure receives the resolved key or the failure
 */
587 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
591 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
594 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
596 if (result != null && result != 0) {
597 procedure.execute(graph, result);
601 // Fall back to using the fixed builtins.
602 // result = querySupport.getBuiltin(id);
603 // if (result != 0) {
604 // procedure.execute(graph, result);
609 // result = querySupport.getRandomAccessReference(id);
610 // } catch (ResourceNotFoundException e) {
611 // procedure.exception(graph, e);
616 procedure.execute(graph, result);
618 procedure.exception(graph, new ResourceNotFoundException(id));
// Failures from the query are handed to the caller's procedure unchanged.
624 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
625 procedure.exception(graph, t);
629 } catch (DatabaseException e) {
633 procedure.exception(graph, e);
635 } catch (DatabaseException e1) {
637 Logger.defaultLogError(e1);
/**
 * Looks up the builtin key for {@code id} with {@code querySupport.getBuiltin} and
 * reports it to {@code procedure}, or reports a {@link ResourceNotFoundException}.
 * NOTE(review): the condition selecting between the two outcomes is not visible in
 * this excerpt.
 *
 * @param graph graph passed through to the procedure
 * @param id identifier of the builtin
 * @param parent not used by the visible body
 * @param procedure receives the key or the failure
 * @throws DatabaseException propagated from the procedure callbacks
 */
645 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
647 Integer result = querySupport.getBuiltin(id);
649 procedure.execute(graph, result);
651 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Runs a multi read through {@code QueryCache.runnerMultiReadEntry}.
 * A {@link DatabaseException} from the cache is rethrown wrapped in an
 * {@link IllegalStateException}. The {@code cached} and {@code provider}
 * arguments are not used by the visible body.
 */
656 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
659 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
660 } catch (DatabaseException e) {
661 throw new IllegalStateException(e);
/**
 * Runs an asynchronous multi read through {@code QueryCache.runnerAsyncMultiReadEntry}.
 * A {@link DatabaseException} from the cache is rethrown wrapped in an
 * {@link IllegalStateException}.
 */
666 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
670 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
671 } catch (DatabaseException e) {
672 throw new IllegalStateException(e);
/**
 * Runs an external read through {@code QueryCache.runnerExternalReadEntry}.
 * The {@code cached} and {@code provider} arguments are not used by the visible body.
 *
 * @throws DatabaseException propagated from the query cache
 */
677 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
678 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
682 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
684 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
/**
 * Public entry point for multi reads; delegates to
 * {@code QueryCache.runnerMultiReadEntry} and lets a {@link DatabaseException} propagate.
 *
 * @throws DatabaseException propagated from the query cache
 */
688 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
690 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
/**
 * Public entry point for external reads; delegates to
 * {@code QueryCache.runnerExternalReadEntry}.
 *
 * @throws DatabaseException propagated from the query cache
 */
694 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
696 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
/**
 * Tells whether an external read entry is still referenced: true when it has
 * parent entries or a registered listener.
 * NOTE(review): the final fall-through result is not visible in this excerpt.
 */
700 boolean isBound(ExternalReadEntry<?> entry) {
701 if(entry.hasParents()) return true;
702 else if(hasListener(entry)) return true;
/**
 * Records the dependency of {@code parent} on {@code child} and, when a listener
 * is given, registers it for {@code child}.
 * <p>
 * The parent link is added only when {@code parent} is non-null, the dependency
 * is not inferred and the child is not immutable. A {@link DatabaseException}
 * raised while checking immutability is logged, not propagated.
 * NOTE(review): the result returned when no listener is given is not visible here.
 *
 * @param graph graph used for the immutability check
 * @param child entry that was queried
 * @param parent entry depending on {@code child}, or null
 * @param listener listener to register for {@code child}, or null
 * @param procedure procedure stored with the listener
 * @param inferred when true, no parent link is recorded
 * @return the result of {@code registerListener} when a listener is given
 */
706 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
708 if (parent != null && !inferred) {
710 if(!child.isImmutable(graph))
711 child.addParent(parent);
712 } catch (DatabaseException e) {
713 Logger.defaultLogError(e);
// Optional development trace of the recorded dependency.
715 if (Development.DEVELOPMENT) {
716 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
717 System.out.println(child + " -> " + parent);
722 if (listener != null) {
723 return registerListener(child, listener, procedure);
/**
 * Procedure implementation covering both {@code InternalProcedure<Object>} and
 * {@code IntProcedure}; a shared instance is passed to
 * {@code computeForEach} in {@code performForEach2}.
 * NOTE(review): the method bodies are not visible in this excerpt; presumably
 * they are empty.
 */
731 static class Dummy implements InternalProcedure<Object>, IntProcedure {
734 public void execute(ReadGraphImpl graph, int i) {
738 public void finished(ReadGraphImpl graph) {
742 public void execute(ReadGraphImpl graph, Object result) {
746 public void exception(ReadGraphImpl graph, Throwable throwable) {
751 private static final Dummy dummy = new Dummy();
/**
 * Evaluates a unary query for a caller: registers the dependency/listener and
 * returns the query result, computing it first when the query is not ready.
 * <p>
 * When the query is not ready it is computed with the shared {@code dummy}
 * procedure and the result is fetched with a null procedure; otherwise the
 * result is fetched with the caller's {@code procedure}.
 * NOTE(review): lines between the visible statements are missing from this
 * excerpt, so additional branching cannot be ruled out.
 *
 * @param graph graph used for the evaluation
 * @param query query to evaluate; asserted not to be discarded
 * @param parent entry depending on the query, or null
 * @param listener listener to register, or null
 * @param procedure caller's procedure
 * @return the value produced by {@code query.get}
 * @throws Throwable declared by the method
 */
754 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
756 if (DebugPolicy.PERFORM)
757 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
760 assert (!collecting);
762 assert(query.assertNotDiscarded());
764 registerDependencies(graph, query, parent, listener, procedure, false);
766 // FRESH, REFUTED, EXCEPTED go here
767 if (!query.isReady()) {
772 query.computeForEach(graph, this, (Procedure)dummy, true);
773 return query.get(graph, this, null);
779 return query.get(graph, this, procedure);
/**
 * Operations a {@code QueryCollector} uses to walk and prune the query cache.
 * The implementation in this file is {@code QueryCollectorSupportImpl}.
 */
787 interface QueryCollectorSupport {
// Collects all cache entries into a fresh CacheCollectionResult.
788 public CacheCollectionResult allCaches();
789 public Collection<CacheEntry> getRootList();
790 public int getCurrentSize();
791 public int calculateCurrentSize();
// Returns the next non-discarded entry of the given level from the current iteration.
792 public CacheEntryBase iterate(int level);
793 public void remove();
794 public void setLevel(CacheEntryBase entry, int level);
// Prepares an iteration round; flush forces the iteration data to be refreshed.
795 public boolean start(boolean flush);
/**
 * Strategy that performs query cache collection; invoked from {@code gc}.
 */
798 interface QueryCollector {
// Parameters are forwarded unchanged from gc(youngTarget, allowedTimeInMs).
800 public void collect(int youngTarget, int allowedTimeInMs);
/**
 * {@code QueryCollectorSupport} backed by the enclosing processor's cache.
 * Keeps a {@code CacheCollectionResult} snapshot ("iteration") that is walked by
 * {@code iterate} and refreshed by {@code restart}.
 * NOTE(review): many lines of this class are missing from this excerpt; the
 * comments below describe only the visible statements.
 */
804 class QueryCollectorSupportImpl implements QueryCollectorSupport {
806 private static final boolean DEBUG = false;
// Relative size change threshold handed to restart when no flush is requested.
807 private static final double ITERATION_RATIO = 0.2;
809 private CacheCollectionResult iteration = new CacheCollectionResult();
810 private boolean fresh = true;
811 private boolean needDataInStart = true;
813 QueryCollectorSupportImpl() {
// Gathers every cache entry of the enclosing processor into a new result object.
817 public CacheCollectionResult allCaches() {
818 CacheCollectionResult result = new CacheCollectionResult();
819 QueryProcessor.this.allCaches(result);
// Refreshes the iteration data when needed (or when flush is set, with a ratio of 0.0)
// and reports whether the iteration is positioned at its start.
824 public boolean start(boolean flush) {
825 // We need new data from query maps
827 if(needDataInStart || flush) {
828 // Last run ended after processing all queries => refresh data
829 restart(flush ? 0.0 : ITERATION_RATIO);
831 // continue with previous big data
833 // Notify caller about iteration situation
834 return iteration.isAtStart();
// Compares the recalculated cache size with the size of the current snapshot;
// "dirty" is true when the relative difference reaches targetRatio.
// NOTE(review): the branch structure around the snapshot refresh is not visible here.
837 private void restart(double targetRatio) {
839 needDataInStart = true;
841 long start = System.nanoTime();
844 // We need new data from query maps
// The +1 keeps the divisor below non-zero for an empty snapshot.
846 int iterationSize = iteration.size()+1;
847 int diff = calculateCurrentSize()-iterationSize;
849 double ratio = (double)diff / (double)iterationSize;
850 boolean dirty = Math.abs(ratio) >= targetRatio;
853 iteration = allCaches();
855 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
856 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
857 System.err.print(" " + iteration.levels[i].size());
858 System.err.println("");
865 needDataInStart = false;
867 // We are returning here within the same GC round - reuse the cache table
// Returns the next entry of the given level, skipping entries that are already discarded.
876 public CacheEntryBase iterate(int level) {
878 CacheEntryBase entry = iteration.next(level);
880 restart(ITERATION_RATIO);
884 while(entry != null && entry.isDiscarded()) {
885 entry = iteration.next(level);
893 public void remove() {
898 public void setLevel(CacheEntryBase entry, int level) {
899 iteration.setLevel(entry, level);
902 public Collection<CacheEntry> getRootList() {
903 return cache.getRootList();
907 public int calculateCurrentSize() {
908 return cache.calculateCurrentSize();
912 public int getCurrentSize() {
917 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
919 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
920 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
922 public int querySize() {
/**
 * Runs a collection round on the query cache by delegating to the configured
 * {@code QueryCollector}.
 *
 * @param youngTarget forwarded unchanged to {@code collector.collect}
 * @param allowedTimeInMs forwarded unchanged to {@code collector.collect}
 */
926 public void gc(int youngTarget, int allowedTimeInMs) {
928 collector.collect(youngTarget, allowedTimeInMs);
/**
 * Registers {@code base} as a listener of {@code entry} through {@code addListener}.
 * NOTE(review): the result returned for an already disposed listener is not
 * visible in this excerpt.
 *
 * @param entry cache entry to listen to; asserted to be non-null
 * @param base listener; checked for disposal before registration
 * @param procedure procedure stored with the listener entry
 * @return the result of {@code addListener} when the listener is not disposed
 */
932 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
934 assert (entry != null);
936 if (base.isDisposed())
939 return addListener(entry, base, procedure);
/**
 * Stores {@code result} as the last known value of the listener entry.
 */
943 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
944 entry.setLastKnown(result);
/**
 * Adds a listener entry for {@code entry} to {@code cache.listeners}.
 * <p>
 * If an equal listener entry is already present, null is returned while its
 * listener is still alive; a disposed one is replaced in place.
 * NOTE(review): the handling of the "not present" case and the final return are
 * not visible in this excerpt.
 *
 * @param entry cache entry to listen to; asserted to be non-null
 * @param base listener
 * @param procedure procedure stored with the listener; asserted to be non-null
 */
947 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
949 assert (entry != null);
950 assert (procedure != null);
952 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
// A new single-slot list is created for the entry and stored in cache.listeners.
954 list = new ArrayList<ListenerEntry>(1);
955 cache.listeners.put(entry, list);
958 ListenerEntry result = new ListenerEntry(entry, base, procedure);
959 int currentIndex = list.indexOf(result);
960 // There was already a listener
961 if(currentIndex > -1) {
962 ListenerEntry current = list.get(currentIndex);
// A live duplicate wins; only a disposed one is overwritten.
963 if(!current.base.isDisposed()) return null;
964 list.set(currentIndex, result);
// Optional development trace of the registration.
969 if (Development.DEVELOPMENT) {
970 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
971 new Exception().printStackTrace();
972 System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
/**
 * Adds the listener entry to {@code scheduledListeners}, optionally printing a
 * development trace first.
 *
 * @param entry listener entry to schedule; asserted to be non-null
 */
980 private void scheduleListener(ListenerEntry entry) {
981 assert (entry != null);
982 if (Development.DEVELOPMENT) {
983 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
984 System.err.println("Scheduled " + entry.procedure);
987 scheduledListeners.add(entry);
/**
 * Removes the listener entry from the list kept for its cache entry in
 * {@code cache.listeners}; does nothing when no list exists.
 * NOTE(review): the condition under which the whole list is dropped from
 * {@code cache.listeners} is not visible in this excerpt.
 *
 * @param entry listener entry to remove; asserted to be non-null
 */
990 private void removeListener(ListenerEntry entry) {
991 assert (entry != null);
992 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
993 if(list == null) return;
994 boolean success = list.remove(entry);
997 cache.listeners.remove(entry.entry);
/**
 * Tells whether {@code cache.listeners} holds a listener list for {@code entry}.
 * NOTE(review): the fall-through result is not visible in this excerpt.
 */
1000 private boolean hasListener(CacheEntry entry) {
1001 if(cache.listeners.get(entry) != null) return true;
/**
 * Checks whether {@code entry} has listeners after pruning disposed ones.
 * <p>
 * Listener entries whose listener reports {@code isDisposed()} are collected
 * and then processed; when the entry's list ends up empty it is removed from
 * {@code cache.listeners}.
 * NOTE(review): the statements that collect/remove the disposed entries and all
 * return statements are missing from this excerpt.
 */
1005 boolean hasListenerAfterDisposing(CacheEntry entry) {
1006 if(cache.listeners.get(entry) != null) {
1007 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
// Allocated lazily, only when at least one disposed listener is found.
1008 ArrayList<ListenerEntry> list = null;
1009 for (ListenerEntry e : entries) {
1010 if (e.base.isDisposed()) {
1011 if(list == null) list = new ArrayList<ListenerEntry>();
1016 for (ListenerEntry e : list) {
1020 if (entries.isEmpty()) {
1021 cache.listeners.remove(entry);
/**
 * Returns the listener entries of {@code entry} after pruning disposed
 * listeners, or an empty list when none are registered.
 * The returned list is the one stored in {@code cache.listeners}, not a copy.
 */
1029 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
// Called for its pruning side effect; the boolean result is ignored.
1030 hasListenerAfterDisposing(entry);
1031 if(cache.listeners.get(entry) != null)
1032 return cache.listeners.get(entry);
1034 return Collections.emptyList();
/**
 * Fills {@code workarea} with the set of listeners that reach {@code entry},
 * recursing into the entry's parents and merging their listener sets.
 * Entries already present in {@code workarea} are not processed again.
 * NOTE(review): the statement adding the entry's own listeners to the set is
 * not visible in this excerpt.
 *
 * @param entry cache entry to process
 * @param workarea accumulated listener sets keyed by cache entry
 */
1037 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1039 if(!workarea.containsKey(entry)) {
1041 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1042 for(ListenerEntry e : getListenerEntries(entry))
// The set is registered before recursing into the parents.
1045 workarea.put(entry, ls);
1047 for(CacheEntry parent : entry.getParents(this)) {
1048 processListenerReport(parent, workarea);
1049 ls.addAll(workarea.get(parent));
/**
 * Builds a report of how many cache entries each listener is attached to,
 * directly or through parent entries.
 * <p>
 * All cache entries are first pruned of disposed listeners, then processed with
 * {@code processListenerReport}.
 * NOTE(review): the return statement is not visible in this excerpt.
 *
 * @throws IOException declared by the method
 */
1056 public synchronized ListenerReport getListenerReport() throws IOException {
1058 class ListenerReportImpl implements ListenerReport {
1060 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener: occurrence count followed by the listener.
1063 public void print(PrintStream b) {
1064 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1065 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1066 for(ListenerBase l : e.getValue()) {
1067 Integer i = hist.get(l);
// Counts are kept negated and negated again when printed;
// presumably so that the value sort lists the largest counts first.
1068 hist.put(l, i != null ? i-1 : -1);
1072 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1073 b.print("" + -p.second + " " + p.first + "\n");
1081 ListenerReportImpl result = new ListenerReportImpl();
1083 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
// First pass: drop disposed listeners from every entry.
1084 for(CacheEntryBase entry : all) {
1085 hasListenerAfterDisposing(entry);
// Second pass: accumulate listener sets into the report's work area.
1087 for(CacheEntryBase entry : all) {
1088 processListenerReport(entry, result.workarea);
/**
 * Writes the listener report to {@code file}.
 * NOTE(review): the statements that print the report and close the stream are
 * not visible in this excerpt; confirm the stream is closed on every path.
 *
 * @param file target file, opened for writing through a buffered stream
 * @return the fixed status message
 * @throws IOException when the file cannot be opened or the report cannot be built
 */
1095 public synchronized String reportListeners(File file) throws IOException {
1100 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1101 ListenerReport report = getListenerReport();
1104 return "Done reporting listeners.";
/**
 * Records the non-discarded parents of {@code entry} in {@code workarea} and
 * recurses into each of them. Discarded entries and entries already present in
 * {@code workarea} are skipped.
 * NOTE(review): the statement adding each parent to the collected set is not
 * visible in this excerpt.
 *
 * @param entry cache entry to process
 * @param workarea accumulated parent sets keyed by cache entry
 */
1108 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1110 if(entry.isDiscarded()) return;
1111 if(workarea.containsKey(entry)) return;
1113 Iterable<CacheEntry> parents = entry.getParents(this);
1114 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1115 for(CacheEntry e : parents) {
1116 if(e.isDiscarded()) continue;
1118 processParentReport(e, workarea);
// The entry is registered only after all of its parents have been processed.
1120 workarea.put(entry, ps);
/**
 * Writes the contents of {@code Development.histogram} to {@code file}, one
 * "name: count" line per entry in reversed value-sorted order, and then clears
 * the histogram.
 * NOTE(review): closing of the stream and the return value are not visible in
 * this excerpt.
 *
 * @param file target file
 * @throws IOException when the file cannot be opened
 */
1124 public synchronized String reportQueryActivity(File file) throws IOException {
1126 System.err.println("reportQueries " + file.getAbsolutePath());
1131 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1133 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1134 Collections.reverse(entries);
1136 for(Pair<String,Integer> entry : entries) {
1137 b.println(entry.first + ": " + entry.second);
// The histogram is reset after it has been written out.
1142 Development.histogram.clear();
/**
 * Dumps a textual report of the cached queries to {@code file}.
 * <p>
 * The visible phases are: (1) collect every cache entry with its non-discarded
 * parents, (2) assign each entry a bind status (0=bound, 1=free, 2=unknown, as
 * listed in the report header) and refine it over repeated passes, (3) count
 * entries per query class, (4) iterate a per-entry "retained set" histogram,
 * (5) print parent and child listings and totals.
 * NOTE(review): many lines of this method are missing from this excerpt, so the
 * comments below describe only the visible statements.
 *
 * @param file target file
 * @return a message with the number of dumped queries
 * @throws IOException when the file cannot be opened
 */
1148 public synchronized String reportQueries(File file) throws IOException {
1150 System.err.println("reportQueries " + file.getAbsolutePath());
1155 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1157 long start = System.nanoTime();
1159 // ArrayList<CacheEntry> all = ;
// Phase 1: entry -> set of parents, for every cache entry reachable from allCaches.
1161 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1162 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1163 for(CacheEntryBase entry : caches) {
1164 processParentReport(entry, workarea);
1167 // for(CacheEntry e : all) System.err.println("entry: " + e);
1169 long duration = System.nanoTime() - start;
1170 System.err.println("Query root set in " + 1e-9*duration + "s.");
1172 start = System.nanoTime();
// Phase 2: initial bind status per entry.
1174 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1178 for(CacheEntry entry : workarea.keySet()) {
1179 boolean listener = hasListenerAfterDisposing(entry);
1180 boolean hasParents = entry.getParents(this).iterator().hasNext();
// NOTE(review): the condition for status 0 is not visible; presumably "has a listener".
1183 flagMap.put(entry, 0);
1184 } else if (!hasParents) {
1186 flagMap.put(entry, 1);
1189 flagMap.put(entry, 2);
1191 // // Write leaf bit
1192 // entry.flags |= 4;
1195 boolean done = true;
1202 long start2 = System.nanoTime();
1204 int boundCounter = 0;
1205 int unboundCounter = 0;
1206 int unknownCounter = 0;
1208 for(CacheEntry<?> entry : workarea.keySet()) {
1210 //System.err.println("process " + entry);
1212 int flags = flagMap.get(entry);
// Only the two lowest bits carry the bind status.
1213 int bindStatus = flags & 3;
1215 if(bindStatus == 0) boundCounter++;
1216 else if(bindStatus == 1) unboundCounter++;
1217 else if(bindStatus == 2) unknownCounter++;
// Entries that are already bound or free need no further refinement.
1219 if(bindStatus < 2) continue;
1222 for(CacheEntry parent : entry.getParents(this)) {
1224 if(parent.isDiscarded()) flagMap.put(parent, 1);
1226 int flags2 = flagMap.get(parent);
1227 int bindStatus2 = flags2 & 3;
1228 // Parent is bound => child is bound
1229 if(bindStatus2 == 0) {
1233 // Parent is unknown => child is unknown
1234 else if (bindStatus2 == 2) {
1241 flagMap.put(entry, newStatus);
1245 duration = System.nanoTime() - start2;
1246 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1247 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
// The refinement loop ends when a pass reports done or the loop counter exceeds 20.
1249 } while(!done && loops++ < 20);
1253 for(CacheEntry entry : workarea.keySet()) {
1255 int bindStatus = flagMap.get(entry);
1256 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1262 duration = System.nanoTime() - start;
1263 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Phase 3: number of entries per query class. Request-based entries are counted
// under the class of their request object.
1265 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1267 for(CacheEntry entry : workarea.keySet()) {
1268 Class<?> clazz = entry.getClass();
1269 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1270 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1271 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1272 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1273 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
// Counts are stored negated and negated again when printed;
// presumably so that the value sort lists the largest counts first.
1274 Integer c = counts.get(clazz);
1275 if(c == null) counts.put(clazz, -1);
1276 else counts.put(clazz, c-1);
1279 b.print("// Simantics DB client query report file\n");
1280 b.print("// This file contains the following information\n");
1281 b.print("// -The amount of cached query instances per query class\n");
1282 b.print("// -The sizes of retained child sets\n");
1283 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1284 b.print("// -Followed by status, where\n");
1285 b.print("// -0=bound\n");
1286 b.print("// -1=free\n");
1287 b.print("// -2=unknown\n");
1288 b.print("// -L=has listener\n");
1289 b.print("// -List of children for each query (search for 'C <query name>')\n");
1291 b.print("----------------------------------------\n");
1293 b.print("// Queries by class\n");
1294 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1295 b.print(-p.second + " " + p.first.getName() + "\n");
// Phase 4: retained set histogram, recomputed until stable or the iteration counter exceeds 50.
1298 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1299 for(CacheEntry e : workarea.keySet())
1302 boolean changed = true;
1304 while(changed && iter++<50) {
1308 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1309 for(CacheEntry e : workarea.keySet())
// Each entry's current value is added to every one of its parents.
1312 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1313 Integer c = hist.get(e.getKey());
1314 for(CacheEntry p : e.getValue()) {
1315 Integer i = newHist.get(p);
1316 newHist.put(p, i+c);
1319 for(CacheEntry e : workarea.keySet()) {
1320 Integer value = newHist.get(e);
1321 Integer old = hist.get(e);
1322 if(!value.equals(old)) {
1324 // System.err.println("hist " + e + ": " + old + " => " + value);
1329 System.err.println("Retained set iteration " + iter);
1333 b.print("// Queries by retained set\n");
1334 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1335 b.print("" + -p.second + " " + p.first + "\n");
// Phase 5: parent listing; the inverse (parent -> children) map is built on the way.
1338 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1340 b.print("// Entry parent listing\n");
1341 for(CacheEntry entry : workarea.keySet()) {
1342 int status = flagMap.get(entry);
1343 boolean hasListener = hasListenerAfterDisposing(entry);
1344 b.print("Q " + entry.toString());
1346 b.print(" (L" + status + ")");
1349 b.print(" (" + status + ")");
1352 for(CacheEntry parent : workarea.get(entry)) {
1353 Collection<CacheEntry> inv = inverse.get(parent);
1355 inv = new ArrayList<CacheEntry>();
1356 inverse.put(parent, inv);
1359 b.print(" " + parent.toString());
1364 b.print("// Entry child listing\n");
1365 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1366 b.print("C " + entry.getKey().toString());
1368 for(CacheEntry child : entry.getValue()) {
1369 Integer h = hist.get(child);
1373 b.print(" <no children>");
1375 b.print(" " + child.toString());
1380 b.print("#queries: " + workarea.keySet().size() + "\n");
1381 b.print("#listeners: " + listeners + "\n");
1385 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item of the update traversal: 'entry' is the cache entry to process,
// 'caller' is the entry whose change caused it to be queued (update() passes
// null for the root item), and 'indent' is the depth used only for indenting
// the development trace output in updateQuery.
1391 public CacheEntry caller;
1393 public CacheEntry entry;
1397 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1398 this.caller = caller;
1400 this.indent = indent;
/**
 * Removes a cache entry from its query's bookkeeping.
 *
 * @param entry the entry to remove
 * @return false when the entry was already discarded (nothing is done)
 */
1405 boolean removeQuery(CacheEntry entry) {
1407 // This entry has been removed before. No need to do anything here.
1408 if(entry.isDiscarded()) return false;
// Redundant after the early return above; kept as a development-time sanity check.
1410 assert (!entry.isDiscarded());
1412 Query query = entry.getQuery();
// Detach the entry from the query's own cache structure.
1414 query.removeEntry(this);
// Entries flagged HAS_BEEN_BOUND get extra handling here.
// NOTE(review): the consequence of this test is not visible in this listing - confirm.
1419 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1430 * @return true if this entry is being listened
// Processes one queued UpdateEntry. In order, the visible code:
//  - checks the discarded / refuted / excepted / pending states of the entry and,
//    when the QUERYPROCESSOR_UPDATE development flag is on, prints a one-letter
//    trace tag (D, R, E, P, or U for a regular update) to System.err;
//  - records queries whose update mode is IMMEDIATE_UPDATE in 'immediates';
//  - schedules every ListenerEntry registered for the entry;
//  - for INVALIDATE queries, queues the entry's parents on 'todo'.
// NOTE(review): which of the state checks return early is not visible in this listing.
1432 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1436 CacheEntry entry = e.entry;
1439 * If the dependency graph forms a DAG, some entries are inserted in the
1440 * todo list many times. They only need to be processed once though.
1442 if (entry.isDiscarded()) {
1443 if (Development.DEVELOPMENT) {
1444 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1445 System.err.print("D");
1446 for (int i = 0; i < e.indent; i++)
1447 System.err.print(" ");
1448 System.err.println(entry.getQuery());
1451 // System.err.println(" => DISCARDED");
1455 if (entry.isRefuted()) {
1456 if (Development.DEVELOPMENT) {
1457 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1458 System.err.print("R");
1459 for (int i = 0; i < e.indent; i++)
1460 System.err.print(" ");
1461 System.err.println(entry.getQuery());
1467 if (entry.isExcepted()) {
1468 if (Development.DEVELOPMENT) {
1469 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1470 System.err.print("E");
1475 if (entry.isPending()) {
1476 if (Development.DEVELOPMENT) {
1477 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1478 System.err.print("P");
1485 if (Development.DEVELOPMENT) {
1486 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1487 System.err.print("U ");
1488 for (int i = 0; i < e.indent; i++)
1489 System.err.print(" ");
1490 System.err.print(entry.getQuery());
1494 Query query = entry.getQuery();
1495 int type = query.type();
1497 boolean hasListener = hasListener(entry);
1499 if (Development.DEVELOPMENT) {
1500 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
// NOTE(review): hasListener(entry) is evaluated again here although the result
// is already held in the local 'hasListener'.
1501 if(hasListener(entry)) {
1502 System.err.println(" (L)");
1504 System.err.println("");
// Pending or excepted entries and the remaining entries are handled in two
// branches; both visible branches register IMMEDIATE_UPDATE queries in 'immediates'.
1509 if(entry.isPending() || entry.isExcepted()) {
1512 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1514 immediates.put(entry, entry);
1529 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1531 immediates.put(entry, entry);
1545 // System.err.println(" => FOO " + type);
// Queue every listener registered for this entry; they are fired later by
// performScheduledUpdates.
1548 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1549 if(entries != null) {
1550 for (ListenerEntry le : entries) {
1551 scheduleListener(le);
1556 // If invalid, update parents
1557 if (type == RequestFlags.INVALIDATE) {
1558 updateParents(e.indent, entry, todo);
/**
 * Queues all non-discarded parents of the given entry for updating.
 *
 * @param indent trace indentation of the current entry; parents are queued with indent + 2
 * @param entry  the entry whose parents depend on it
 * @param todo   work list; items are pushed to its head, so together with the
 *               pop() in update() the traversal is last-in-first-out
 */
1565 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1567 Iterable<CacheEntry> oldParents = entry.getParents(this);
1568 for (CacheEntry parent : oldParents) {
1569 // System.err.println("updateParents " + entry + " => " + parent);
1570 if(!parent.isDiscarded())
1571 todo.push(new UpdateEntry(entry, parent, indent + 2));
/**
 * Removes the given listener entry when its listener base reports that it
 * has been disposed.
 * NOTE(review): the return statements are not visible here; performScheduledUpdates
 * treats a true result as "listener was pruned" - confirm.
 */
1576 private boolean pruneListener(ListenerEntry entry) {
1577 if (entry.base.isDisposed()) {
1578 removeListener(entry);
1586 * @param av1 an array (guaranteed)
1587 * @param av2 any object
1588 * @return <code>true</code> if the two arrays are equal
// Compares av1 with av2 by content. The component types must be equal;
// the comparison is delegated to the java.util.Arrays.equals overload that
// matches the component type.
// NOTE(review): the primitive dispatch below covers boolean, byte, int, long,
// float and double only. short[] and char[] arguments fall through to the
// RuntimeException at the end.
1590 private final boolean arrayEquals(Object av1, Object av2) {
1593 Class<?> c1 = av1.getClass().getComponentType();
// c2 is null when av2 is not an array at all.
1594 Class<?> c2 = av2.getClass().getComponentType();
1595 if (c2 == null || !c1.equals(c2))
1597 boolean p1 = c1.isPrimitive();
1598 boolean p2 = c2.isPrimitive();
// Reference-typed arrays.
1602 return Arrays.equals((Object[]) av1, (Object[]) av2);
// Primitive arrays need the overload for their exact component type.
1603 if (boolean.class.equals(c1))
1604 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1605 else if (byte.class.equals(c1))
1606 return Arrays.equals((byte[]) av1, (byte[]) av2);
1607 else if (int.class.equals(c1))
1608 return Arrays.equals((int[]) av1, (int[]) av2);
1609 else if (long.class.equals(c1))
1610 return Arrays.equals((long[]) av1, (long[]) av2);
1611 else if (float.class.equals(c1))
1612 return Arrays.equals((float[]) av1, (float[]) av2);
1613 else if (double.class.equals(c1))
1614 return Arrays.equals((double[]) av1, (double[]) av2);
1615 throw new RuntimeException("??? Contact application querySupport.");
/**
 * Recomputes the query behind the given entry and compares the fresh result
 * with a previously known value.
 *
 * @param graph    graph used to derive the recompute graph
 * @param entry    the cache entry to recompute
 * @param oldValue the last known value, or ListenerEntry.NO_VALUE when there is none
 * @return ListenerEntry.NO_VALUE when the entry ends up excepted or the
 *         recomputation throws (the Throwable is logged);
 *         ListenerEntry.NOT_CHANGED when the new result equals oldValue;
 *         otherwise the new result.
 *         NOTE(review): the value returned from the oldValue == NO_VALUE
 *         branch is not visible in this listing.
 */
1620 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1624 Query query = entry.getQuery();
1626 if (Development.DEVELOPMENT) {
1627 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
1628 System.err.println("R " + query);
1632 entry.prepareRecompute(querySupport);
1634 ReadGraphImpl parentGraph = graph.forRecompute(entry);
1636 query.recompute(parentGraph);
1638 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1640 Object newValue = entry.getResult();
// No previous value to compare against.
1642 if (ListenerEntry.NO_VALUE == oldValue) {
1643 if (Development.DEVELOPMENT) {
1644 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
// NOTE(review): this trace goes to System.out while the equivalent trace
// further down goes to System.err.
1645 System.out.println("C " + query);
1646 System.out.println("- " + oldValue);
1647 System.out.println("- " + newValue);
1653 boolean changed = false;
// Arrays are compared by content, everything else with equals();
// a null result counts as changed only when the old value was non-null.
1655 if (newValue != null) {
1656 if (newValue.getClass().isArray()) {
1657 changed = !arrayEquals(newValue, oldValue);
1659 changed = !newValue.equals(oldValue);
1662 changed = (oldValue != null);
1664 if (Development.DEVELOPMENT) {
1665 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
1666 System.err.println("C " + query);
1667 System.err.println("- " + oldValue);
1668 System.err.println("- " + newValue);
1672 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1674 } catch (Throwable t) {
1676 Logger.defaultLogError(t);
1678 return ListenerEntry.NO_VALUE;
/**
 * @return true when at least one listener is waiting in scheduledListeners
 */
1684 public boolean hasScheduledUpdates() {
1685 return !scheduledListeners.isEmpty();
/**
 * Fires the scheduled listeners. Each round takes the current set of
 * scheduled listeners, prunes disposed ones, recomputes the remaining entries
 * and fires those whose value is not reported as NOT_CHANGED. Rounds repeat
 * until no listeners are scheduled, because firing may schedule new ones.
 * The firingListeners flag is set for the duration of the call.
 *
 * @param graph the graph used for recomputation and for invoking the listener procedures
 */
1688 public void performScheduledUpdates(WriteGraphImpl graph) {
1691 assert (!cache.collecting);
1692 assert (!firingListeners);
1694 firingListeners = true;
1698 // Performing may cause further events to be scheduled.
1699 while (!scheduledListeners.isEmpty()) {
1702 // graph.state.barrier.inc();
1704 // Clone current events to make new entries possible during
// The set is swapped for a fresh one rather than copied.
1706 THashSet<ListenerEntry> entries = scheduledListeners;
1707 scheduledListeners = new THashSet<ListenerEntry>();
1709 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
// Phase 1: decide which listeners need to be fired.
1711 for (ListenerEntry listenerEntry : entries) {
1713 if (pruneListener(listenerEntry)) {
1714 if (Development.DEVELOPMENT) {
1715 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1716 new Exception().printStackTrace();
1717 System.err.println("Pruned " + listenerEntry.procedure);
1723 final CacheEntry entry = listenerEntry.entry;
1724 assert (entry != null);
1726 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1728 if (newValue != ListenerEntry.NOT_CHANGED) {
1729 if (Development.DEVELOPMENT) {
1730 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1731 new Exception().printStackTrace();
1732 System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1735 schedule.add(listenerEntry);
// Remember what the listener is about to be told, for the next comparison.
1736 listenerEntry.setLastKnown(entry.getResult());
// Phase 2: fire the collected listeners from the cached results.
1741 for(ListenerEntry listenerEntry : schedule) {
1742 final CacheEntry entry = listenerEntry.entry;
1743 if (Development.DEVELOPMENT) {
1744 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1745 System.err.println("Firing " + listenerEntry.procedure);
1749 if (Development.DEVELOPMENT) {
1750 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) {
1751 System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1754 entry.performFromCache(graph, listenerEntry.procedure);
1755 } catch (Throwable t) {
// NOTE(review): failures are printed with printStackTrace() here, whereas the
// rest of this class reports through Logger.defaultLogError.
1756 t.printStackTrace();
1760 // graph.state.barrier.dec();
1761 // graph.waitAsync(null);
1762 // graph.state.barrier.assertReady();
1767 firingListeners = false;
1774 * @return true if this entry still has listeners
// Propagates a change of 'entry' through the dependency structure.
// Alternates between (a) draining the todo list with updateQuery, which
// collects IMMEDIATE_UPDATE entries, and (b) recomputing those immediates and
// queueing the parents of the ones that changed. Stops when a drain produces
// no immediates. Throwables are logged.
// The result is hadListeners | listenersUnknown: listenersUnknown is set when
// an immediate entry was recomputed without a change, i.e. its parents were
// not visited and their listener state was not examined.
1776 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1778 assert (!cache.collecting);
1782 boolean hadListeners = false;
1783 boolean listenersUnknown = false;
1787 assert(entry != null);
1788 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1789 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1790 todo.add(new UpdateEntry(null, entry, 0));
1794 // Walk the tree and collect immediate updates
1795 while (!todo.isEmpty()) {
1796 UpdateEntry e = todo.pop();
1797 hadListeners |= updateQuery(e, todo, immediates);
1800 if(immediates.isEmpty()) break;
1802 // Evaluate all immediate updates and collect parents to update
1803 for(CacheEntry immediate : immediates.values()) {
1805 if(immediate.isDiscarded()) {
// Excepted entries have no usable old result, so they are compared against NO_VALUE.
1809 if(immediate.isExcepted()) {
1811 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1812 if (newValue != ListenerEntry.NOT_CHANGED)
1813 updateParents(0, immediate, todo);
1817 Object oldValue = immediate.getResult();
1818 Object newValue = compareTo(graph, immediate, oldValue);
1820 if (newValue != ListenerEntry.NOT_CHANGED) {
1821 updateParents(0, immediate, todo);
1823 // If not changed, keep the old value
1824 immediate.setResult(oldValue);
1825 immediate.setReady();
1826 listenersUnknown = true;
1836 } catch (Throwable t) {
1837 Logger.defaultLogError(t);
1843 return hadListeners | listenersUnknown;
// Pending changes collected between calls to performDirtyUpdates.
// Statement changes, stored as packed longs (subject << 32 | predicate).
1847 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
// Resources whose value changed.
1848 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
// Resources to invalidate completely.
// NOTE(review): nothing in the visible code adds to this set - confirm where it is filled.
1849 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1850 // Maybe use a mutex from util.concurrent?
// Guards scheduledPrimitiveUpdates (see updatePrimitive and performDirtyUpdates).
1851 private Object primitiveUpdateLock = new Object();
// External reads reported as changed; raw-typed set.
1852 private THashSet scheduledPrimitiveUpdates = new THashSet();
/**
 * Applies all pending primitive, value, invalidate and statement changes to
 * the query cache by calling update() on every affected cache entry, then
 * clears the pending sets and the cache's dirty flag.
 *
 * Two fast paths handle the common cases of exactly one pending statement
 * change or exactly one pending value change; the general path first gathers
 * the affected subjects and ordered sets and then updates them in bulk.
 *
 * @param graph graph passed on to update()
 */
1854 public void performDirtyUpdates(final ReadGraphImpl graph) {
1856 cache.dirty = false;
1859 if (Development.DEVELOPMENT) {
1860 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1861 System.err.println("== Query update ==");
1865 // Special case - one statement
1866 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1868 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the pair: subject in the upper 32 bits, predicate in the lower 32 bits.
1870 final int subject = (int)(arg0 >>> 32);
1871 final int predicate = (int)(arg0 & 0xffffffff);
1873 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1874 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1875 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Type-related predicates also affect the cached type queries of the subject.
1877 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1878 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1879 if(principalTypes != null) update(graph, principalTypes);
1880 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1881 if(types != null) update(graph, types);
1884 if(predicate == subrelationOf) {
1885 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1886 if(superRelations != null) update(graph, superRelations);
1889 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1890 if(dp != null) update(graph, dp);
// The ordered-set query is looked up by the predicate id.
1891 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1892 if(os != null) update(graph, os);
1894 scheduledObjectUpdates.clear();
1899 // Special case - one value
1900 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1902 int arg0 = scheduledValueUpdates.getFirst();
1904 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1905 if(valueQuery != null) update(graph, valueQuery);
1907 scheduledValueUpdates.clear();
// General path. Despite its name, 'predicates' collects the ids of affected
// subjects (it is filled with subject/resource ids below and consumed as
// 'subject' in the bulk update).
1912 final TIntHashSet predicates = new TIntHashSet();
1913 final TIntHashSet orderedSets = new TIntHashSet();
// Take the pending primitive set under the lock and replace it with a fresh one,
// so other threads can keep scheduling while this batch is processed.
1915 THashSet primitiveUpdates;
1916 synchronized (primitiveUpdateLock) {
1917 primitiveUpdates = scheduledPrimitiveUpdates;
1918 scheduledPrimitiveUpdates = new THashSet();
1921 primitiveUpdates.forEach(new TObjectProcedure() {
1924 public boolean execute(Object arg0) {
1926 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1927 if (query != null) {
1928 boolean listening = update(graph, query);
// Drop external read entries that nobody listens to or depends on.
1929 if (!listening && !query.hasParents()) {
1930 cache.externalReadEntryMap.remove(arg0);
1939 scheduledValueUpdates.forEach(new TIntProcedure() {
1942 public boolean execute(int arg0) {
1943 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1944 if(valueQuery != null) update(graph, valueQuery);
// Invalidated resources: refresh value, type and super-relation queries and
// mark the resource for the per-subject bulk update.
1950 scheduledInvalidates.forEach(new TIntProcedure() {
1953 public boolean execute(int resource) {
1955 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1956 if(valueQuery != null) update(graph, valueQuery);
1958 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1959 if(principalTypes != null) update(graph, principalTypes);
1960 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1961 if(types != null) update(graph, types);
1963 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1964 if(superRelations != null) update(graph, superRelations);
1966 predicates.add(resource);
1973 scheduledObjectUpdates.forEach(new TLongProcedure() {
1976 public boolean execute(long arg0) {
1978 final int subject = (int)(arg0 >>> 32);
1979 final int predicate = (int)(arg0 & 0xffffffff);
1981 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1982 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1983 if(principalTypes != null) update(graph, principalTypes);
1984 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1985 if(types != null) update(graph, types);
1988 if(predicate == subrelationOf) {
1989 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1990 if(superRelations != null) update(graph, superRelations);
1993 predicates.add(subject);
1994 orderedSets.add(predicate);
// Bulk update of the per-subject statement queries, once per distinct subject.
2002 predicates.forEach(new TIntProcedure() {
2005 public boolean execute(final int subject) {
2007 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
2008 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
2009 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
2011 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
2012 if(entry != null) update(graph, entry);
2020 orderedSets.forEach(new TIntProcedure() {
2023 public boolean execute(int orderedSet) {
2025 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
2026 if(entry != null) update(graph, entry);
2034 // for (Integer subject : predicates) {
2035 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2036 // if(entry != null) update(graph, entry);
2040 if (Development.DEVELOPMENT) {
2041 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2042 System.err.println("== Query update ends ==");
2046 scheduledValueUpdates.clear();
2047 scheduledObjectUpdates.clear();
2048 scheduledInvalidates.clear();
/**
 * Schedules a value update for the given resource; it is applied by the next
 * performDirtyUpdates call.
 *
 * @param resource id of the resource whose value changed
 */
2052 public void updateValue(final int resource) {
2053 scheduledValueUpdates.add(resource);
/**
 * Schedules a statement update for the given subject/predicate pair; it is
 * applied by the next performDirtyUpdates call.
 * The pair is packed into one long with the subject in the upper 32 bits and
 * the predicate in the lower 32 bits, matching the decoding in
 * performDirtyUpdates ((int)(v >>> 32) and (int)(v & 0xffffffff)).
 *
 * @param resource  id of the subject resource
 * @param predicate id of the predicate resource
 */
2057 public void updateStatements(final int resource, final int predicate) {
// Mask the predicate to its low 32 bits before combining. A negative int is
// sign-extended when widened to long; with the previous '+' it borrowed from
// the upper word, so the subject decoded as resource - 1.
2058 scheduledObjectUpdates.add((((long)resource) << 32) | (predicate & 0xffffffffL));
// Id of the resource most recently passed to invalidateResource; used to
// drop immediately repeated invalidations of the same resource.
2062 private int lastInvalidate = 0;
/**
 * Schedules an invalidation for the given resource unless it is the same
 * resource as in the previous call.
 * NOTE(review): the resource is added to scheduledValueUpdates, not to
 * scheduledInvalidates, so performDirtyUpdates only refreshes its ValueQuery -
 * confirm this is intended.
 * NOTE(review): lastInvalidate is not reset in the visible code when the
 * pending sets are cleared, so a repeated invalidation of the same resource
 * after a flush would be dropped - confirm.
 *
 * @param resource id of the resource to invalidate
 */
2064 public void invalidateResource(final int resource) {
2065 if(lastInvalidate == resource) return;
2066 scheduledValueUpdates.add(resource);
2067 lastInvalidate = resource;
/**
 * Schedules an update for an external read and notifies the query support
 * that primitives are dirty.
 *
 * @param primitive the external read whose value changed
 */
2071 public void updatePrimitive(final ExternalRead primitive) {
2073 // External reads may be updated from arbitrary threads.
2074 // Synchronize to prevent race-conditions.
2075 synchronized (primitiveUpdateLock) {
2076 scheduledPrimitiveUpdates.add(primitive);
// Called outside the lock.
2078 querySupport.dirtyPrimitives();
/**
 * @return a one-line summary of the cache counters (size, hits, misses, updates)
 */
2083 public synchronized String toString() {
2084 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
/**
 * Shuts down the query executor threads in escalating steps: ask each
 * executor to dispose, poll for them to finish (up to 100 rounds), then
 * interrupt them (up to 100 rounds), and finally join each one with a
 * 5 second timeout before clearing its slot.
 * NOTE(review): the waits between rounds and the loop exit conditions are not
 * visible in this listing.
 */
2088 protected void doDispose() {
2090 for(int index = 0; index < THREADS; index++) {
2091 executors[index].dispose();
// Step 2: wait for the executors to stop on their own.
2095 for(int i=0;i<100;i++) {
2097 boolean alive = false;
2098 for(int index = 0; index < THREADS; index++) {
2099 alive |= executors[index].isAlive();
2104 } catch (InterruptedException e) {
// NOTE(review): the interrupt is logged but the thread's interrupt status is
// not restored here.
2105 Logger.defaultLogError(e);
2110 // Then start interrupting
2111 for(int i=0;i<100;i++) {
2113 boolean alive = false;
2114 for(int index = 0; index < THREADS; index++) {
2115 alive |= executors[index].isAlive();
2118 for(int index = 0; index < THREADS; index++) {
2119 executors[index].interrupt();
2123 // // Then just destroy
2124 // for(int index = 0; index < THREADS; index++) {
2125 // executors[index].destroy();
// Final step: bounded join, then release the reference regardless of the outcome.
2128 for(int index = 0; index < THREADS; index++) {
2130 executors[index].join(5000);
2131 } catch (InterruptedException e) {
2132 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2134 executors[index] = null;
// Cache statistics accessors.
// NOTE(review): presumably returns cache.hits; the return statement is not visible here.
2139 public int getHits() {
/**
 * @return the cache miss counter
 */
2143 public int getMisses() {
2144 return cache.misses;
// NOTE(review): presumably returns cache.size; the return statement is not visible here.
2147 public int getSize() {
/**
 * Collects the ids of the clusters referenced by cached Objects,
 * DirectPredicates and ValueQuery entries, using the resource id each query
 * is keyed on.
 *
 * @return the collected cluster ids (the return statement itself is not
 *         visible in this listing)
 */
2151 public Set<Long> getReferencedClusters() {
2152 HashSet<Long> result = new HashSet<Long>();
2153 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2154 Objects query = (Objects) entry.getQuery();
2155 result.add(querySupport.getClusterId(query.r1()));
2157 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2158 DirectPredicates query = (DirectPredicates) entry.getQuery();
2159 result.add(querySupport.getClusterId(query.id));
2161 for (CacheEntry entry : cache.valueQueryMap.values()) {
2162 ValueQuery query = (ValueQuery) entry.getQuery();
2163 result.add(querySupport.getClusterId(query.id));
// NOTE(review): the body of assertDone is not visible in this listing.
2168 public void assertDone() {
// Delegates to the cache to fill 'result' with all cache tables.
2171 CacheCollectionResult allCaches(CacheCollectionResult result) {
2173 return cache.allCaches(result);
// NOTE(review): the body of printDiagnostics is not visible in this listing.
2177 public void printDiagnostics() {
/**
 * Forwards a cluster load request to the query support.
 *
 * @param graph     the requesting graph
 * @param clusterId id of the cluster to load
 * @param runnable  callback handed on to the query support
 */
2180 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2181 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the query collector with target 0 and an unbounded maximum.
// NOTE(review): the returned int is not visible in this listing.
2184 public int clean() {
2185 collector.collect(0, Integer.MAX_VALUE);
/**
 * Runs a one-off collection restricted to the cache entries of the given
 * external reads. A dedicated QueryCollectorSupport exposes only those
 * entries to a fresh QueryCollectorImpl2; operations that do not apply to
 * this restricted view throw UnsupportedOperationException.
 *
 * @param requests the external reads whose cache entries should be collected
 */
2189 public void clean(final Collection<ExternalRead<?>> requests) {
2190 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2191 Iterator<ExternalRead<?>> iterator = requests.iterator();
2193 public CacheCollectionResult allCaches() {
2194 throw new UnsupportedOperationException();
// Returns the next request that still has a cache entry, skipping requests
// without one. When the iterator is exhausted it is reset for the next pass.
2197 public CacheEntryBase iterate(int level) {
2198 if(iterator.hasNext()) {
2199 ExternalRead<?> request = iterator.next();
2200 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2201 if (entry != null) return entry;
2202 else return iterate(level);
2204 iterator = requests.iterator();
2209 public void remove() {
2210 throw new UnsupportedOperationException();
2213 public void setLevel(CacheEntryBase entry, int level) {
2214 throw new UnsupportedOperationException();
// Roots of the collection: the cache entries of the given requests.
2217 public Collection<CacheEntry> getRootList() {
2218 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2219 for (ExternalRead<?> request : requests) {
2220 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2227 public int getCurrentSize() {
2231 public int calculateCurrentSize() {
2232 // This tells the collector to attempt collecting everything.
2233 return Integer.MAX_VALUE;
2236 public boolean start(boolean flush) {
2240 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to the cache's scanPending().
2243 public void scanPending() {
2245 cache.scanPending();
/**
 * @return a new asynchronous ReadGraphImpl bound to this processor
 */
2249 public ReadGraphImpl graphForVirtualRequest() {
2250 return ReadGraphImpl.createAsync(this);
// Lazily built map from built-in literal type resources to Java classes;
// filled by initBuiltinValues.
2254 private HashMap<Resource, Class<?>> builtinValues;
/**
 * Looks up the Java class registered for a built-in literal type resource.
 * The map is initialized on first use.
 * NOTE(review): initBuiltinValues returns without creating the map when the
 * Layer0 service is unavailable, in which case the get() below would hit a
 * null map - confirm that cannot happen here.
 *
 * @param r the type resource
 * @return the mapped class, or null when the resource is not a known built-in type
 */
2256 public Class<?> getBuiltinValue(Resource r) {
2257 if(builtinValues == null) initBuiltinValues();
2258 return builtinValues.get(r);
// NOTE(review): no use of this package-private field is visible in this chunk - confirm it is still needed.
2261 Exception callerException = null;
// Barrier abstraction. The debug-string variants of inc/dec that follow are
// commented out; the active members are not visible in this listing.
2263 public interface AsyncBarrier {
2266 // public void inc(String debug);
2267 // public void dec(String debug);
2270 // final public QueryProcessor processor;
2271 // final public QuerySupport support;
2273 // boolean disposed = false;
/**
 * Builds the builtinValues map from the Layer0 literal type resources to the
 * corresponding Java classes. Does nothing (and leaves the map unset) when
 * the session has no Layer0 service yet.
 */
2275 private void initBuiltinValues() {
2277 Layer0 b = getSession().peekService(Layer0.class);
2278 if(b == null) return;
2280 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types map to the boxed classes.
2282 builtinValues.put(b.String, String.class);
2283 builtinValues.put(b.Double, Double.class);
2284 builtinValues.put(b.Float, Float.class);
2285 builtinValues.put(b.Long, Long.class);
2286 builtinValues.put(b.Integer, Integer.class);
2287 builtinValues.put(b.Byte, Byte.class);
2288 builtinValues.put(b.Boolean, Boolean.class);
// Array types map to primitive arrays, except strings.
2290 builtinValues.put(b.StringArray, String[].class);
2291 builtinValues.put(b.DoubleArray, double[].class);
2292 builtinValues.put(b.FloatArray, float[].class);
2293 builtinValues.put(b.LongArray, long[].class);
2294 builtinValues.put(b.IntegerArray, int[].class);
2295 builtinValues.put(b.ByteArray, byte[].class);
2296 builtinValues.put(b.BooleanArray, boolean[].class);
2300 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2302 // if (null == provider2) {
2303 // this.processor = null;
2307 // this.processor = provider2;
2308 // support = provider2.getCore();
2309 // initBuiltinValues();
2313 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2314 // return new ReadGraphSupportImpl(impl.processor);
// Accessor for the session this processor belongs to.
// NOTE(review): the return statement is not visible in this listing.
2318 final public Session getSession() {
/**
 * @return the resource support used by this processor
 */
2322 final public ResourceSupport getResourceSupport() {
2323 return resourceSupport;
/**
 * Reports every predicate of the subject to the procedure, followed by
 * finished(). Any Throwable raised on the way - including one thrown by the
 * procedure's own execute or finished - is passed to procedure.exception.
 *
 * @param impl      the graph to read with
 * @param subject   the resource whose predicates are listed
 * @param procedure receiver of the predicates
 */
2327 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2331 for(Resource predicate : getPredicates(impl, subject))
2332 procedure.execute(impl, predicate);
2334 procedure.finished(impl);
2336 } catch (Throwable e) {
2337 procedure.exception(impl, e);
/**
 * Synchronous-procedure variant of forEachPredicate. Not supported: always
 * throws UnsupportedOperationException. The previous implementation is kept
 * below as commented-out code.
 */
2343 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2345 throw new UnsupportedOperationException();
2347 // assert(subject != null);
2348 // assert(procedure != null);
2350 // final ListenerBase listener = getListenerBase(procedure);
2353 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2356 // public void execute(ReadGraphImpl graph, int i) {
2358 // procedure.execute(querySupport.getResource(i));
2359 // } catch (Throwable t2) {
2360 // Logger.defaultLogError(t2);
2365 // public void finished(ReadGraphImpl graph) {
2367 // procedure.finished();
2368 // } catch (Throwable t2) {
2369 // Logger.defaultLogError(t2);
2371 //// impl.state.barrier.dec();
2375 // public void exception(ReadGraphImpl graph, Throwable t) {
2377 // procedure.exception(t);
2378 // } catch (Throwable t2) {
2379 // Logger.defaultLogError(t2);
2381 //// impl.state.barrier.dec();
2385 // } catch (DatabaseException e) {
2386 // Logger.defaultLogError(e);
/**
 * Returns the predicates of the subject via the Predicates query cache,
 * registered under impl.parent and without a listener.
 *
 * @param impl    the graph to read with
 * @param subject the resource whose predicates are requested
 * @return the predicate set
 * @throws Throwable whatever the cache lookup raises
 */
2392 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2393 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs the Statements query for (subject, predicate) and forwards each
 * result triple to a synchronous procedure as a Statement. If the procedure
 * is a ListenerBase it is registered as the query's listener. Throwables
 * thrown by the procedure, and a DatabaseException from the query itself,
 * are logged rather than propagated.
 *
 * @param impl      the graph to read with
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the statements, must not be null
 */
2397 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2398 final Resource predicate, final MultiProcedure<Statement> procedure) {
2400 assert(subject != null);
2401 assert(predicate != null);
2402 assert(procedure != null);
2404 final ListenerBase listener = getListenerBase(procedure);
2406 // impl.state.barrier.inc();
2409 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2412 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2414 procedure.execute(querySupport.getStatement(s, p, o));
2415 } catch (Throwable t2) {
2416 Logger.defaultLogError(t2);
2421 public void finished(ReadGraphImpl graph) {
2423 procedure.finished();
2424 } catch (Throwable t2) {
2425 Logger.defaultLogError(t2);
2427 // impl.state.barrier.dec();
2431 public void exception(ReadGraphImpl graph, Throwable t) {
2433 procedure.exception(t);
2434 } catch (Throwable t2) {
2435 Logger.defaultLogError(t2);
2437 // impl.state.barrier.dec();
2441 } catch (DatabaseException e) {
2442 Logger.defaultLogError(e);
/**
 * Runs the Statements query for (subject, predicate) and forwards each
 * result triple to an asynchronous procedure as a Statement. If the
 * procedure is a ListenerBase it is registered as the query's listener.
 * Throwables thrown by the procedure, and a DatabaseException from the query
 * itself, are logged rather than propagated.
 *
 * Each callback has two paths: one passes the callback's own graph to the
 * procedure, the other passes impl.newRestart(graph).
 * NOTE(review): the condition choosing between them (presumably the 'first'
 * flag) is not visible in this listing.
 *
 * @param impl      the graph to read with
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the statements, must not be null
 */
2448 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2449 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2451 assert(subject != null);
2452 assert(predicate != null);
2453 assert(procedure != null);
2455 final ListenerBase listener = getListenerBase(procedure);
2457 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2459 boolean first = true;
2462 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2465 procedure.execute(graph, querySupport.getStatement(s, p, o));
2467 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2469 } catch (Throwable t2) {
2470 Logger.defaultLogError(t2);
2475 public void finished(ReadGraphImpl graph) {
2480 procedure.finished(graph);
2481 // impl.state.barrier.dec(this);
2483 procedure.finished(impl.newRestart(graph));
2485 } catch (Throwable t2) {
2486 Logger.defaultLogError(t2);
2492 public void exception(ReadGraphImpl graph, Throwable t) {
2497 procedure.exception(graph, t);
2498 // impl.state.barrier.dec(this);
2500 procedure.exception(impl.newRestart(graph), t);
2502 } catch (Throwable t2) {
2503 Logger.defaultLogError(t2);
2510 int sId = querySupport.getId(subject);
2511 int pId = querySupport.getId(predicate);
2513 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2514 // else impl.state.barrier.inc(null, null);
2517 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2518 } catch (DatabaseException e) {
2519 Logger.defaultLogError(e);
/**
 * Runs the Statements query for (subject, predicate) and forwards each
 * result to a StatementProcedure as raw (s, p, o) ids, without creating
 * Statement objects. If the procedure is a ListenerBase it is registered as
 * the query's listener. Throwables thrown by the procedure, and a
 * DatabaseException from the query itself, are logged rather than propagated.
 *
 * Each callback has two paths: one passes the callback's own graph to the
 * procedure, the other passes impl.newRestart(graph).
 * NOTE(review): the condition choosing between them (presumably the 'first'
 * flag) is not visible in this listing.
 *
 * @param impl      the graph to read with
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the statement ids, must not be null
 */
2525 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2526 final Resource predicate, final StatementProcedure procedure) {
2528 assert(subject != null);
2529 assert(predicate != null);
2530 assert(procedure != null);
2532 final ListenerBase listener = getListenerBase(procedure);
2534 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2536 boolean first = true;
2539 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2542 procedure.execute(graph, s, p, o);
2544 procedure.execute(impl.newRestart(graph), s, p, o);
2546 } catch (Throwable t2) {
2547 Logger.defaultLogError(t2);
2552 public void finished(ReadGraphImpl graph) {
2557 procedure.finished(graph);
2558 // impl.state.barrier.dec(this);
2560 procedure.finished(impl.newRestart(graph));
2562 } catch (Throwable t2) {
2563 Logger.defaultLogError(t2);
2569 public void exception(ReadGraphImpl graph, Throwable t) {
2574 procedure.exception(graph, t);
2575 // impl.state.barrier.dec(this);
2577 procedure.exception(impl.newRestart(graph), t);
2579 } catch (Throwable t2) {
2580 Logger.defaultLogError(t2);
2587 int sId = querySupport.getId(subject);
2588 int pId = querySupport.getId(predicate);
2590 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2591 // else impl.state.barrier.inc(null, null);
2594 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2595 } catch (DatabaseException e) {
2596 Logger.defaultLogError(e);
/**
 * Adapts a set-style listener to the multi-listener statement query: each
 * evaluation of the query is turned into add/remove notifications relative
 * to the previous evaluation.
 *
 * @param impl      the graph to read with
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of add/remove/exception notifications, must not be null
 */
2602 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2604 assert(subject != null);
2605 assert(predicate != null);
2606 assert(procedure != null);
2608 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// 'current' is checked for statements already known from an earlier run;
// 'run' is replaced with an empty set in finished().
// NOTE(review): the lines that fill 'run' and assign 'current' are not
// visible in this listing.
2610 private Set<Statement> current = null;
2611 private Set<Statement> run = new HashSet<Statement>();
2614 public void execute(AsyncReadGraph graph, Statement result) {
2616 boolean found = false;
2618 if(current != null) {
// Removing a known statement marks it as still present.
2620 found = current.remove(result);
// Anything not previously known is reported as added.
2624 if(!found) procedure.add(graph, result);
2631 public void finished(AsyncReadGraph graph) {
2633 if(current != null) {
// Whatever is left in 'current' was not seen in this run: report it as removed.
2634 for(Statement r : current) procedure.remove(graph, r);
2639 run = new HashSet<Statement>();
2644 public void exception(AsyncReadGraph graph, Throwable t) {
2645 procedure.exception(graph, t);
2649 public boolean isDisposed() {
2650 return procedure.isDisposed();
/**
 * Runs the AssertedStatements query for (subject, predicate) and forwards
 * each result triple to the procedure as a Statement. If the procedure is a
 * ListenerBase it is registered as the query's listener. Throwables thrown by
 * the procedure, and a DatabaseException from the query itself, are logged
 * rather than propagated.
 *
 * @param impl      the graph to read with
 * @param subject   statement subject, must not be null
 * @param predicate statement predicate, must not be null
 * @param procedure receiver of the statements, must not be null
 */
2658 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2659 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2661 assert(subject != null);
2662 assert(predicate != null);
2663 assert(procedure != null);
2665 final ListenerBase listener = getListenerBase(procedure);
2667 // impl.state.barrier.inc();
2670 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2673 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2675 procedure.execute(graph, querySupport.getStatement(s, p, o));
2676 } catch (Throwable t2) {
2677 Logger.defaultLogError(t2);
2682 public void finished(ReadGraphImpl graph) {
2684 procedure.finished(graph);
2685 } catch (Throwable t2) {
2686 Logger.defaultLogError(t2);
2688 // impl.state.barrier.dec();
2692 public void exception(ReadGraphImpl graph, Throwable t) {
2694 procedure.exception(graph, t);
2695 } catch (Throwable t2) {
2696 Logger.defaultLogError(t2);
2698 // impl.state.barrier.dec();
2702 } catch (DatabaseException e) {
2703 Logger.defaultLogError(e);
/**
 * Returns {@code procedure} itself, cast to {@code ListenerBase}, when it is one.
 * NOTE(review): the result for non-listener procedures is not visible in this excerpt -
 * presumably null, since callers in this file test {@code listener != null}; confirm.
 *
 * @param procedure any procedure object
 */
2708 private static ListenerBase getListenerBase(Object procedure) {
2709 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2714 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2716 assert(subject != null);
2717 assert(predicate != null);
2718 assert(procedure != null);
2720 final ListenerBase listener = getListenerBase(procedure);
2722 // impl.state.barrier.inc();
2725 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2728 public void execute(ReadGraphImpl graph, int i) {
2730 procedure.execute(querySupport.getResource(i));
2731 } catch (Throwable t2) {
2732 Logger.defaultLogError(t2);
2737 public void finished(ReadGraphImpl graph) {
2739 procedure.finished();
2740 } catch (Throwable t2) {
2741 Logger.defaultLogError(t2);
2743 // impl.state.barrier.dec();
2747 public void exception(ReadGraphImpl graph, Throwable t) {
2748 System.out.println("forEachObject exception " + t);
2750 procedure.exception(t);
2751 } catch (Throwable t2) {
2752 Logger.defaultLogError(t2);
2754 // impl.state.barrier.dec();
2758 } catch (DatabaseException e) {
2759 Logger.defaultLogError(e);
/**
 * Runs the direct-predicates query for {@code subject} through
 * {@code QueryCache.runnerDirectPredicates} and passes the resulting {@code IntSet}, or the
 * failure, straight to {@code procedure}. A {@code DatabaseException} from the query call is
 * logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the predicate set; asserted non-null
 */
2765 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2767 assert(subject != null);
2768 assert(procedure != null);
2770 final ListenerBase listener = getListenerBase(procedure);
2772 int sId = querySupport.getId(subject);
2775 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
// The IntSet is handed over as-is; no per-element conversion happens here.
2778 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2779 procedure.execute(graph, result);
2783 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2784 procedure.exception(graph, throwable);
2788 } catch (DatabaseException e) {
2789 Logger.defaultLogError(e);
/**
 * Returns the statements of {@code subject} as produced by {@code querySupport.getStatements}.
 * The listener-based variant that used to live here is kept only as commented-out code.
 *
 * @param impl graph passed to {@code querySupport.getStatements}
 * @param subject resource whose statements are requested
 * @param ignoreVirtual forwarded unchanged to {@code querySupport.getStatements}
 * @return the result of {@code querySupport.getStatements}
 */
2794 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2796 // assert(subject != null);
2797 // assert(procedure != null);
2799 // final ListenerBase listener = getListenerBase(procedure);
2801 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2803 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2808 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2810 // assert(subject != null);
2811 // assert(procedure != null);
2813 // final ListenerBase listener = getListenerBase(procedure);
2815 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel compared by identity in forPossibleObject; a result equal to it is reported as null.
2819 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Collects the objects of (subject, predicate) via {@code forEachObject} and reports a single
 * possible object: {@code procedure} receives {@code null} when nothing was recorded or when the
 * {@code INVALID_RESOURCE} sentinel is set, otherwise the recorded resource.
 * NOTE(review): presumably the sentinel marks "more than one object seen" - the branch that
 * stores the first result is not visible in this excerpt; confirm.
 *
 * @param impl graph the query is issued through
 * @param subject subject resource
 * @param predicate predicate resource
 * @param procedure receiver of the single object or null
 */
2822 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2824 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2826 private Resource single = null;
// All three callbacks are synchronized on this anonymous procedure instance.
2829 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2830 if(single == null) {
2833 single = INVALID_RESOURCE;
2838 public synchronized void finished(AsyncReadGraph graph) {
2839 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2840 else procedure.execute(graph, single);
2844 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2845 procedure.exception(graph, throwable);
/**
 * Lowest-level objects query in this group: resolves {@code subject} and {@code predicate} to
 * ids and calls {@code QueryCache.runnerObjects} with the given listener and int procedure.
 * A {@code DatabaseException} from that call is logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource
 * @param predicate predicate resource
 * @param listener listener handed to the query (other methods in this file pass the result of {@code getListenerBase})
 * @param procedure receiver of object ids
 */
2852 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2854 final int sId = querySupport.getId(subject);
2855 final int pId = querySupport.getId(predicate);
2858 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2859 } catch (DatabaseException e) {
2860 Logger.defaultLogError(e);
/**
 * {@code IntProcedure} that captures a single int result in {@code single} and a failure in
 * {@code t}, for callers that read the outcome afterwards through {@link #get()}.
 * NOTE(review): several short statements of this class are not visible in this excerpt (the
 * bodies of clear() and exception(), and the non-first branch of execute()); the comments below
 * cover only what is shown.
 */
2865 static class Runner2Procedure implements IntProcedure {
// 0 is the initial value; execute() stores the first reported id while it is still 0.
2867 public int single = 0;
2868 public Throwable t = null;
2870 public void clear() {
2876 public void execute(ReadGraphImpl graph, int i) {
2877 if(single == 0) single = i;
// -1 is mapped back to 0 when the query finishes.
// NOTE(review): presumably -1 marks "more than one object"; where it is set is not visible here.
2882 public void finished(ReadGraphImpl graph) {
2883 if(single == -1) single = 0;
2887 public void exception(ReadGraphImpl graph, Throwable throwable) {
// A captured DatabaseException is rethrown as-is; any other throwable is wrapped in a DatabaseException.
2892 public int get() throws DatabaseException {
2894 if(t instanceof DatabaseException) throw (DatabaseException)t;
2895 else throw new DatabaseException(t);
/**
 * Runs the objects query for (subject, predicate) without a listener, collecting the outcome in
 * a fresh {@code Runner2Procedure}.
 * NOTE(review): the return statement is not visible in this excerpt - presumably the value of
 * {@code proc.get()}; confirm.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource
 * @param predicate predicate resource
 * @throws DatabaseException declared by this method; {@code QueryCache.runnerObjects} is called without a surrounding catch in the visible code
 */
2902 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2904 final int sId = querySupport.getId(subject);
2905 final int pId = querySupport.getId(predicate);
2907 Runner2Procedure proc = new Runner2Procedure();
2908 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
/**
 * Runs the objects query for (subject, predicate) and forwards object resources to an
 * {@code AsyncMultiProcedure}.
 * <p>
 * Two adapters exist. When {@code impl.parent} is non-null or the procedure is a listener, the
 * adapter tracks a {@code first} flag, delivers callbacks on either {@code impl} or
 * {@code impl.newRestart(graph)}, and logs throwables raised by the procedure. Otherwise a plain
 * adapter passes every callback through on the graph supplied by the query.
 *
 * @param impl graph the query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of the objects
 */
2913 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2915 assert(subject != null);
2916 assert(predicate != null);
2918 final ListenerBase listener = getListenerBase(procedure);
2920 if(impl.parent != null || listener != null) {
2922 IntProcedure ip = new IntProcedure() {
// Flipped by the first finished() call; later rounds are delivered on a restarted graph.
2924 AtomicBoolean first = new AtomicBoolean(true);
2927 public void execute(ReadGraphImpl graph, int i) {
// NOTE(review): the condition choosing between the two deliveries below is not visible in this excerpt - presumably it reads first; confirm.
2930 procedure.execute(impl, querySupport.getResource(i));
2932 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2934 } catch (Throwable t2) {
2935 Logger.defaultLogError(t2);
2941 public void finished(ReadGraphImpl graph) {
2943 if(first.compareAndSet(true, false)) {
2944 procedure.finished(impl);
2945 // impl.state.barrier.dec(this);
2947 procedure.finished(impl.newRestart(graph));
2949 } catch (Throwable t2) {
2950 Logger.defaultLogError(t2);
2955 public void exception(ReadGraphImpl graph, Throwable t) {
2957 procedure.exception(graph, t);
2958 } catch (Throwable t2) {
2959 Logger.defaultLogError(t2);
2961 // impl.state.barrier.dec(this);
2965 public String toString() {
2966 return "forEachObject with " + procedure;
2971 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2972 // else impl.state.barrier.inc(null, null);
2974 forEachObject(impl, subject, predicate, listener, ip);
// Plain pass-through adapter: no first-flag and no catch around the procedure calls.
2978 IntProcedure ip = new IntProcedure() {
2981 public void execute(ReadGraphImpl graph, int i) {
2982 procedure.execute(graph, querySupport.getResource(i));
2986 public void finished(ReadGraphImpl graph) {
2987 procedure.finished(graph);
2991 public void exception(ReadGraphImpl graph, Throwable t) {
2992 procedure.exception(graph, t);
2996 public String toString() {
2997 return "forEachObject with " + procedure;
3002 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Drives an {@code AsyncSetListener} from {@code forEachObject}: objects reported by the query
 * are forwarded as {@code add} unless they could be removed from {@code current}, and whatever
 * is still left in {@code current} when a round finishes is forwarded as {@code remove}.
 *
 * @param impl graph the object query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure listener receiving add/remove/exception calls; asserted non-null
 */
3009 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3011 assert(subject != null);
3012 assert(predicate != null);
3013 assert(procedure != null);
3015 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// Entries still in this set when finished() runs are reported as removed.
// NOTE(review): the assignment that populates current is not visible in this excerpt - presumably it takes over run; confirm.
3017 private Set<Resource> current = null;
3018 private Set<Resource> run = new HashSet<Resource>();
3021 public void execute(AsyncReadGraph graph, Resource result) {
3023 boolean found = false;
3025 if(current != null) {
3027 found = current.remove(result);
// Only objects that were not found in current are announced as additions.
3031 if(!found) procedure.add(graph, result);
3038 public void finished(AsyncReadGraph graph) {
3040 if(current != null) {
3041 for(Resource r : current) procedure.remove(graph, r);
3046 run = new HashSet<Resource>();
// Disposal state is delegated to the wrapped set listener.
3051 public boolean isDisposed() {
3052 return procedure.isDisposed();
3056 public void exception(AsyncReadGraph graph, Throwable t) {
3057 procedure.exception(graph, t);
3061 public String toString() {
3062 return "forObjectSet " + procedure;
/**
 * Drives an {@code AsyncSetListener} from {@code forEachPredicate}: predicates reported by the
 * query are forwarded as {@code add} unless they could be removed from {@code current}, and
 * whatever is still left in {@code current} when a round finishes is forwarded as {@code remove}.
 *
 * @param impl graph the predicate query is issued through
 * @param subject subject resource; asserted non-null
 * @param procedure listener receiving add/remove/exception calls; asserted non-null
 */
3070 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3072 assert(subject != null);
3073 assert(procedure != null);
3075 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// Entries still in this set when finished() runs are reported as removed.
// NOTE(review): the assignment that populates current is not visible in this excerpt - presumably it takes over run; confirm.
3077 private Set<Resource> current = null;
3078 private Set<Resource> run = new HashSet<Resource>();
3081 public void execute(AsyncReadGraph graph, Resource result) {
3083 boolean found = false;
3085 if(current != null) {
3087 found = current.remove(result);
// Only predicates that were not found in current are announced as additions.
3091 if(!found) procedure.add(graph, result);
3098 public void finished(AsyncReadGraph graph) {
3100 if(current != null) {
3101 for(Resource r : current) procedure.remove(graph, r);
3106 run = new HashSet<Resource>();
// Disposal state is delegated to the wrapped set listener.
3111 public boolean isDisposed() {
3112 return procedure.isDisposed();
3116 public void exception(AsyncReadGraph graph, Throwable t) {
3117 procedure.exception(graph, t);
3121 public String toString() {
3122 return "forPredicateSet " + procedure;
/**
 * Drives an {@code AsyncSetListener} from {@code forEachPrincipalType}: types reported by the
 * query are forwarded as {@code add} unless they could be removed from {@code current}, and
 * whatever is still left in {@code current} when a round finishes is forwarded as {@code remove}.
 *
 * @param impl graph the principal-type query is issued through
 * @param subject subject resource; asserted non-null
 * @param procedure listener receiving add/remove/exception calls; asserted non-null
 */
3130 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3132 assert(subject != null);
3133 assert(procedure != null);
3135 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// Entries still in this set when finished() runs are reported as removed.
// NOTE(review): the assignment that populates current is not visible in this excerpt - presumably it takes over run; confirm.
3137 private Set<Resource> current = null;
3138 private Set<Resource> run = new HashSet<Resource>();
3141 public void execute(AsyncReadGraph graph, Resource result) {
3143 boolean found = false;
3145 if(current != null) {
3147 found = current.remove(result);
// Only types that were not found in current are announced as additions.
3151 if(!found) procedure.add(graph, result);
3158 public void finished(AsyncReadGraph graph) {
3160 if(current != null) {
3161 for(Resource r : current) procedure.remove(graph, r);
3166 run = new HashSet<Resource>();
// Disposal state is delegated to the wrapped set listener.
3171 public boolean isDisposed() {
3172 return procedure.isDisposed();
3176 public void exception(AsyncReadGraph graph, Throwable t) {
3177 procedure.exception(graph, t);
3181 public String toString() {
3182 return "forPrincipalTypeSet " + procedure;
/**
 * Drives an {@code AsyncSetListener} from {@code forEachAssertedObject}: asserted objects
 * reported by the query are forwarded as {@code add} unless they could be removed from
 * {@code current}, and whatever is still left in {@code current} when a round finishes is
 * forwarded as {@code remove}.
 *
 * @param impl graph the asserted-object query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure listener receiving add/remove/exception calls; asserted non-null
 */
3190 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3192 assert(subject != null);
3193 assert(predicate != null);
3194 assert(procedure != null);
3196 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// Entries still in this set when finished() runs are reported as removed.
// NOTE(review): the assignment that populates current is not visible in this excerpt - presumably it takes over run; confirm.
3198 private Set<Resource> current = null;
3199 private Set<Resource> run = new HashSet<Resource>();
3202 public void execute(AsyncReadGraph graph, Resource result) {
3204 boolean found = false;
3206 if(current != null) {
3208 found = current.remove(result);
// Only objects that were not found in current are announced as additions.
3212 if(!found) procedure.add(graph, result);
3219 public void finished(AsyncReadGraph graph) {
3221 if(current != null) {
3222 for(Resource r : current) procedure.remove(graph, r);
3227 run = new HashSet<Resource>();
// Disposal state is delegated to the wrapped set listener.
3232 public boolean isDisposed() {
3233 return procedure.isDisposed();
3237 public void exception(AsyncReadGraph graph, Throwable t) {
3238 procedure.exception(graph, t);
// NOTE(review): the label reads "forObjectSet" although this is forAssertedObjectSet - looks like a copy-paste leftover; confirm before changing the string.
3242 public String toString() {
3243 return "forObjectSet " + procedure;
/**
 * Drives an {@code AsyncSetListener} from {@code forEachAssertedStatement}: asserted statements
 * reported by the query are forwarded as {@code add} unless they could be removed from
 * {@code current}, and whatever is still left in {@code current} when a round finishes is
 * forwarded as {@code remove}.
 *
 * @param impl graph the asserted-statement query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure listener receiving add/remove/exception calls; asserted non-null
 */
3251 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3253 assert(subject != null);
3254 assert(predicate != null);
3255 assert(procedure != null);
3257 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// Entries still in this set when finished() runs are reported as removed.
// NOTE(review): the assignment that populates current is not visible in this excerpt - presumably it takes over run; confirm.
3259 private Set<Statement> current = null;
3260 private Set<Statement> run = new HashSet<Statement>();
3263 public void execute(AsyncReadGraph graph, Statement result) {
3265 boolean found = false;
3267 if(current != null) {
3269 found = current.remove(result);
// Only statements that were not found in current are announced as additions.
3273 if(!found) procedure.add(graph, result);
3280 public void finished(AsyncReadGraph graph) {
3282 if(current != null) {
3283 for(Statement s : current) procedure.remove(graph, s);
3288 run = new HashSet<Statement>();
// Disposal state is delegated to the wrapped set listener.
3293 public boolean isDisposed() {
3294 return procedure.isDisposed();
3298 public void exception(AsyncReadGraph graph, Throwable t) {
3299 procedure.exception(graph, t);
// NOTE(review): the label reads "forStatementSet" although this is forAssertedStatementSet - looks like a copy-paste leftover; confirm before changing the string.
3303 public String toString() {
3304 return "forStatementSet " + procedure;
/**
 * Runs the asserted-statements query for {@code subject} and {@code predicate} and forwards only
 * the object of each reported (s, p, o) triple, converted with {@code querySupport.getResource}.
 * Throwables raised by the procedure callbacks and a {@code DatabaseException} from the query
 * call are logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of the asserted objects; asserted non-null
 */
3312 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3313 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3315 assert(subject != null);
3316 assert(predicate != null);
3317 assert(procedure != null);
3319 final ListenerBase listener = getListenerBase(procedure);
3322 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
// s and p are ignored; only the object id is resolved and delivered.
3325 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3327 procedure.execute(graph, querySupport.getResource(o));
3328 } catch (Throwable t2) {
3329 Logger.defaultLogError(t2);
3334 public void finished(ReadGraphImpl graph) {
3336 procedure.finished(graph);
3337 } catch (Throwable t2) {
3338 Logger.defaultLogError(t2);
3340 // impl.state.barrier.dec();
3344 public void exception(ReadGraphImpl graph, Throwable t) {
3346 procedure.exception(graph, t);
3347 } catch (Throwable t2) {
3348 Logger.defaultLogError(t2);
3350 // impl.state.barrier.dec();
3354 } catch (DatabaseException e) {
3355 Logger.defaultLogError(e);
/**
 * Runs the principal-types query for {@code subject} through
 * {@code QueryCache.runnerPrincipalTypes} and forwards each type, converted with
 * {@code querySupport.getResource}, to an {@code AsyncMultiProcedure}. Throwables raised by the
 * procedure callbacks and a {@code DatabaseException} from the query call are logged with
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the principal types; asserted non-null
 */
3361 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3363 assert(subject != null);
3364 assert(procedure != null);
3366 final ListenerBase listener = getListenerBase(procedure);
// Adapter from int ids to resources; callbacks are delivered on the graph supplied by the query.
3368 IntProcedure ip = new IntProcedure() {
3371 public void execute(ReadGraphImpl graph, int i) {
3373 procedure.execute(graph, querySupport.getResource(i));
3374 } catch (Throwable t2) {
3375 Logger.defaultLogError(t2);
3380 public void finished(ReadGraphImpl graph) {
3382 procedure.finished(graph);
3383 } catch (Throwable t2) {
3384 Logger.defaultLogError(t2);
3386 // impl.state.barrier.dec(this);
3390 public void exception(ReadGraphImpl graph, Throwable t) {
3392 procedure.exception(graph, t);
3393 } catch (Throwable t2) {
3394 Logger.defaultLogError(t2);
3396 // impl.state.barrier.dec(this);
3401 int sId = querySupport.getId(subject);
3403 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3404 // else impl.state.barrier.inc(null, null);
3407 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3408 } catch (DatabaseException e) {
3409 Logger.defaultLogError(e);
/**
 * Synchronous-procedure variant of the principal-types query: results of
 * {@code QueryCache.runnerPrincipalTypes} are converted with {@code querySupport.getResource}
 * and passed to a {@code MultiProcedure}, whose callbacks take no graph argument. Throwables
 * raised by the procedure callbacks and a {@code DatabaseException} from the query call are
 * logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the principal types; asserted non-null
 */
3415 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3417 assert(subject != null);
3418 assert(procedure != null);
3420 final ListenerBase listener = getListenerBase(procedure);
3422 // impl.state.barrier.inc();
3425 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3428 public void execute(ReadGraphImpl graph, int i) {
3430 procedure.execute(querySupport.getResource(i));
3431 } catch (Throwable t2) {
3432 Logger.defaultLogError(t2);
3437 public void finished(ReadGraphImpl graph) {
3439 procedure.finished();
3440 } catch (Throwable t2) {
3441 Logger.defaultLogError(t2);
3443 // impl.state.barrier.dec();
3447 public void exception(ReadGraphImpl graph, Throwable t) {
3449 procedure.exception(t);
3450 } catch (Throwable t2) {
3451 Logger.defaultLogError(t2);
3453 // impl.state.barrier.dec();
3457 } catch (DatabaseException e) {
3458 Logger.defaultLogError(e);
/**
 * Runs the types query for {@code subject} through {@code QueryCache.runnerTypes} and passes the
 * resulting {@code IntSet}, or the failure, straight to {@code procedure}. A
 * {@code DatabaseException} from the query call is logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the type set; asserted non-null and asserted not to be a listener
 */
3462 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3464 assert(subject != null);
3465 assert(procedure != null);
3467 final ListenerBase listener = getListenerBase(procedure);
// Listener procedures are rejected here (when assertions are enabled).
3468 assert(listener == null);
3470 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3473 public void execute(final ReadGraphImpl graph, IntSet set) {
3474 procedure.execute(graph, set);
3478 public void exception(ReadGraphImpl graph, Throwable t) {
3479 procedure.exception(graph, t);
3484 int sId = querySupport.getId(subject);
3487 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3488 } catch (DatabaseException e) {
3489 Logger.defaultLogError(e);
/**
 * Returns the result of {@code QueryCacheBase.resultTypes} for {@code subject}, queried without
 * a listener.
 * NOTE(review): declared {@code throws Throwable}, whereas the neighbouring getRelationInfo
 * declares {@code throws DatabaseException}; left unchanged here because the signature is part
 * of the interface callers compile against.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @return the type set produced by {@code QueryCacheBase.resultTypes}
 */
3495 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3497 assert(subject != null);
3499 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Returns the result of {@code QueryCache.resultRelationInfoQuery} for {@code subject}, queried
 * without a listener.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject relation resource; asserted non-null
 * @return the relation info produced by the query
 * @throws DatabaseException declared by this method; the query is called without a surrounding catch in the visible code
 */
3504 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3506 assert(subject != null);
3508 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs the supertypes query for {@code subject} through {@code QueryCache.runnerSuperTypes} and
 * passes the resulting {@code IntSet}, or the failure, to {@code procedure}. The first callback
 * is delivered on the graph supplied by the query; later callbacks are delivered on
 * {@code impl.newRestart(graph)}. Throwables raised by the procedure and a
 * {@code DatabaseException} from the query call are logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the supertype set; asserted non-null
 */
3513 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3515 assert(subject != null);
3516 assert(procedure != null);
3518 final ListenerBase listener = getListenerBase(procedure);
3521 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
// Shared by execute() and exception(): whichever runs first flips the flag.
3523 AtomicBoolean first = new AtomicBoolean(true);
3526 public void execute(final ReadGraphImpl graph, IntSet set) {
3527 // final HashSet<Resource> result = new HashSet<Resource>();
3528 // set.forEach(new TIntProcedure() {
3531 // public boolean execute(int type) {
3532 // result.add(querySupport.getResource(type));
3538 if(first.compareAndSet(true, false)) {
3539 procedure.execute(graph, set);
3540 // impl.state.barrier.dec();
3542 procedure.execute(impl.newRestart(graph), set);
3544 } catch (Throwable t2) {
3545 Logger.defaultLogError(t2);
3550 public void exception(ReadGraphImpl graph, Throwable t) {
3552 if(first.compareAndSet(true, false)) {
3553 procedure.exception(graph, t);
3554 // impl.state.barrier.dec();
3556 procedure.exception(impl.newRestart(graph), t);
3558 } catch (Throwable t2) {
3559 Logger.defaultLogError(t2);
3564 } catch (DatabaseException e) {
3565 Logger.defaultLogError(e);
/**
 * Runs the direct-super-relations query for {@code subject} through
 * {@code QueryCache.runnerDirectSuperRelations} and forwards each relation, converted with
 * {@code querySupport.getResource}, to {@code procedure}. Throwables raised by the procedure
 * callbacks and a {@code DatabaseException} from the query call are logged with
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the direct super relations; asserted non-null
 */
3571 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3573 assert(subject != null);
3574 assert(procedure != null);
3576 final ListenerBase listener = getListenerBase(procedure);
3578 IntProcedure ip = new IntProcedureAdapter() {
3581 public void execute(final ReadGraphImpl graph, int superRelation) {
3583 procedure.execute(graph, querySupport.getResource(superRelation));
3584 } catch (Throwable t2) {
3585 Logger.defaultLogError(t2);
3590 public void finished(final ReadGraphImpl graph) {
3592 procedure.finished(graph);
3593 } catch (Throwable t2) {
3594 Logger.defaultLogError(t2);
3596 // impl.state.barrier.dec(this);
3601 public void exception(ReadGraphImpl graph, Throwable t) {
3603 procedure.exception(graph, t);
3604 } catch (Throwable t2) {
3605 Logger.defaultLogError(t2);
3607 // impl.state.barrier.dec(this);
3612 int sId = querySupport.getId(subject);
3614 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3615 // else impl.state.barrier.inc(null, null);
3618 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3619 } catch (DatabaseException e) {
3620 Logger.defaultLogError(e);
3623 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to {@code PossibleSuperRelation.queryEach} for {@code subject}, passing this
 * processor, {@code impl.parent}, the listener derived from {@code procedure}, and
 * {@code procedure} itself.
 *
 * @param impl graph the query is issued through
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the possible super relation; asserted non-null
 */
3628 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3630 assert(subject != null);
3631 assert(procedure != null);
3633 final ListenerBase listener = getListenerBase(procedure);
3635 // impl.state.barrier.inc();
3637 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs the super-relations query for {@code subject} through
 * {@code QueryCache.runnerSuperRelations} and passes the resulting {@code IntSet}, or the
 * failure, to {@code procedure}. Throwables raised by the procedure and a
 * {@code DatabaseException} from the query call are logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the super-relation set; asserted non-null
 */
3642 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3644 assert(subject != null);
3645 assert(procedure != null);
3647 final ListenerBase listener = getListenerBase(procedure);
3649 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3652 public void execute(final ReadGraphImpl graph, IntSet set) {
3653 // final HashSet<Resource> result = new HashSet<Resource>();
3654 // set.forEach(new TIntProcedure() {
3657 // public boolean execute(int type) {
3658 // result.add(querySupport.getResource(type));
// The IntSet is handed over as-is; the per-element conversion above is disabled.
3664 procedure.execute(graph, set);
3665 } catch (Throwable t2) {
3666 Logger.defaultLogError(t2);
3668 // impl.state.barrier.dec(this);
3672 public void exception(ReadGraphImpl graph, Throwable t) {
3674 procedure.exception(graph, t);
3675 } catch (Throwable t2) {
3676 Logger.defaultLogError(t2);
3678 // impl.state.barrier.dec(this);
3683 int sId = querySupport.getId(subject);
3685 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3686 // else impl.state.barrier.inc(null, null);
3689 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3690 } catch (DatabaseException e) {
3691 Logger.defaultLogError(e);
/**
 * Resolves {@code subject} to its id with {@code querySupport.getId} and delegates to the
 * int-based {@code getValue} overload.
 *
 * @param impl graph the query is issued through
 * @param subject resource whose value is requested
 * @return whatever the int-based overload returns
 * @throws DatabaseException declared by this method and by the overload it calls
 */
3696 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3697 return getValue(impl, querySupport.getId(subject));
/**
 * Returns the result of {@code QueryCache.resultValueQuery} for the resource id {@code subject},
 * queried without a listener.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject id of the resource whose value is requested
 * @return the bytes produced by the value query
 * @throws DatabaseException declared by this method; the query is called without a surrounding catch in the visible code
 */
3700 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3701 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3705 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3707 assert(subject != null);
3708 assert(procedure != null);
3710 int sId = querySupport.getId(subject);
3712 // if(procedure != null) {
3714 final ListenerBase listener = getListenerBase(procedure);
3716 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3718 AtomicBoolean first = new AtomicBoolean(true);
3721 public void execute(ReadGraphImpl graph, byte[] result) {
3723 if(first.compareAndSet(true, false)) {
3724 procedure.execute(graph, result);
3725 // impl.state.barrier.dec(this);
3727 procedure.execute(impl.newRestart(graph), result);
3729 } catch (Throwable t2) {
3730 Logger.defaultLogError(t2);
3735 public void exception(ReadGraphImpl graph, Throwable t) {
3737 if(first.compareAndSet(true, false)) {
3738 procedure.exception(graph, t);
3739 // impl.state.barrier.dec(this);
3741 procedure.exception(impl.newRestart(graph), t);
3743 } catch (Throwable t2) {
3744 Logger.defaultLogError(t2);
3750 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3751 // else impl.state.barrier.inc(null, null);
3754 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3755 } catch (DatabaseException e) {
3756 throw new IllegalStateException("Internal error");
3761 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3765 // throw new IllegalStateException("Internal error");
/**
 * Runs the value query for {@code subject} through {@code QueryCache.runnerValueQuery} and hands
 * the value bytes, or the failure, to {@code procedure}.
 * <p>
 * When {@code impl.parent} is non-null or the procedure is a listener, the adapter delivers the
 * first callback on the graph supplied by the query and later ones on
 * {@code impl.newRestart(graph)}, logging throwables raised by the procedure. Otherwise a plain
 * pass-through adapter is used. In both branches a {@code DatabaseException} from the query call
 * is logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through
 * @param subject resource whose value is requested; asserted non-null
 * @param procedure receiver of the value or the failure; asserted non-null
 */
3770 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3772 assert(subject != null);
3773 assert(procedure != null);
3775 final ListenerBase listener = getListenerBase(procedure);
3777 if(impl.parent != null || listener != null) {
3779 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Shared by execute() and exception(): whichever runs first flips the flag.
3781 AtomicBoolean first = new AtomicBoolean(true);
3784 public void execute(ReadGraphImpl graph, byte[] result) {
3786 if(first.compareAndSet(true, false)) {
3787 procedure.execute(graph, result);
3788 // impl.state.barrier.dec(this);
3790 procedure.execute(impl.newRestart(graph), result);
3792 } catch (Throwable t2) {
3793 Logger.defaultLogError(t2);
3798 public void exception(ReadGraphImpl graph, Throwable t) {
3800 if(first.compareAndSet(true, false)) {
3801 procedure.exception(graph, t);
3802 // impl.state.barrier.dec(this);
3804 procedure.exception(impl.newRestart(graph), t);
3806 } catch (Throwable t2) {
3807 Logger.defaultLogError(t2);
3813 int sId = querySupport.getId(subject);
3815 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3816 // else impl.state.barrier.inc(null, null);
3819 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3820 } catch (DatabaseException e) {
3821 Logger.defaultLogError(e);
// Plain pass-through adapter for the case without parent and without listener.
3826 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3829 public void execute(ReadGraphImpl graph, byte[] result) {
3831 procedure.execute(graph, result);
3836 public void exception(ReadGraphImpl graph, Throwable t) {
3838 procedure.exception(graph, t);
3844 int sId = querySupport.getId(subject);
3847 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3848 } catch (DatabaseException e) {
3849 Logger.defaultLogError(e);
/**
 * Looks up the inverse of {@code relation} by running the objects query for
 * ({@code relation}, {@code getInverseOf()}) and reports one outcome to {@code procedure}:
 * the inverse resource, a {@code NoInverseException}, a
 * {@code ManyObjectsForFunctionalRelationException}, or the failure reported by the query.
 * The {@code done} flag guards every delivery so that the procedure is fired once. Throwables
 * raised by the procedure and a {@code DatabaseException} from the query call are logged with
 * {@code Logger.defaultLogError}.
 * NOTE(review): the conditions selecting between the success and NoInverseException deliveries,
 * and the statement storing the first object id, are not visible in this excerpt.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param relation relation whose inverse is requested; asserted non-null
 * @param procedure receiver of the inverse or the failure; asserted non-null
 */
3857 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3859 assert(relation != null);
3860 assert(procedure != null);
3862 final ListenerBase listener = getListenerBase(procedure);
3864 IntProcedure ip = new IntProcedure() {
3866 private int result = 0;
// found: flipped by the first execute() call; done: flipped by whichever callback delivers to the procedure.
3868 final AtomicBoolean found = new AtomicBoolean(false);
3869 final AtomicBoolean done = new AtomicBoolean(false);
3872 public void finished(ReadGraphImpl graph) {
3874 // Shall fire exactly once!
3875 if(done.compareAndSet(false, true)) {
3878 procedure.exception(graph, new NoInverseException(""));
3879 // impl.state.barrier.dec(this);
3881 procedure.execute(graph, querySupport.getResource(result));
3882 // impl.state.barrier.dec(this);
3884 } catch (Throwable t) {
3885 Logger.defaultLogError(t);
3892 public void execute(ReadGraphImpl graph, int i) {
3894 if(found.compareAndSet(false, true)) {
3897 // Shall fire exactly once!
3898 if(done.compareAndSet(false, true)) {
// NOTE(review): "this.result" and "result" both name the same field, so the message repeats one value; the second operand was presumably meant to be the parameter i - confirm before changing the string expression.
3900 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3901 // impl.state.barrier.dec(this);
3902 } catch (Throwable t) {
3903 Logger.defaultLogError(t);
3911 public void exception(ReadGraphImpl graph, Throwable t) {
3912 // Shall fire exactly once!
3913 if(done.compareAndSet(false, true)) {
3915 procedure.exception(graph, t);
3916 // impl.state.barrier.dec(this);
3917 } catch (Throwable t2) {
3918 Logger.defaultLogError(t2);
3925 int sId = querySupport.getId(relation);
3927 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3928 // else impl.state.barrier.inc(null, null);
3931 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3932 } catch (DatabaseException e) {
3933 Logger.defaultLogError(e);
/**
 * Resolves the resource identified by the string {@code id} by delegating to the
 * {@code forResource(impl, id, impl.parent, ip)} overload, and forwards the resulting integer,
 * converted with {@code querySupport.getResource}, to {@code procedure}. Throwables raised by
 * the procedure are logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the lookup is issued through
 * @param id identifier passed unchanged to the delegate overload
 * @param procedure receiver of the resource or the failure; asserted non-null
 */
3939 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3942 assert(procedure != null);
3944 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3947 public void execute(ReadGraphImpl graph, Integer result) {
3949 procedure.execute(graph, querySupport.getResource(result));
3950 } catch (Throwable t2) {
3951 Logger.defaultLogError(t2);
3953 // impl.state.barrier.dec(this);
3957 public void exception(ReadGraphImpl graph, Throwable t) {
3960 procedure.exception(graph, t);
3961 } catch (Throwable t2) {
3962 Logger.defaultLogError(t2);
3964 // impl.state.barrier.dec(this);
3969 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3970 // else impl.state.barrier.inc(null, null);
3972 forResource(impl, id, impl.parent, ip);
/**
 * Resolves the builtin identified by the string {@code id} by delegating to the
 * {@code forBuiltin(impl, id, impl.parent, ...)} overload, and forwards the resulting integer,
 * converted with {@code querySupport.getResource}, to {@code procedure}. Throwables raised by
 * the procedure and a {@code DatabaseException} from the delegate call are logged with
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph the lookup is issued through
 * @param id identifier passed unchanged to the delegate overload
 * @param procedure receiver of the resource or the failure; asserted non-null
 */
3977 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3980 assert(procedure != null);
3982 // impl.state.barrier.inc();
3985 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3988 public void execute(ReadGraphImpl graph, Integer result) {
3990 procedure.execute(graph, querySupport.getResource(result));
3991 } catch (Throwable t2) {
3992 Logger.defaultLogError(t2);
3994 // impl.state.barrier.dec();
3998 public void exception(ReadGraphImpl graph, Throwable t) {
4000 procedure.exception(graph, t);
4001 } catch (Throwable t2) {
4002 Logger.defaultLogError(t2);
4004 // impl.state.barrier.dec();
4008 } catch (DatabaseException e) {
4009 Logger.defaultLogError(e);
/**
 * Reports whether {@code subject} has any direct predicate: the set returned by
 * {@code QueryCache.resultDirectPredicates} is tested for emptiness and the negated result is
 * passed to {@code procedure}. A {@code DatabaseException} from the query is forwarded to
 * {@code procedure.exception} on {@code impl}.
 *
 * @param impl graph the query is issued through and on which the procedure is called
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the boolean result or the failure; asserted non-null
 */
4015 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4017 assert(subject != null);
4018 assert(procedure != null);
4020 final ListenerBase listener = getListenerBase(procedure);
4023 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4024 procedure.execute(impl, !result.isEmpty());
4025 } catch (DatabaseException e) {
4026 procedure.exception(impl, e);
/**
 * Reports whether {@code subject} has a statement with {@code predicate}: runs
 * {@code forEachObject} with an adapter that passes its {@code found} flag to {@code procedure}
 * when the query finishes. Throwables raised by the procedure are logged with
 * {@code Logger.defaultLogError}.
 * NOTE(review): the body of execute() is not visible in this excerpt - presumably it sets
 * {@code found}; confirm.
 *
 * @param impl graph the query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of the boolean result or the failure; asserted non-null
 */
4032 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4034 assert(subject != null);
4035 assert(predicate != null);
4036 assert(procedure != null);
4038 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4040 boolean found = false;
// execute() and finished() are synchronized on the adapter; exception() is not.
4043 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4048 synchronized public void finished(AsyncReadGraph graph) {
4050 procedure.execute(graph, found);
4051 } catch (Throwable t2) {
4052 Logger.defaultLogError(t2);
4054 // impl.state.barrier.dec(this);
4058 public void exception(AsyncReadGraph graph, Throwable t) {
4060 procedure.exception(graph, t);
4061 } catch (Throwable t2) {
4062 Logger.defaultLogError(t2);
4064 // impl.state.barrier.dec(this);
4069 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4070 // else impl.state.barrier.inc(null, null);
4072 forEachObject(impl, subject, predicate, ip);
/**
 * Reports whether the statement ({@code subject}, {@code predicate}, {@code object}) exists:
 * runs {@code forEachObject} and sets {@code found} when a reported object equals
 * {@code object}; the flag is passed to {@code procedure} when the query finishes. Throwables
 * raised by the procedure are logged with {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param object object resource to look for; not covered by the visible assertions
 * @param procedure receiver of the boolean result or the failure; asserted non-null
 */
4077 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4079 assert(subject != null);
4080 assert(predicate != null);
4081 assert(procedure != null);
4083 // impl.state.barrier.inc();
4085 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4087 boolean found = false;
// execute() and finished() are synchronized on the adapter; exception() is not.
4090 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4091 if(resource.equals(object)) found = true;
4095 synchronized public void finished(AsyncReadGraph graph) {
4097 procedure.execute(graph, found);
4098 } catch (Throwable t2) {
4099 Logger.defaultLogError(t2);
4101 // impl.state.barrier.dec();
4105 public void exception(AsyncReadGraph graph, Throwable t) {
4107 procedure.exception(graph, t);
4108 } catch (Throwable t2) {
4109 Logger.defaultLogError(t2);
4111 // impl.state.barrier.dec();
/**
 * Reports whether {@code subject} has a value: runs {@code QueryCache.runnerValueQuery} and
 * passes {@code true} to {@code procedure} when the returned byte array is non-null. Throwables
 * raised by the procedure and a {@code DatabaseException} from the query call are logged with
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph the query is issued through; its {@code parent} is passed to the query
 * @param subject subject resource; asserted non-null
 * @param procedure receiver of the boolean result or the failure; asserted non-null
 */
4119 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4121 assert(subject != null);
4122 assert(procedure != null);
4124 final ListenerBase listener = getListenerBase(procedure);
4126 // impl.state.barrier.inc();
4129 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4132 public void execute(ReadGraphImpl graph, byte[] object) {
// Only presence is reported; the bytes themselves are not forwarded.
4133 boolean result = object != null;
4135 procedure.execute(graph, result);
4136 } catch (Throwable t2) {
4137 Logger.defaultLogError(t2);
4139 // impl.state.barrier.dec();
4143 public void exception(ReadGraphImpl graph, Throwable t) {
4145 procedure.exception(graph, t);
4146 } catch (Throwable t2) {
4147 Logger.defaultLogError(t2);
4149 // impl.state.barrier.dec();
4153 } catch (DatabaseException e) {
4154 Logger.defaultLogError(e);
4160 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4162 assert(subject != null);
4163 assert(procedure != null);
4165 final ListenerBase listener = getListenerBase(procedure);
4169 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4172 public void exception(ReadGraphImpl graph, Throwable t) {
4174 procedure.exception(graph, t);
4175 } catch (Throwable t2) {
4176 Logger.defaultLogError(t2);
4178 // impl.state.barrier.dec();
4182 public void execute(ReadGraphImpl graph, int i) {
4184 procedure.execute(graph, querySupport.getResource(i));
4185 } catch (Throwable t2) {
4186 Logger.defaultLogError(t2);
4191 public void finished(ReadGraphImpl graph) {
4193 procedure.finished(graph);
4194 } catch (Throwable t2) {
4195 Logger.defaultLogError(t2);
4197 // impl.state.barrier.dec();
4201 } catch (DatabaseException e) {
4202 Logger.defaultLogError(e);
4208 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4210 // assert(request != null);
4211 // assert(procedure != null);
4213 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4218 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4220 // assert(graph != null);
4221 // assert(request != null);
4223 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4224 // if(entry != null && entry.isReady()) {
4225 // return (T)entry.get(graph, this, null);
4227 // return request.perform(graph);
4232 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4234 // assert(graph != null);
4235 // assert(request != null);
4237 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4238 // if(entry != null && entry.isReady()) {
4239 // if(entry.isExcepted()) {
4240 // Throwable t = (Throwable)entry.getResult();
4241 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4242 // else throw new DatabaseException(t);
4244 // return (T)entry.getResult();
4248 // final DataContainer<T> result = new DataContainer<T>();
4249 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4251 // request.register(graph, new Listener<T>() {
4254 // public void exception(Throwable t) {
4255 // exception.set(t);
4259 // public void execute(T t) {
4264 // public boolean isDisposed() {
4270 // Throwable t = exception.get();
4272 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4273 // else throw new DatabaseException(t);
4276 // return result.get();
4283 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4285 // assert(graph != null);
4286 // assert(request != null);
4288 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4289 // if(entry != null && entry.isReady()) {
4290 // if(entry.isExcepted()) {
4291 // procedure.exception(graph, (Throwable)entry.getResult());
4293 // procedure.execute(graph, (T)entry.getResult());
4296 // request.perform(graph, procedure);
4302 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4304 assert(request != null);
4305 assert(procedure != null);
4309 queryMultiRead(impl, request, parent, listener, procedure);
4311 } catch (DatabaseException e) {
4313 throw new IllegalStateException(e);
4320 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4322 assert(request != null);
4323 assert(procedure != null);
4325 // impl.state.barrier.inc();
4327 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4329 public void execute(AsyncReadGraph graph, T result) {
4332 procedure.execute(graph, result);
4333 } catch (Throwable t2) {
4334 Logger.defaultLogError(t2);
4339 public void finished(AsyncReadGraph graph) {
4342 procedure.finished(graph);
4343 } catch (Throwable t2) {
4344 Logger.defaultLogError(t2);
4347 // impl.state.barrier.dec();
4352 public String toString() {
4353 return procedure.toString();
4357 public void exception(AsyncReadGraph graph, Throwable t) {
4360 procedure.exception(graph, t);
4361 } catch (Throwable t2) {
4362 Logger.defaultLogError(t2);
4365 // impl.state.barrier.dec();
4374 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4376 // assert(request != null);
4377 // assert(procedure != null);
4381 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4384 // public String toString() {
4385 // return procedure.toString();
4389 // public void execute(AsyncReadGraph graph, T result) {
4391 // procedure.execute(result);
4392 // } catch (Throwable t2) {
4393 // Logger.defaultLogError(t2);
4398 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4400 // procedure.exception(throwable);
4401 // } catch (Throwable t2) {
4402 // Logger.defaultLogError(t2);
4408 // } catch (DatabaseException e) {
4410 // throw new IllegalStateException(e);
4417 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4419 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4424 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4426 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4431 public VirtualGraph getValueProvider(Resource subject) {
4433 return querySupport.getValueProvider(querySupport.getId(subject));
4437 public boolean resumeTasks(ReadGraphImpl graph) {
4439 return querySupport.resume(graph);
4443 public boolean isImmutable(int resourceId) {
4444 return querySupport.isImmutable(resourceId);
4447 public boolean isImmutable(Resource resource) {
4448 ResourceImpl impl = (ResourceImpl)resource;
4449 return isImmutable(impl.id);
// Accessor for the Layer0 instance belonging to the given graph.
// The visible statement stores the result of Layer0.getInstance(graph) in L0,
// a variable declared outside this method.
// NOTE(review): parts of this method (any guard around the assignment and the
// return statement) are not visible in this excerpt -- confirm against the full file.
4454 public Layer0 getL0(ReadGraph graph) {
4456 L0 = Layer0.getInstance(graph);
4461 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4462 protected Integer initialValue() {