1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.IdentityHashMap;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
38 import org.simantics.databoard.Bindings;
39 import org.simantics.db.AsyncReadGraph;
40 import org.simantics.db.DevelopmentKeys;
41 import org.simantics.db.DirectStatements;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.RelationInfo;
44 import org.simantics.db.Resource;
45 import org.simantics.db.Session;
46 import org.simantics.db.Statement;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.debug.ListenerReport;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
53 import org.simantics.db.exception.NoInverseException;
54 import org.simantics.db.exception.ResourceNotFoundException;
55 import org.simantics.db.impl.DebugPolicy;
56 import org.simantics.db.impl.ResourceImpl;
57 import org.simantics.db.impl.graph.MultiIntProcedure;
58 import org.simantics.db.impl.graph.ReadGraphImpl;
59 import org.simantics.db.impl.graph.ReadGraphSupport;
60 import org.simantics.db.impl.graph.WriteGraphImpl;
61 import org.simantics.db.impl.procedure.IntProcedureAdapter;
62 import org.simantics.db.impl.procedure.InternalProcedure;
63 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
64 import org.simantics.db.impl.support.ResourceSupport;
65 import org.simantics.db.procedure.AsyncMultiListener;
66 import org.simantics.db.procedure.AsyncMultiProcedure;
67 import org.simantics.db.procedure.AsyncProcedure;
68 import org.simantics.db.procedure.AsyncSetListener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.procedure.TIntProcedure;
88 import gnu.trove.procedure.TLongProcedure;
89 import gnu.trove.procedure.TObjectProcedure;
90 import gnu.trove.set.hash.THashSet;
91 import gnu.trove.set.hash.TIntHashSet;
93 @SuppressWarnings({"rawtypes", "unchecked"})
94 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
96 public static int indent = 0;
101 public int boundQueries = 0;
104 final private int functionalRelation;
106 final private int superrelationOf;
108 final private int instanceOf;
110 final private int inverseOf;
112 final private int asserts;
114 final private int hasPredicate;
116 final private int hasPredicateInverse;
118 final private int hasObject;
120 final private int inherits;
122 final private int subrelationOf;
124 final private int rootLibrary;
127 * A cache for the root library resource. Initialized in
128 * {@link #getRootLibraryResource()}.
130 private volatile ResourceImpl rootLibraryResource;
132 final private int library;
134 final private int consistsOf;
136 final private int hasName;
138 AtomicInteger sleepers = new AtomicInteger(0);
140 private boolean updating = false;
143 private boolean firingListeners = false;
145 final public QueryCache cache;
146 final public QuerySupport querySupport;
147 final public Session session;
148 final public ResourceSupport resourceSupport;
150 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
152 QueryThread[] executors;
154 // public ArrayList<SessionTask>[] queues;
156 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
160 INIT, RUN, SLEEP, DISPOSED
164 public ThreadState[] threadStates;
165 // public ReentrantLock[] threadLocks;
166 // public Condition[] threadConditions;
168 //public ArrayList<SessionTask>[] ownTasks;
170 //public ArrayList<SessionTask>[] ownSyncTasks;
172 //ArrayList<SessionTask>[] delayQueues;
174 final Object querySupportLock;
176 public Long modificationCounter = 0L;
178 public void close() {
/**
 * Looks up a queued task that was created by the given thread.
 * <p>
 * While holding {@code querySupportLock}, walks {@code freeScheduling} by index and
 * removes and returns the first task whose {@code thread} field equals the argument
 * and whose {@code systemCall} flag is {@code false}.
 *
 * @param thread value compared against {@code SessionTask.thread}
 * @return the task that was removed from {@code freeScheduling};
 *         NOTE(review): the no-match result is not visible in this listing -
 *         presumably {@code null}, confirm against the full source
 */
181 SessionTask getOwnTask(int thread) {
182 synchronized(querySupportLock) {
// NOTE(review): freeScheduling is a LinkedList, so each get(index) is a positional
// walk from one end of the list; the scan is quadratic in the queue length.
184 while(index < freeScheduling.size()) {
185 SessionTask task = freeScheduling.get(index);
186 if(task.thread == thread && !task.systemCall)
// LinkedList.remove(int) returns the removed element, so this dequeues and returns in one step.
187 return freeScheduling.remove(index);
/**
 * Entry point for handling a task that is pending for the given thread.
 * <p>
 * The candidate task is obtained with {@link #getOwnTask(int)}.
 * NOTE(review): how the task is executed and what the boolean result signals are
 * not visible in this listing - confirm against the full source.
 *
 * @param thread value forwarded to {@link #getOwnTask(int)}
 */
194 public boolean performPending(int thread) {
195 SessionTask task = getOwnTask(thread);
204 // final public void scheduleOwn(int caller, SessionTask request) {
205 // ownTasks[caller].add(request);
/**
 * Queues a task on the shared task list.
 * <p>
 * The request is appended to {@code freeScheduling} while holding
 * {@code querySupportLock}, after which every thread waiting on that lock's monitor
 * is woken with {@code notifyAll()}.
 *
 * @param request the task to queue; must not be {@code null}
 */
208 final public void schedule(SessionTask request) {
// NOTE(review): this dereference precedes the null assertion below, so a null request
// fails here with a NullPointerException before the assert is ever evaluated.
// In the lines visible here, performer is only read by the debug print.
210 int performer = request.thread;
212 if(DebugPolicy.SCHEDULE)
213 System.out.println("schedule " + request + " " + " -> " + performer);
215 //assert(performer >= 0);
217 assert(request != null);
219 // if(caller == performer) {
220 // request.run(caller);
223 // if(performer == THREADS) {
// Enqueue and notify under the same monitor, as required by Object.notifyAll().
225 synchronized(querySupportLock) {
227 //new Exception().printStackTrace();
229 freeScheduling.add(request);
231 querySupportLock.notifyAll();
233 //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
235 // for(int i=0;i<THREADS;i++) {
236 // ReentrantLock queueLock = threadLocks[i];
238 // //queues[performer].add(request);
239 // //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
240 // threadConditions[i].signalAll();
241 // queueLock.unlock();
250 // ReentrantLock queueLock = threadLocks[performer];
252 // queues[performer].add(request);
253 // // This thread could have been sleeping
254 // if(queues[performer].size() == 1) {
255 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
256 // threadConditions[performer].signalAll();
258 // queueLock.unlock();
265 final public int THREAD_MASK;
267 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
/**
 * Base class for units of work that are queued through
 * {@code QueryProcessor.schedule(SessionTask)} and executed via {@link #run(int)}.
 */
269 public static abstract class SessionTask {
// Value of QueryProcessor.thread.get() at construction time, i.e. taken on the creating thread.
271 final public int thread;
// Tasks with this flag set are skipped by QueryProcessor.getOwnTask(int).
272 final public boolean systemCall;
273 // final public int syncCaller;
274 //final public Object object;
/**
 * Creates a task bound to the value currently held by {@code QueryProcessor.thread}.
 * NOTE(review): {@code QueryProcessor.thread} is declared outside this listing -
 * presumably a ThreadLocal holding the query thread index, confirm.
 *
 * @param systemCall stored in {@link #systemCall}
 */
276 public SessionTask(boolean systemCall) {
277 this.thread = QueryProcessor.thread.get();
278 this.systemCall = systemCall;
279 // this.syncCaller = -1;
280 //this.object = object;
283 // public SessionTask(Object object, int syncCaller) {
284 // this.thread = QueryProcessor.thread.get();
285 // this.syncCaller = syncCaller;
286 // this.object = object;
/**
 * Performs the task.
 *
 * @param thread supplied by the caller that executes the task
 */
289 public abstract void run(int thread);
/** Wraps the default {@code Object.toString()} text in {@code SessionTask[...]}. */
292 public String toString() {
293 return "SessionTask[" + super.toString() + "]";
/**
 * A {@link SessionTask} that additionally carries a semaphore and a container for a
 * {@link Throwable}.
 * NOTE(review): presumably the semaphore is released to signal completion and the
 * container transports a failure to the waiting caller - neither use is visible in
 * this listing, confirm against the users of this class.
 */
298 public static abstract class SessionRead extends SessionTask {
300 final public Semaphore notify;
301 final public DataContainer<Throwable> throwable;
/**
 * @param throwable stored in {@link #throwable}
 * @param notify stored in {@link #notify}
 */
303 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
305 this.throwable = throwable;
306 this.notify = notify;
311 long waitingTime = 0;
314 static int koss2 = 0;
/**
 * Delegates to {@code runSynchronized()} of the first executor thread.
 *
 * @param graph not used by the line visible here
 * @return whatever {@code executors[0].runSynchronized()} returns
 */
316 public boolean resume(ReadGraphImpl graph) {
317 return executors[0].runSynchronized();
320 //private WeakReference<GarbageTracker> garbageTracker;
322 private class GarbageTracker {
325 protected void finalize() throws Throwable {
327 // System.err.println("GarbageTracker");
329 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
/**
 * Creates the processor, starts its query threads and resolves the built-in
 * resource ids.
 * <p>
 * Visible steps, in order:
 * <ol>
 * <li>{@code THREAD_MASK} is set to {@code threads - 1}; the cache, session,
 * resource support and scheduling lock are taken from the query support.</li>
 * <li>Every slot of {@code threadStates} is set to {@code ThreadState.INIT}.</li>
 * <li>One {@code QueryThread} per slot is created, added to {@code threadSet} and,
 * in a separate loop, started.</li>
 * <li>The constructor loops until {@code sleepers} equals {@code THREADS}.</li>
 * <li>The root library id is looked up with {@code core.getBuiltin("http:/")}; when
 * it is non-zero the Layer0 built-ins are resolved and asserted to be non-zero.</li>
 * </ol>
 * NOTE(review): {@code this} is handed to the query threads and the threads are
 * started before the final id fields below are assigned, so a query thread can
 * observe this object before construction has finished - confirm that the threads
 * do not read those fields until later.
 *
 * @param threads number of query threads; NOTE(review): {@code threads - 1} is used
 *        as a bit mask elsewhere ({@code hashCode() & THREAD_MASK}), which only
 *        covers all thread indices when this is a power of two - confirm callers
 * @param core query support that supplies the cache backing, the lock and the
 *        built-in ids
 * @param threadSet receives every created query thread
 * @throws DatabaseException declared by the signature; the throwing call is not
 *         identifiable from the lines visible here
 */
337 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
338 throws DatabaseException {
340 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
343 THREAD_MASK = threads - 1;
346 cache = new QueryCache(core, threads);
347 session = querySupport.getSession();
348 resourceSupport = querySupport.getSupport();
349 querySupportLock = core.getLock();
351 executors = new QueryThread[THREADS];
352 // queues = new ArrayList[THREADS];
353 // threadLocks = new ReentrantLock[THREADS];
354 // threadConditions = new Condition[THREADS];
355 threadStates = new ThreadState[THREADS];
356 // ownTasks = new ArrayList[THREADS];
357 // ownSyncTasks = new ArrayList[THREADS];
358 // delayQueues = new ArrayList[THREADS * THREADS];
360 // freeSchedule = new AtomicInteger(0);
362 // for (int i = 0; i < THREADS * THREADS; i++) {
363 // delayQueues[i] = new ArrayList<SessionTask>();
366 for (int i = 0; i < THREADS; i++) {
368 // tasks[i] = new ArrayList<Runnable>();
369 // ownTasks[i] = new ArrayList<SessionTask>();
370 // ownSyncTasks[i] = new ArrayList<SessionTask>();
371 // queues[i] = new ArrayList<SessionTask>();
372 // threadLocks[i] = new ReentrantLock();
373 // threadConditions[i] = threadLocks[i].newCondition();
374 // limits[i] = false;
375 threadStates[i] = ThreadState.INIT;
// Threads are created and registered first and only started in the following loop,
// so executors[] is fully populated before any of them runs.
379 for (int i = 0; i < THREADS; i++) {
383 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
385 threadSet.add(executors[i]);
390 for (int i = 0; i < THREADS; i++) {
391 executors[i].start();
394 // Make sure that query threads are up and running
// NOTE(review): the waiting primitive inside this loop and the handling of the
// InterruptedException are not visible in this listing.
395 while(sleepers.get() != THREADS) {
398 } catch (InterruptedException e) {
// A zero id for "http:/" is treated as "built-ins not installed"; the id fields whose
// alternative branch is visible below are then set to 0.
403 rootLibrary = core.getBuiltin("http:/");
404 boolean builtinsInstalled = rootLibrary != 0;
406 if (builtinsInstalled) {
407 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
408 assert (functionalRelation != 0);
410 functionalRelation = 0;
412 if (builtinsInstalled) {
413 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
414 assert (instanceOf != 0);
418 if (builtinsInstalled) {
419 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
420 assert (inverseOf != 0);
425 if (builtinsInstalled) {
426 inherits = core.getBuiltin(Layer0.URIs.Inherits);
427 assert (inherits != 0);
431 if (builtinsInstalled) {
432 asserts = core.getBuiltin(Layer0.URIs.Asserts);
433 assert (asserts != 0);
437 if (builtinsInstalled) {
438 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
439 assert (hasPredicate != 0);
443 if (builtinsInstalled) {
444 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
445 assert (hasPredicateInverse != 0);
447 hasPredicateInverse = 0;
449 if (builtinsInstalled) {
450 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
451 assert (hasObject != 0);
455 if (builtinsInstalled) {
456 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
457 assert (subrelationOf != 0);
461 if (builtinsInstalled) {
462 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
463 assert (superrelationOf != 0);
467 if (builtinsInstalled) {
468 library = core.getBuiltin(Layer0.URIs.Library);
469 assert (library != 0);
473 if (builtinsInstalled) {
474 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
475 assert (consistsOf != 0);
479 if (builtinsInstalled) {
480 hasName = core.getBuiltin(Layer0.URIs.HasName);
481 assert (hasName != 0);
/**
 * Runs {@code performDirtyUpdates(graph)} and then increments
 * {@code modificationCounter}.
 *
 * @param graph graph handed on to {@code performDirtyUpdates}
 */
487 final public void releaseWrite(ReadGraphImpl graph) {
488 performDirtyUpdates(graph);
// NOTE(review): modificationCounter is declared as a boxed Long, so this is an
// unbox/add/re-box sequence that replaces the field's object; it is not atomic -
// confirm that callers of releaseWrite are serialized.
489 modificationCounter++;
/**
 * Returns the integer id of a resource by delegating to {@code querySupport.getId}.
 *
 * @param r the resource to translate
 * @return the id reported by the query support
 */
492 final public int getId(final Resource r) {
493 return querySupport.getId(r);
496 public QuerySupport getCore() {
500 public int getFunctionalRelation() {
501 return functionalRelation;
504 public int getInherits() {
508 public int getInstanceOf() {
512 public int getInverseOf() {
516 public int getSubrelationOf() {
517 return subrelationOf;
520 public int getSuperrelationOf() {
521 return superrelationOf;
524 public int getAsserts() {
528 public int getHasPredicate() {
532 public int getHasPredicateInverse() {
533 return hasPredicateInverse;
536 public int getHasObject() {
540 public int getRootLibrary() {
/**
 * Returns the root library as a {@link Resource}, creating and caching it on first use.
 * <p>
 * When the cached {@code rootLibraryResource} is {@code null}, a new
 * {@link ResourceImpl} is built from {@code querySupport.getSupport()} and the id
 * returned by {@link #getRootLibrary()} and stored in the volatile field.
 *
 * @return the cached root library resource
 * @throws UnsupportedOperationException on the "database is not initialized" path;
 *         NOTE(review): the guarding condition is not visible in this listing -
 *         presumably a zero root id, confirm
 */
544 public Resource getRootLibraryResource() {
545 if (rootLibraryResource == null) {
546 // No locking is used here: the field is volatile, and threads that race on the first call
547 // each build a ResourceImpl from the same support and root id, so either write is acceptable.
548 int root = getRootLibrary();
550 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
551 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
553 return rootLibraryResource;
556 public int getLibrary() {
560 public int getConsistsOf() {
564 public int getHasName() {
/**
 * Resolves a URI to a resource id through the query cache and reports the outcome
 * to {@code procedure}.
 * <p>
 * The lookup is started with {@code QueryCache.runnerURIToResource}; {@code null} is
 * passed in the argument position where the sibling {@code runner*} calls in this
 * class pass a listener. A non-null, non-zero result is forwarded to
 * {@code procedure.execute}; a failed lookup is reported as a
 * {@link ResourceNotFoundException}; exceptions from the query are forwarded
 * unchanged. A {@link DatabaseException} thrown by the runner is passed to
 * {@code procedure.exception}, and if that call throws as well the second
 * exception is only logged.
 * <p>
 * NOTE(review): the conditions that select between the second
 * {@code procedure.execute} call and the {@code ResourceNotFoundException} are not
 * visible in this listing - confirm against the full source.
 *
 * @param graph graph used for the lookup and for the callbacks
 * @param id URI to resolve
 * @param parent cache entry passed to the runner as the parent of the lookup
 * @param procedure receives the resolved id or the failure
 */
568 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
572 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
575 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
577 if (result != null && result != 0) {
578 procedure.execute(graph, result);
582 // Fall back to using the fixed builtins.
583 // result = querySupport.getBuiltin(id);
584 // if (result != 0) {
585 // procedure.execute(graph, result);
590 // result = querySupport.getRandomAccessReference(id);
591 // } catch (ResourceNotFoundException e) {
592 // procedure.exception(graph, e);
597 procedure.execute(graph, result);
599 procedure.exception(graph, new ResourceNotFoundException(id));
605 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
606 procedure.exception(graph, t);
610 } catch (DatabaseException e) {
614 procedure.exception(graph, e);
616 } catch (DatabaseException e1) {
618 Logger.defaultLogError(e1);
/**
 * Looks up a built-in resource id with {@code querySupport.getBuiltin(id)} and reports
 * either the id or a {@link ResourceNotFoundException} to {@code procedure}.
 * <p>
 * NOTE(review): the condition that selects between the two callbacks is not visible
 * in this listing - presumably a non-zero id counts as found, confirm.
 *
 * @param graph graph passed through to the callbacks
 * @param id URI of the built-in
 * @param parent not used by the lines visible here
 * @param procedure receives the id or the failure
 * @throws DatabaseException propagated from the {@code procedure} callbacks
 */
626 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
628 Integer result = querySupport.getBuiltin(id);
630 procedure.execute(graph, result);
632 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Runs a multi-read through {@code QueryCache.runnerMultiReadEntry}.
 * <p>
 * A {@link DatabaseException} from the runner is rethrown wrapped in an
 * {@link IllegalStateException}, so this method declares no checked exception.
 * The {@code cached} and {@code provider} parameters are not used by the lines
 * visible here.
 *
 * @param graph graph to run the query in
 * @param query the request
 * @param parent cache entry passed to the runner as parent
 * @param listener listener passed to the runner, may be {@code null} as far as this method is concerned
 * @param procedure receives the results
 */
637 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
640 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
641 } catch (DatabaseException e) {
642 throw new IllegalStateException(e);
/**
 * Runs an asynchronous multi-read through {@code QueryCache.runnerAsyncMultiReadEntry}.
 * <p>
 * A {@link DatabaseException} from the runner is rethrown wrapped in an
 * {@link IllegalStateException}, so this method declares no checked exception.
 *
 * @param graph graph to run the query in
 * @param query the request
 * @param parent cache entry passed to the runner as parent
 * @param listener listener passed to the runner
 * @param procedure receives the results
 */
647 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
651 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
652 } catch (DatabaseException e) {
653 throw new IllegalStateException(e);
/**
 * Runs an external read through {@code QueryCache.runnerExternalReadEntry}.
 * <p>
 * The {@code cached} and {@code provider} parameters are not used by the line
 * visible here.
 *
 * @param graph graph to run the query in
 * @param query the request
 * @param parent cache entry passed to the runner as parent
 * @param listener listener passed to the runner
 * @param procedure receives the result
 * @throws DatabaseException propagated from the runner
 */
658 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
659 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
663 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
665 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs a multi-read through {@code QueryCache.runnerMultiReadEntry}, propagating any
 * {@link DatabaseException} to the caller.
 *
 * @param graph graph to run the query in
 * @param query the request
 * @param parent cache entry passed to the runner as parent
 * @param listener listener passed to the runner
 * @param procedure receives the results
 * @throws DatabaseException propagated from the runner
 */
669 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
671 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
/**
 * Runs an external read through {@code QueryCache.runnerExternalReadEntry},
 * propagating any {@link DatabaseException} to the caller.
 *
 * @param graph graph to run the query in
 * @param query the request
 * @param parent cache entry passed to the runner as parent
 * @param listener listener passed to the runner
 * @param procedure receives the result
 * @throws DatabaseException propagated from the runner
 */
675 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
677 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
/**
 * Tells whether an external read entry is still referenced.
 * <p>
 * The entry counts as bound when it has parents or when a listener list is
 * registered for it.
 * NOTE(review): the fall-through result is not visible in this listing -
 * presumably {@code false}, confirm.
 *
 * @param entry the entry to test
 */
681 boolean isBound(ExternalReadEntry<?> entry) {
682 if(entry.hasParents()) return true;
683 else if(hasListener(entry)) return true;
/**
 * Records the parent and listener relations of a freshly requested cache entry.
 * <p>
 * When a parent is given and the dependency is not inferred, the parent is added to
 * the child unless the child reports itself immutable; a {@link DatabaseException}
 * raised while doing so is logged rather than propagated. When a listener is given
 * it is registered through {@link #registerListener}.
 * The method is synchronized on this processor.
 *
 * @param graph graph used for the immutability check
 * @param child entry that was requested
 * @param parent entry that requested {@code child}, may be {@code null}
 * @param listener listener to attach to {@code child}, may be {@code null}
 * @param procedure procedure stored with the listener
 * @param inferred when {@code true} no parent link is recorded
 * @return the result of {@link #registerListener} when a listener is given;
 *         NOTE(review): the result without a listener is not visible in this
 *         listing - presumably {@code null}, confirm
 */
687 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
689 if (parent != null && !inferred) {
691 if(!child.isImmutable(graph))
692 child.addParent(parent);
693 } catch (DatabaseException e) {
694 Logger.defaultLogError(e);
696 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
699 if (listener != null) {
700 return registerListener(child, listener, procedure);
708 static class Dummy implements InternalProcedure<Object>, IntProcedure {
711 public void execute(ReadGraphImpl graph, int i) {
715 public void finished(ReadGraphImpl graph) {
719 public void execute(ReadGraphImpl graph, Object result) {
723 public void exception(ReadGraphImpl graph, Throwable throwable) {
728 private static final Dummy dummy = new Dummy();
/**
 * Evaluates a unary query and returns its result.
 * <p>
 * Dependencies are registered first (never as inferred). If the query is not ready
 * it is computed with the shared no-op {@code dummy} procedure and the result is
 * read back with a {@code null} procedure; the other visible exit reads the result
 * with the caller's procedure.
 * NOTE(review): the branch structure between the two {@code query.get} calls is
 * only partly visible in this listing - confirm against the full source.
 *
 * @param graph graph to evaluate in
 * @param query the query entry to evaluate
 * @param parent entry recorded as the parent of {@code query}, may be {@code null}
 * @param listener listener to attach, may be {@code null}
 * @param procedure procedure registered with the listener and used on the ready path
 * @return the value returned by {@code query.get}
 * @throws Throwable declared by the signature
 */
731 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
733 if (DebugPolicy.PERFORM)
734 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
737 assert (!collecting);
739 assert(query.assertNotDiscarded());
741 registerDependencies(graph, query, parent, listener, procedure, false);
743 // FRESH, REFUTED, EXCEPTED go here
744 if (!query.isReady()) {
749 query.computeForEach(graph, this, (Procedure)dummy, true);
750 return query.get(graph, this, null);
756 return query.get(graph, this, procedure);
/**
 * Access to the query cache that is handed to the {@code QueryCollector}
 * implementation when it is constructed in this class.
 */
764 interface QueryCollectorSupport {
765 public CacheCollectionResult allCaches();
766 public Collection<CacheEntry> getRootList();
767 public int getCurrentSize();
768 public int calculateCurrentSize();
769 public CacheEntryBase iterate(int level);
770 public void remove();
771 public void setLevel(CacheEntryBase entry, int level);
772 public boolean start(boolean flush);
775 interface QueryCollector {
777 public void collect(int youngTarget, int allowedTimeInMs);
/**
 * {@link QueryCollectorSupport} backed by the enclosing processor's query cache.
 * <p>
 * It keeps a {@link CacheCollectionResult} snapshot in {@code iteration} and walks it
 * level by level; {@code restart} decides when that snapshot is rebuilt from the
 * live caches.
 */
781 class QueryCollectorSupportImpl implements QueryCollectorSupport {
783 private static final boolean DEBUG = false;
// Relative size change, compared against the snapshot size, at which restart() treats the snapshot as dirty.
784 private static final double ITERATION_RATIO = 0.2;
786 private CacheCollectionResult iteration = new CacheCollectionResult();
787 private boolean fresh = true;
788 private boolean needDataInStart = true;
790 QueryCollectorSupportImpl() {
/** Builds a new result object and lets the enclosing processor fill it via {@code allCaches(result)}. */
794 public CacheCollectionResult allCaches() {
795 CacheCollectionResult result = new CacheCollectionResult();
796 QueryProcessor.this.allCaches(result);
/**
 * Prepares an iteration round.
 *
 * @param flush forces {@code restart} to be called with a target ratio of 0.0, which
 *        makes its dirty test true for any size difference
 * @return {@code iteration.isAtStart()}
 */
801 public boolean start(boolean flush) {
802 // We need new data from query maps
804 if(needDataInStart || flush) {
805 // Last run ended after processing all queries => refresh data
806 restart(flush ? 0.0 : ITERATION_RATIO);
808 // continue with previous big data
810 // Notify caller about iteration situation
811 return iteration.isAtStart();
/**
 * Compares the current cache size with the snapshot size and records whether the
 * relative change reaches {@code targetRatio}.
 * NOTE(review): the conditions under which the snapshot is replaced and under which
 * {@code needDataInStart} is cleared are not visible in this listing - presumably
 * they depend on {@code dirty}, confirm.
 */
814 private void restart(double targetRatio) {
816 needDataInStart = true;
818 long start = System.nanoTime();
821 // We need new data from query maps
// The +1 keeps the divisor below non-zero when the snapshot is empty.
823 int iterationSize = iteration.size()+1;
824 int diff = calculateCurrentSize()-iterationSize;
826 double ratio = (double)diff / (double)iterationSize;
827 boolean dirty = Math.abs(ratio) >= targetRatio;
830 iteration = allCaches();
832 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
833 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
834 System.err.print(" " + iteration.levels[i].size());
835 System.err.println("");
842 needDataInStart = false;
844 // We are returning here within the same GC round - reuse the cache table
/**
 * Returns the next entry of the given level from the snapshot, skipping entries that
 * report themselves as discarded.
 * NOTE(review): the condition guarding the {@code restart} call is not visible in
 * this listing - presumably an exhausted snapshot, confirm.
 */
853 public CacheEntryBase iterate(int level) {
855 CacheEntryBase entry = iteration.next(level);
857 restart(ITERATION_RATIO);
861 while(entry != null && entry.isDiscarded()) {
862 entry = iteration.next(level);
870 public void remove() {
/** Forwards to {@code iteration.setLevel(entry, level)}. */
875 public void setLevel(CacheEntryBase entry, int level) {
876 iteration.setLevel(entry, level);
/** Forwards to {@code cache.getRootList()}. */
879 public Collection<CacheEntry> getRootList() {
880 return cache.getRootList();
/** Forwards to {@code cache.calculateCurrentSize()}. */
884 public int calculateCurrentSize() {
885 return cache.calculateCurrentSize();
889 public int getCurrentSize() {
894 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
896 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
897 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
899 public int querySize() {
903 public void gc(int youngTarget, int allowedTimeInMs) {
905 collector.collect(youngTarget, allowedTimeInMs);
/**
 * Attaches a listener to a cache entry.
 * <p>
 * The listener's disposed state is checked first; the registration itself is done
 * by {@code addListener}.
 * NOTE(review): the statement executed for an already disposed listener is not
 * visible in this listing - presumably the method returns {@code null}, confirm.
 *
 * @param entry entry to listen to; must not be {@code null}
 * @param base the listener
 * @param procedure procedure stored together with the listener
 * @return the result of {@code addListener}
 */
909 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
911 assert (entry != null);
913 if (base.isDisposed())
916 return addListener(entry, base, procedure);
920 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
921 entry.setLastKnown(result);
/**
 * Adds a {@link ListenerEntry} for the given cache entry to {@code cache.listeners}.
 * <p>
 * A list of listener entries is kept per cache entry. If an equal
 * {@link ListenerEntry} is already present and its listener is not disposed, nothing
 * is registered and {@code null} is returned; if the existing one is disposed, it is
 * replaced in place by the new entry.
 * NOTE(review): the statements that add a brand new entry to the list and return it
 * are not visible in this listing - confirm against the full source.
 *
 * @param entry cache entry to listen to; must not be {@code null}
 * @param base the listener
 * @param procedure procedure stored in the listener entry; must not be {@code null}
 * @return {@code null} when a live equal listener already exists
 */
924 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
926 assert (entry != null);
927 assert (procedure != null);
929 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
931 list = new ArrayList<ListenerEntry>(1);
932 cache.listeners.put(entry, list);
// Duplicate detection relies on ListenerEntry.equals via List.indexOf.
935 ListenerEntry result = new ListenerEntry(entry, base, procedure);
936 int currentIndex = list.indexOf(result);
937 // There was already a listener
938 if(currentIndex > -1) {
939 ListenerEntry current = list.get(currentIndex);
940 if(!current.base.isDisposed()) return null;
941 list.set(currentIndex, result);
946 if(DebugPolicy.LISTENER) {
947 new Exception().printStackTrace();
948 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
/**
 * Adds a listener entry to the {@code scheduledListeners} set.
 *
 * @param entry the listener entry to schedule; must not be {@code null}
 */
955 private void scheduleListener(ListenerEntry entry) {
956 assert (entry != null);
957 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
958 scheduledListeners.add(entry);
/**
 * Removes a listener entry from the listener list of its cache entry; does nothing
 * when no list is registered for that cache entry.
 * NOTE(review): the condition guarding the removal of the whole list from
 * {@code cache.listeners} is not visible in this listing - presumably the list
 * having become empty, confirm.
 *
 * @param entry the listener entry to remove; must not be {@code null}
 */
961 private void removeListener(ListenerEntry entry) {
962 assert (entry != null);
963 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
964 if(list == null) return;
965 boolean success = list.remove(entry);
968 cache.listeners.remove(entry.entry);
/**
 * Tells whether a listener list is registered for the entry in {@code cache.listeners}.
 * Disposed listeners are not filtered out here.
 * NOTE(review): the fall-through result is not visible in this listing -
 * presumably {@code false}, confirm.
 *
 * @param entry the cache entry to test
 */
971 private boolean hasListener(CacheEntry entry) {
972 if(cache.listeners.get(entry) != null) return true;
/**
 * Checks the listeners of an entry, taking disposed listeners into account.
 * <p>
 * Disposed listener entries are first gathered into a separate list, which is then
 * processed in a second loop; when the entry's listener list is empty afterwards,
 * the list itself is removed from {@code cache.listeners}.
 * NOTE(review): the statements inside the two loops and the returned values are not
 * visible in this listing - presumably the gathered entries are removed from
 * {@code entries} and the result tells whether any listener remains, confirm.
 *
 * @param entry the cache entry whose listeners are checked
 */
976 boolean hasListenerAfterDisposing(CacheEntry entry) {
977 if(cache.listeners.get(entry) != null) {
978 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
// Allocated lazily: stays null when no listener is disposed.
979 ArrayList<ListenerEntry> list = null;
980 for (ListenerEntry e : entries) {
981 if (e.base.isDisposed()) {
982 if(list == null) list = new ArrayList<ListenerEntry>();
987 for (ListenerEntry e : list) {
991 if (entries.isEmpty()) {
992 cache.listeners.remove(entry);
/**
 * Returns the listener entries registered for a cache entry.
 * <p>
 * {@link #hasListenerAfterDisposing} is called first for its clean-up effect; its
 * return value is ignored. The returned list is the live list stored in
 * {@code cache.listeners}, not a copy, or an immutable empty list when none is
 * registered.
 *
 * @param entry the cache entry to look up
 * @return the stored listener list or {@code Collections.emptyList()}
 */
1000 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1001 hasListenerAfterDisposing(entry);
1002 if(cache.listeners.get(entry) != null)
1003 return cache.listeners.get(entry);
1005 return Collections.emptyList();
/**
 * Computes, for an entry, the set of listeners found on the entry itself and on
 * everything reachable through its parents, memoizing the result in {@code workarea}.
 * <p>
 * Entries already present in {@code workarea} are not processed again.
 *
 * @param entry the cache entry to process
 * @param workarea memo from cache entry to its collected listeners; updated in place
 */
1008 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1010 if(!workarea.containsKey(entry)) {
1012 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1013 for(ListenerEntry e : getListenerEntries(entry))
// The set is registered before the parents are visited, so a parent chain that leads
// back to this entry finds the key and stops instead of recursing again.
1016 workarea.put(entry, ls);
1018 for(CacheEntry parent : entry.getParents(this)) {
1019 processListenerReport(parent, workarea);
1020 ls.addAll(workarea.get(parent));
/**
 * Builds a report of how many cache entries each listener is attached to, directly
 * or through parent entries.
 * <p>
 * All cache entries are collected, {@link #hasListenerAfterDisposing} is run on each
 * of them, and then {@link #processListenerReport} fills the report's work area.
 * The method is synchronized on this processor.
 *
 * @return the report; its {@code print} method writes one line per listener
 * @throws IOException declared by the signature; no throwing call is visible in the
 *         lines of this listing
 */
1027 public synchronized ListenerReport getListenerReport() throws IOException {
1029 class ListenerReportImpl implements ListenerReport {
1031 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
/** Writes {@code "<count> <listener>"} lines, one per listener, to {@code b}. */
1034 public void print(PrintStream b) {
1035 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1036 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1037 for(ListenerBase l : e.getValue()) {
1038 Integer i = hist.get(l);
// Counts are accumulated as negative numbers and negated again when printed;
// presumably so that valueSortedEntries lists the largest counts first - confirm
// its sort direction.
1039 hist.put(l, i != null ? i-1 : -1);
1043 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1044 b.print("" + -p.second + " " + p.first + "\n");
1052 ListenerReportImpl result = new ListenerReportImpl();
1054 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1055 for(CacheEntryBase entry : all) {
1056 hasListenerAfterDisposing(entry);
1058 for(CacheEntryBase entry : all) {
1059 processListenerReport(entry, result.workarea);
/**
 * Writes the listener report obtained from {@link #getListenerReport()} to a file.
 * NOTE(review): the statements that print the report and close the stream are not
 * visible in this listing - confirm that the {@code PrintStream} is closed on every
 * path, including when {@code getListenerReport()} throws.
 *
 * @param file destination file, opened with {@code FileOutputStream}
 * @return the fixed text {@code "Done reporting listeners."}
 * @throws IOException when the file cannot be opened or the report cannot be built
 */
1066 public synchronized String reportListeners(File file) throws IOException {
1071 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1072 ListenerReport report = getListenerReport();
1075 return "Done reporting listeners.";
/**
 * Records the parents of an entry, and recursively of those parents, in
 * {@code workarea}.
 * <p>
 * Discarded entries and entries already present in {@code workarea} are skipped, as
 * are discarded parents.
 * NOTE(review): the statement that adds a parent to {@code ps} is not visible in
 * this listing. The entry is stored in {@code workarea} only after its parents have
 * been visited, so the recursion relies on the parent relation being acyclic -
 * confirm.
 *
 * @param entry the cache entry to process
 * @param workarea map from cache entry to its parent set; updated in place
 */
1079 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1081 if(entry.isDiscarded()) return;
1082 if(workarea.containsKey(entry)) return;
1084 Iterable<CacheEntry> parents = entry.getParents(this);
1085 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1086 for(CacheEntry e : parents) {
1087 if(e.isDiscarded()) continue;
1089 processParentReport(e, workarea);
1091 workarea.put(entry, ps);
/**
 * Writes the contents of {@code Development.histogram} to a file, one
 * {@code "<key>: <value>"} line per entry, in the reverse of the order produced by
 * {@code CollectionUtils.valueSortedEntries}, and then clears the histogram.
 * NOTE(review): the statements that close the stream and the returned text are not
 * visible in this listing - confirm that the {@code PrintStream} is closed.
 *
 * @param file destination file, opened with {@code FileOutputStream}
 * @throws IOException when the file cannot be opened
 */
1095 public synchronized String reportQueryActivity(File file) throws IOException {
// NOTE(review): the message says "reportQueries" although this is reportQueryActivity;
// it matches the message printed by reportQueries(File) - looks like a copy/paste leftover.
1097 System.err.println("reportQueries " + file.getAbsolutePath());
1102 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1104 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1105 Collections.reverse(entries);
1107 for(Pair<String,Integer> entry : entries) {
1108 b.println(entry.first + ": " + entry.second);
1113 Development.histogram.clear();
1119 public synchronized String reportQueries(File file) throws IOException {
1121 System.err.println("reportQueries " + file.getAbsolutePath());
1126 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1128 long start = System.nanoTime();
1130 // ArrayList<CacheEntry> all = ;
1132 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1133 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1134 for(CacheEntryBase entry : caches) {
1135 processParentReport(entry, workarea);
1138 // for(CacheEntry e : all) System.err.println("entry: " + e);
1140 long duration = System.nanoTime() - start;
1141 System.err.println("Query root set in " + 1e-9*duration + "s.");
1143 start = System.nanoTime();
1145 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1149 for(CacheEntry entry : workarea.keySet()) {
1150 boolean listener = hasListenerAfterDisposing(entry);
1151 boolean hasParents = entry.getParents(this).iterator().hasNext();
1154 flagMap.put(entry, 0);
1155 } else if (!hasParents) {
1157 flagMap.put(entry, 1);
1160 flagMap.put(entry, 2);
1162 // // Write leaf bit
1163 // entry.flags |= 4;
1166 boolean done = true;
1173 long start2 = System.nanoTime();
1175 int boundCounter = 0;
1176 int unboundCounter = 0;
1177 int unknownCounter = 0;
1179 for(CacheEntry<?> entry : workarea.keySet()) {
1181 //System.err.println("process " + entry);
1183 int flags = flagMap.get(entry);
1184 int bindStatus = flags & 3;
1186 if(bindStatus == 0) boundCounter++;
1187 else if(bindStatus == 1) unboundCounter++;
1188 else if(bindStatus == 2) unknownCounter++;
1190 if(bindStatus < 2) continue;
1193 for(CacheEntry parent : entry.getParents(this)) {
1195 if(parent.isDiscarded()) flagMap.put(parent, 1);
1197 int flags2 = flagMap.get(parent);
1198 int bindStatus2 = flags2 & 3;
1199 // Parent is bound => child is bound
1200 if(bindStatus2 == 0) {
1204 // Parent is unknown => child is unknown
1205 else if (bindStatus2 == 2) {
1212 flagMap.put(entry, newStatus);
1216 duration = System.nanoTime() - start2;
1217 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1218 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1220 } while(!done && loops++ < 20);
1224 for(CacheEntry entry : workarea.keySet()) {
1226 int bindStatus = flagMap.get(entry);
1227 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1233 duration = System.nanoTime() - start;
1234 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1236 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1238 for(CacheEntry entry : workarea.keySet()) {
1239 Class<?> clazz = entry.getClass();
1240 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1241 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1242 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1243 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1244 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1245 Integer c = counts.get(clazz);
1246 if(c == null) counts.put(clazz, -1);
1247 else counts.put(clazz, c-1);
1250 b.print("// Simantics DB client query report file\n");
1251 b.print("// This file contains the following information\n");
1252 b.print("// -The amount of cached query instances per query class\n");
1253 b.print("// -The sizes of retained child sets\n");
1254 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1255 b.print("// -Followed by status, where\n");
1256 b.print("// -0=bound\n");
1257 b.print("// -1=free\n");
1258 b.print("// -2=unknown\n");
1259 b.print("// -L=has listener\n");
1260 b.print("// -List of children for each query (search for 'C <query name>')\n");
1262 b.print("----------------------------------------\n");
1264 b.print("// Queries by class\n");
1265 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1266 b.print(-p.second + " " + p.first.getName() + "\n");
1269 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1270 for(CacheEntry e : workarea.keySet())
1273 boolean changed = true;
1275 while(changed && iter++<50) {
1279 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1280 for(CacheEntry e : workarea.keySet())
1283 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1284 Integer c = hist.get(e.getKey());
1285 for(CacheEntry p : e.getValue()) {
1286 Integer i = newHist.get(p);
1287 newHist.put(p, i+c);
1290 for(CacheEntry e : workarea.keySet()) {
1291 Integer value = newHist.get(e);
1292 Integer old = hist.get(e);
1293 if(!value.equals(old)) {
1295 // System.err.println("hist " + e + ": " + old + " => " + value);
1300 System.err.println("Retained set iteration " + iter);
1304 b.print("// Queries by retained set\n");
1305 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1306 b.print("" + -p.second + " " + p.first + "\n");
1309 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1311 b.print("// Entry parent listing\n");
1312 for(CacheEntry entry : workarea.keySet()) {
1313 int status = flagMap.get(entry);
1314 boolean hasListener = hasListenerAfterDisposing(entry);
1315 b.print("Q " + entry.toString());
1317 b.print(" (L" + status + ")");
1320 b.print(" (" + status + ")");
1323 for(CacheEntry parent : workarea.get(entry)) {
1324 Collection<CacheEntry> inv = inverse.get(parent);
1326 inv = new ArrayList<CacheEntry>();
1327 inverse.put(parent, inv);
1330 b.print(" " + parent.toString());
1335 b.print("// Entry child listing\n");
1336 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1337 b.print("C " + entry.getKey().toString());
1339 for(CacheEntry child : entry.getValue()) {
1340 Integer h = hist.get(child);
1344 b.print(" <no children>");
1346 b.print(" " + child.toString());
1351 b.print("#queries: " + workarea.keySet().size() + "\n");
1352 b.print("#listeners: " + listeners + "\n");
1356 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item of the update traversal (see update() / updateQuery()).
// Entry on whose behalf this item was queued: update() passes null for the root item,
// updateParents() passes the child whose parents are being revisited.
1362 public CacheEntry caller;
// Entry that updateQuery() examines.
1364 public CacheEntry entry;
// indent is only read by updateQuery() to indent its debug trace.
1368 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1369 this.caller = caller;
1371 this.indent = indent;
// Removes the query behind the given cache entry from this processor.
// Returns false without doing anything when the entry is already discarded.
// NOTE(review): the return value of the remaining paths depends on the HAS_BEEN_BOUND
// check below, whose branches are not visible here - confirm before relying on it.
1376 boolean removeQuery(CacheEntry entry) {
1378 // This entry has been removed before. No need to do anything here.
1379 if(entry.isDiscarded()) return false;
// Redundant with the guard above; only active when assertions are enabled.
1381 assert (!entry.isDiscarded());
1383 Query query = entry.getQuery();
// The query is asked to drop its own entry, with this processor as the argument.
1385 query.removeEntry(this);
1390 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1401 * @return true if this entry is being listened
// Examines one work item of the update traversal.
//  e          - the item; e.entry is the cache entry under inspection, e.indent the trace indent
//  todo       - traversal work list; parents are pushed onto it through updateParents()
//  immediates - collects entries whose update type is RequestFlags.IMMEDIATE_UPDATE
// When Development.DEVELOPMENT and the QUERYPROCESSOR_UPDATE property are both set, a
// one-letter trace is printed per entry state: D(iscarded), R(efuted), E(xcepted),
// P(ending), U(pdate).
// NOTE(review): what each state branch returns is not visible here - confirm against the
// full method before relying on the return value.
1403 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1407 CacheEntry entry = e.entry;
1409 //System.err.println("updateQuery " + entry);
1412 * If the dependency graph forms a DAG, some entries are inserted in the
1413 * todo list many times. They only need to be processed once though.
1415 if (entry.isDiscarded()) {
1416 if (Development.DEVELOPMENT) {
1417 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1418 System.out.print("D");
1419 for (int i = 0; i < e.indent; i++)
1420 System.out.print(" ");
1421 System.out.println(entry.getQuery());
1424 // System.err.println(" => DISCARDED");
1428 if (entry.isRefuted()) {
1429 if (Development.DEVELOPMENT) {
1430 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1431 System.out.print("R");
1432 for (int i = 0; i < e.indent; i++)
1433 System.out.print(" ");
1434 System.out.println(entry.getQuery());
1440 if (entry.isExcepted()) {
1441 if (Development.DEVELOPMENT) {
1442 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1443 System.out.print("E");
1448 if (entry.isPending()) {
1449 if (Development.DEVELOPMENT) {
1450 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1451 System.out.print("P");
1458 if (Development.DEVELOPMENT) {
1459 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1460 System.out.print("U ");
1461 for (int i = 0; i < e.indent; i++)
1462 System.out.print(" ");
1463 System.out.print(entry.getQuery());
1467 Query query = entry.getQuery();
// Bit set describing how the query wants to be updated (compared against RequestFlags below).
1468 int type = query.type();
1470 boolean hasListener = hasListener(entry);
1472 if (Development.DEVELOPMENT) {
1473 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
// Re-queries hasListener(entry) instead of reusing the local computed above.
1474 if(hasListener(entry)) {
1475 System.out.println(" (L)");
1477 System.out.println("");
1482 if(entry.isPending() || entry.isExcepted()) {
1485 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
// Identity map used as a set: the entry is both key and value.
1487 immediates.put(entry, entry);
1502 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1504 immediates.put(entry, entry);
1518 // System.err.println(" => FOO " + type);
// Every listener registered for this entry is queued for firing.
1521 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1522 if(entries != null) {
1523 for (ListenerEntry le : entries) {
1524 scheduleListener(le);
1529 // If invalid, update parents
1530 if (type == RequestFlags.INVALIDATE) {
1531 updateParents(e.indent, entry, todo);
// Queues every non-discarded parent of the given entry for a later updateQuery() pass.
//  indent - trace indent of the child; parents are queued with indent + 2
//  entry  - the child entry, recorded as the caller of each queued item
//  todo   - work list; items are pushed, so they are popped before older items
1538 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1540 Iterable<CacheEntry> oldParents = entry.getParents(this);
1541 for (CacheEntry parent : oldParents) {
1542 // System.err.println("updateParents " + entry + " => " + parent);
1543 if(!parent.isDiscarded())
1544 todo.push(new UpdateEntry(entry, parent, indent + 2));
// Removes the listener entry when its base listener reports itself disposed.
// performScheduledUpdates() treats a true result as "pruned" (it logs "Pruned ...").
// NOTE(review): the return statements are not visible here - confirm.
1549 private boolean pruneListener(ListenerEntry entry) {
1550 if (entry.base.isDisposed()) {
1551 removeListener(entry);
1559 * @param av1 an array (guaranteed)
1560 * @param av2 any object
1561 * @return <code>true</code> if the two arrays are equal
1563 private final boolean arrayEquals(Object av1, Object av2) {
1566 Class<?> c1 = av1.getClass().getComponentType();
1567 Class<?> c2 = av2.getClass().getComponentType();
1568 if (c2 == null || !c1.equals(c2))
1570 boolean p1 = c1.isPrimitive();
1571 boolean p2 = c2.isPrimitive();
1575 return Arrays.equals((Object[]) av1, (Object[]) av2);
1576 if (boolean.class.equals(c1))
1577 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1578 else if (byte.class.equals(c1))
1579 return Arrays.equals((byte[]) av1, (byte[]) av2);
1580 else if (int.class.equals(c1))
1581 return Arrays.equals((int[]) av1, (int[]) av2);
1582 else if (long.class.equals(c1))
1583 return Arrays.equals((long[]) av1, (long[]) av2);
1584 else if (float.class.equals(c1))
1585 return Arrays.equals((float[]) av1, (float[]) av2);
1586 else if (double.class.equals(c1))
1587 return Arrays.equals((double[]) av1, (double[]) av2);
1588 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query behind the given entry and classifies the result against oldValue.
// Visible outcomes:
//  - ListenerEntry.NO_VALUE    when the entry is excepted after recomputation, or when
//                              anything is thrown during recomputation (the Throwable is logged)
//  - the new result            when it differs from oldValue
//  - ListenerEntry.NOT_CHANGED when it equals oldValue
// Arrays are compared element-wise through arrayEquals(), everything else through equals().
// NOTE(review): when oldValue is ListenerEntry.NO_VALUE only the debug output is visible
// here; the value returned on that path is not shown - confirm.
1593 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1597 Query query = entry.getQuery();
1599 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1601 entry.prepareRecompute(querySupport);
// Recomputation runs in a graph whose parent is the entry being recomputed.
1603 ReadGraphImpl parentGraph = graph.withParent(entry);
1605 query.recompute(parentGraph);
1607 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1609 Object newValue = entry.getResult();
// Identity comparison: NO_VALUE is a sentinel object.
1611 if (ListenerEntry.NO_VALUE == oldValue) {
1612 if(DebugPolicy.CHANGES) {
1613 System.out.println("C " + query);
1614 System.out.println("- " + oldValue);
1615 System.out.println("- " + newValue);
1620 boolean changed = false;
1622 if (newValue != null) {
1623 if (newValue.getClass().isArray()) {
1624 changed = !arrayEquals(newValue, oldValue);
1626 changed = !newValue.equals(oldValue);
// Reached when newValue is null: a change only if there used to be a value.
1629 changed = (oldValue != null);
1631 if(DebugPolicy.CHANGES && changed) {
1632 System.out.println("C " + query);
1633 System.out.println("- " + oldValue);
1634 System.out.println("- " + newValue);
1637 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1639 } catch (Throwable t) {
1641 Logger.defaultLogError(t);
1643 return ListenerEntry.NO_VALUE;
// Returns true while at least one listener is queued in scheduledListeners.
1649 public boolean hasScheduledUpdates() {
1650 return !scheduledListeners.isEmpty();
// Fires all scheduled listeners whose query result has changed.
// Runs in rounds: each round swaps scheduledListeners for a fresh set (so listeners may
// schedule further updates while firing), prunes disposed listeners, recomputes the
// remaining entries through compareTo() and fires the changed ones from the cache.
// Must not be entered while the cache is collecting or while listeners are already firing
// (both are asserted). firingListeners is raised for the duration of the call.
// A listener that throws is logged through Logger.defaultLogError, as elsewhere in this
// class, so one failing listener does not stop the others.
1653 public void performScheduledUpdates(WriteGraphImpl graph) {
1656 assert (!cache.collecting);
1657 assert (!firingListeners);
1659 firingListeners = true;
1663 // Performing may cause further events to be scheduled.
1664 while (!scheduledListeners.isEmpty()) {
1667 // graph.state.barrier.inc();
1669 // Clone current events to make new entries possible during
1671 THashSet<ListenerEntry> entries = scheduledListeners;
1672 scheduledListeners = new THashSet<ListenerEntry>();
1674 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1676 for (ListenerEntry listenerEntry : entries) {
1678 if (pruneListener(listenerEntry)) {
1679 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1683 final CacheEntry entry = listenerEntry.entry;
1684 assert (entry != null);
1686 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1688 if (newValue != ListenerEntry.NOT_CHANGED) {
1689 if(DebugPolicy.LISTENER)
1690 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1691 schedule.add(listenerEntry);
// Remember what the listener is about to see, for the next comparison.
1692 listenerEntry.setLastKnown(entry.getResult());
1697 for(ListenerEntry listenerEntry : schedule) {
1698 final CacheEntry entry = listenerEntry.entry;
// Single debug line per firing; the shorter duplicate "Firing" line was dropped.
1702 if(DebugPolicy.LISTENER)
1703 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1704 entry.performFromCache(graph, listenerEntry.procedure);
1705 } catch (Throwable t) {
1706 Logger.defaultLogError(t);
1710 // graph.state.barrier.dec();
1711 // graph.waitAsync(null);
1712 // graph.state.barrier.assertReady();
1717 firingListeners = false;
1724 * @return true if this entry still has listeners
// Propagates a change of the given entry through its dependants.
// Alternates two phases until no immediate updates remain:
//  1. drain the todo list through updateQuery(), collecting IMMEDIATE_UPDATE entries;
//  2. recompute every collected entry through compareTo() and queue the parents of those
//     whose value changed.
// Returns hadListeners | listenersUnknown: true when updateQuery() reported a listener
// for some visited entry, or when an immediate entry kept its old value.
// Any Throwable raised during the walk is logged through Logger.defaultLogError.
1726 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1728 assert (!cache.collecting);
1732 boolean hadListeners = false;
1733 boolean listenersUnknown = false;
1737 assert(entry != null);
1738 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1739 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
// Root work item: no caller, no indent.
1740 todo.add(new UpdateEntry(null, entry, 0));
1744 // Walk the tree and collect immediate updates
1745 while (!todo.isEmpty()) {
1746 UpdateEntry e = todo.pop();
1747 hadListeners |= updateQuery(e, todo, immediates);
1750 if(immediates.isEmpty()) break;
1752 // Evaluate all immediate updates and collect parents to update
1753 for(CacheEntry immediate : immediates.values()) {
1755 if(immediate.isDiscarded()) {
1759 if(immediate.isExcepted()) {
// Excepted entries have no usable old value, so NO_VALUE is passed as the old value.
1761 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1762 if (newValue != ListenerEntry.NOT_CHANGED)
1763 updateParents(0, immediate, todo);
1767 Object oldValue = immediate.getResult();
1768 Object newValue = compareTo(graph, immediate, oldValue);
1770 if (newValue != ListenerEntry.NOT_CHANGED) {
1771 updateParents(0, immediate, todo);
1773 // If not changed, keep the old value
1774 immediate.setResult(oldValue);
1775 listenersUnknown = true;
1785 } catch (Throwable t) {
1786 Logger.defaultLogError(t);
1792 return hadListeners | listenersUnknown;
// Pending change notifications, consumed by performDirtyUpdates().
// Packed (subject, predicate) pairs, filled by updateStatements().
1796 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
// Resource ids whose value changed, filled by updateValue() and invalidateResource().
1797 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
// Resource ids to invalidate. NOTE(review): no visible code in this section adds to it.
1798 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1799 // Maybe use a mutex from util.concurrent?
// Guards scheduledPrimitiveUpdates, which updatePrimitive() fills from arbitrary threads.
1800 private Object primitiveUpdateLock = new Object();
1801 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Applies all scheduled change notifications to the cached queries by calling update()
// on every cache entry that may depend on a changed resource.
// Two fast paths handle "exactly one statement" and "exactly one value" scheduled.
// Otherwise the general path processes, in order: primitive (external read) updates,
// value updates, invalidates, object updates, then the collected subjects and ordered
// sets. The three scheduled sets are cleared at the end.
// NOTE(review): the fast paths presumably return after clearing their set; the return
// statements are not visible here - confirm.
1803 public void performDirtyUpdates(final ReadGraphImpl graph) {
1805 cache.dirty = false;
1808 if (Development.DEVELOPMENT) {
1809 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1810 System.err.println("== Query update ==");
1814 // Special case - one statement
1815 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1817 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the pair written by updateStatements(): subject in the upper 32 bits, predicate
// in the lower. 0xffffffff is an int literal (-1), so the mask is a no-op and the
// truncation to the low word is done by the (int) cast.
1819 final int subject = (int)(arg0 >>> 32);
1820 final int predicate = (int)(arg0 & 0xffffffff);
1822 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1823 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1824 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Type-hierarchy predicates additionally affect the cached type queries of the subject.
1826 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1827 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1828 if(principalTypes != null) update(graph, principalTypes);
1829 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1830 if(types != null) update(graph, types);
1833 if(predicate == subrelationOf) {
1834 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1835 if(superRelations != null) update(graph, superRelations);
1838 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1839 if(dp != null) update(graph, dp);
// The ordered-set entry is looked up by the predicate id, not the subject.
1840 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1841 if(os != null) update(graph, os);
1843 scheduledObjectUpdates.clear();
1848 // Special case - one value
1849 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1851 int arg0 = scheduledValueUpdates.getFirst();
1853 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1854 if(valueQuery != null) update(graph, valueQuery);
1856 scheduledValueUpdates.clear();
// General path: subjects and ordered-set ids are collected first and processed once each.
1861 final TIntHashSet predicates = new TIntHashSet();
1862 final TIntHashSet orderedSets = new TIntHashSet();
// Swap the primitive set under the lock so other threads can keep scheduling.
1864 THashSet primitiveUpdates;
1865 synchronized (primitiveUpdateLock) {
1866 primitiveUpdates = scheduledPrimitiveUpdates;
1867 scheduledPrimitiveUpdates = new THashSet();
1870 primitiveUpdates.forEach(new TObjectProcedure() {
1873 public boolean execute(Object arg0) {
1875 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1876 if (query != null) {
1877 boolean listening = update(graph, query);
// An external read with neither listeners nor parents is dropped from the cache map.
1878 if (!listening && !query.hasParents()) {
1879 cache.externalReadEntryMap.remove(arg0);
1888 scheduledValueUpdates.forEach(new TIntProcedure() {
1891 public boolean execute(int arg0) {
1892 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1893 if(valueQuery != null) update(graph, valueQuery);
1899 scheduledInvalidates.forEach(new TIntProcedure() {
1902 public boolean execute(int resource) {
1904 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1905 if(valueQuery != null) update(graph, valueQuery);
1907 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1908 if(principalTypes != null) update(graph, principalTypes);
1909 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1910 if(types != null) update(graph, types);
1912 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1913 if(superRelations != null) update(graph, superRelations);
// Despite its name, the `predicates` set collects subject resource ids.
1915 predicates.add(resource);
1922 scheduledObjectUpdates.forEach(new TLongProcedure() {
1925 public boolean execute(long arg0) {
1927 final int subject = (int)(arg0 >>> 32);
1928 final int predicate = (int)(arg0 & 0xffffffff);
1930 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1931 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1932 if(principalTypes != null) update(graph, principalTypes);
1933 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1934 if(types != null) update(graph, types);
1937 if(predicate == subrelationOf) {
1938 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1939 if(superRelations != null) update(graph, superRelations);
1942 predicates.add(subject);
1943 orderedSets.add(predicate);
1951 predicates.forEach(new TIntProcedure() {
1954 public boolean execute(final int subject) {
1956 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1957 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1958 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1960 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1961 if(entry != null) update(graph, entry);
1969 orderedSets.forEach(new TIntProcedure() {
1972 public boolean execute(int orderedSet) {
1974 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1975 if(entry != null) update(graph, entry);
1983 // for (Integer subject : predicates) {
1984 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1985 // if(entry != null) update(graph, entry);
1989 if (Development.DEVELOPMENT) {
1990 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1991 System.err.println("== Query update ends ==");
1995 scheduledValueUpdates.clear();
1996 scheduledObjectUpdates.clear();
1997 scheduledInvalidates.clear();
// Schedules a value update for the given resource id; consumed by performDirtyUpdates().
2001 public void updateValue(final int resource) {
2002 scheduledValueUpdates.add(resource);
// Schedules a statement update for the (resource, predicate) pair; consumed by
// performDirtyUpdates(), which unpacks the subject from the upper 32 bits and the
// predicate from the lower 32 bits of the stored long.
2006 public void updateStatements(final int resource, final int predicate) {
// The predicate is masked to its unsigned 32-bit value before being combined. Adding
// the raw int would sign-extend a negative predicate and borrow from the upper word,
// so the subject decoded later would be off by one.
2007 scheduledObjectUpdates.add((((long)resource) << 32) | (predicate & 0xFFFFFFFFL));
// Last resource id passed to invalidateResource(); used to drop immediate repeats.
2011 private int lastInvalidate = 0;
// Schedules the given resource for update unless it equals the previously invalidated one.
// NOTE(review): lastInvalidate is not reset anywhere in the visible code, so a repeat
// invalidation of the same resource after performDirtyUpdates() has run appears to be
// dropped, as does a first invalidation of resource id 0 - confirm this is intended.
// NOTE(review): the id goes into scheduledValueUpdates, not scheduledInvalidates - confirm.
2013 public void invalidateResource(final int resource) {
2014 if(lastInvalidate == resource) return;
2015 scheduledValueUpdates.add(resource);
2016 lastInvalidate = resource;
// Schedules an update for the given external read and tells querySupport that
// primitives are dirty. The set is modified under primitiveUpdateLock; the
// dirtyPrimitives() call is a separate statement after the add.
2020 public void updatePrimitive(final ExternalRead primitive) {
2022 // External reads may be updated from arbitrary threads.
2023 // Synchronize to prevent race-conditions.
2024 synchronized (primitiveUpdateLock) {
2025 scheduledPrimitiveUpdates.add(primitive);
2027 querySupport.dirtyPrimitives();
// One-line summary of the cache counters: size, hits, misses and updates.
2032 public synchronized String toString() {
2033 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
// Shuts down the THREADS query executor threads in escalating steps:
//  1. ask every executor to dispose itself;
//  2. poll up to 100 times whether any executor is still alive;
//  3. poll up to 100 more times, interrupting the executors;
//  4. join each executor for at most 5000 ms and clear its slot.
// InterruptedExceptions are logged and the shutdown continues.
// NOTE(review): the waits between polls and the loop exit conditions are not visible here.
2037 protected void doDispose() {
2039 for(int index = 0; index < THREADS; index++) {
2040 executors[index].dispose();
2044 for(int i=0;i<100;i++) {
2046 boolean alive = false;
2047 for(int index = 0; index < THREADS; index++) {
2048 alive |= executors[index].isAlive();
2053 } catch (InterruptedException e) {
2054 Logger.defaultLogError(e);
2059 // Then start interrupting
2060 for(int i=0;i<100;i++) {
2062 boolean alive = false;
2063 for(int index = 0; index < THREADS; index++) {
2064 alive |= executors[index].isAlive();
2067 for(int index = 0; index < THREADS; index++) {
2068 executors[index].interrupt();
2072 // // Then just destroy
2073 // for(int index = 0; index < THREADS; index++) {
2074 // executors[index].destroy();
2077 for(int index = 0; index < THREADS; index++) {
2079 executors[index].join(5000);
2080 } catch (InterruptedException e) {
2081 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
// The slot is cleared after the join attempt.
2083 executors[index] = null;
// NOTE(review): presumably returns the cache hit counter (cf. getMisses()); body not visible here.
2088 public int getHits() {
// Returns the cache miss counter.
2092 public int getMisses() {
2093 return cache.misses;
// NOTE(review): presumably returns the cache size counter (cf. toString()); body not visible here.
2096 public int getSize() {
// Collects the cluster ids of the resources referenced by cached Objects,
// DirectPredicates and ValueQuery entries. Ids come from querySupport.getClusterId()
// and duplicates collapse in the result set.
2100 public Set<Long> getReferencedClusters() {
2101 HashSet<Long> result = new HashSet<Long>();
2102 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2103 Objects query = (Objects) entry.getQuery();
// For Objects queries the resource is taken from r1().
2104 result.add(querySupport.getClusterId(query.r1()));
2106 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2107 DirectPredicates query = (DirectPredicates) entry.getQuery();
2108 result.add(querySupport.getClusterId(query.id));
2110 for (CacheEntry entry : cache.valueQueryMap.values()) {
2111 ValueQuery query = (ValueQuery) entry.getQuery();
2112 result.add(querySupport.getClusterId(query.id));
// NOTE(review): no body statements are visible here; possibly an intentional no-op - confirm.
2117 public void assertDone() {
// Delegates to the cache, which fills and returns the given result collector.
2120 CacheCollectionResult allCaches(CacheCollectionResult result) {
2122 return cache.allCaches(result);
// NOTE(review): no body statements are visible here; possibly an intentional no-op - confirm.
2126 public void printDiagnostics() {
// Forwards the cluster request unchanged to querySupport.
2129 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2130 querySupport.requestCluster(graph, clusterId, runnable);
// Runs the processor's collector with arguments (0, Integer.MAX_VALUE).
// NOTE(review): the returned int is not visible here - confirm what it reports.
2133 public int clean() {
2134 collector.collect(0, Integer.MAX_VALUE);
// Runs a one-off collection restricted to the cache entries of the given external reads.
// A private QueryCollectorSupport exposes only those entries to a fresh
// QueryCollectorImpl2; operations the restricted view cannot provide throw
// UnsupportedOperationException.
2138 public void clean(final Collection<ExternalRead<?>> requests) {
2139 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2140 Iterator<ExternalRead<?>> iterator = requests.iterator();
2142 public CacheCollectionResult allCaches() {
2143 throw new UnsupportedOperationException();
// Returns the next request that has a cached entry, skipping requests without one by
// recursing. Once the iterator is exhausted it is re-created for the next pass.
2146 public CacheEntryBase iterate(int level) {
2147 if(iterator.hasNext()) {
2148 ExternalRead<?> request = iterator.next();
2149 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2150 if (entry != null) return entry;
2151 else return iterate(level);
2153 iterator = requests.iterator();
2158 public void remove() {
2159 throw new UnsupportedOperationException();
2162 public void setLevel(CacheEntryBase entry, int level) {
2163 throw new UnsupportedOperationException();
// Looks up the cached entry for every request.
// NOTE(review): how missing (null) entries are handled is not visible here.
2166 public Collection<CacheEntry> getRootList() {
2167 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2168 for (ExternalRead<?> request : requests) {
2169 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2176 public int getCurrentSize() {
2180 public int calculateCurrentSize() {
2181 // This tells the collector to attempt collecting everything.
2182 return Integer.MAX_VALUE;
2185 public boolean start(boolean flush) {
2189 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
// Delegates to the cache's scanPending().
2192 public void scanPending() {
2194 cache.scanPending();
// Creates a new asynchronous read graph bound to this processor.
2198 public ReadGraphImpl graphForVirtualRequest() {
2199 return ReadGraphImpl.createAsync(this);
// Lazily built map from built-in Layer0 value types to Java classes.
// Stays null until initBuiltinValues() succeeds.
2203 private HashMap<Resource, Class<?>> builtinValues;
// Returns the Java class registered for the given built-in value type, or null when the
// resource is not a built-in type. Also returns null, rather than throwing, when the map
// could not be built: initBuiltinValues() returns early without creating it when the
// Layer0 service is unavailable.
2205 public Class<?> getBuiltinValue(Resource r) {
2206 if(builtinValues == null) initBuiltinValues();
2207 return builtinValues != null ? builtinValues.get(r) : null;
// NOTE(review): not referenced in the visible code; presumably debugging state - confirm.
2210 Exception callerException = null;
// Barrier abstraction for asynchronous requests.
// NOTE(review): the active method declarations are not visible here; only the
// commented-out debug variants below are.
2212 public interface AsyncBarrier {
2215 // public void inc(String debug);
2216 // public void dec(String debug);
2219 // final public QueryProcessor processor;
2220 // final public QuerySupport support;
2222 // boolean disposed = false;
// Builds the builtinValues map from Layer0 value type resources to Java classes.
// Does nothing, leaving builtinValues unset, when the session has no Layer0 service yet.
2224 private void initBuiltinValues() {
// peekService is used, so a missing service shows up as null and is handled below.
2226 Layer0 b = getSession().peekService(Layer0.class);
2227 if(b == null) return;
2229 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types map to the boxed Java classes.
2231 builtinValues.put(b.String, String.class);
2232 builtinValues.put(b.Double, Double.class);
2233 builtinValues.put(b.Float, Float.class);
2234 builtinValues.put(b.Long, Long.class);
2235 builtinValues.put(b.Integer, Integer.class);
2236 builtinValues.put(b.Byte, Byte.class);
2237 builtinValues.put(b.Boolean, Boolean.class);
// Array types map to primitive arrays, except StringArray, which maps to String[].
2239 builtinValues.put(b.StringArray, String[].class);
2240 builtinValues.put(b.DoubleArray, double[].class);
2241 builtinValues.put(b.FloatArray, float[].class);
2242 builtinValues.put(b.LongArray, long[].class);
2243 builtinValues.put(b.IntegerArray, int[].class);
2244 builtinValues.put(b.ByteArray, byte[].class);
2245 builtinValues.put(b.BooleanArray, boolean[].class);
2249 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2251 // if (null == provider2) {
2252 // this.processor = null;
2256 // this.processor = provider2;
2257 // support = provider2.getCore();
2258 // initBuiltinValues();
2262 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2263 // return new ReadGraphSupportImpl(impl.processor);
// Returns the session this processor belongs to. NOTE(review): body not visible here.
2267 final public Session getSession() {
// Returns the processor's ResourceSupport instance.
2271 final public ResourceSupport getResourceSupport() {
2272 return resourceSupport;
// Asynchronous predicate enumeration is currently unsupported: the method always throws
// UnsupportedOperationException. The former implementation is kept below, commented out.
2276 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2278 throw new UnsupportedOperationException();
2280 // assert(subject != null);
2281 // assert(procedure != null);
2283 // final ListenerBase listener = getListenerBase(procedure);
2285 // IntProcedure ip = new IntProcedure() {
2287 // AtomicBoolean first = new AtomicBoolean(true);
2290 // public void execute(ReadGraphImpl graph, int i) {
2292 // if(first.get()) {
2293 // procedure.execute(graph, querySupport.getResource(i));
2295 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2297 // } catch (Throwable t2) {
2298 // Logger.defaultLogError(t2);
2303 // public void finished(ReadGraphImpl graph) {
2305 // if(first.compareAndSet(true, false)) {
2306 // procedure.finished(graph);
2307 //// impl.state.barrier.dec(this);
2309 // procedure.finished(impl.newRestart(graph));
2312 // } catch (Throwable t2) {
2313 // Logger.defaultLogError(t2);
2318 // public void exception(ReadGraphImpl graph, Throwable t) {
2320 // if(first.compareAndSet(true, false)) {
2321 // procedure.exception(graph, t);
2323 // procedure.exception(impl.newRestart(graph), t);
2325 // } catch (Throwable t2) {
2326 // Logger.defaultLogError(t2);
2332 // int sId = querySupport.getId(subject);
2335 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2336 // } catch (DatabaseException e) {
2337 // Logger.defaultLogError(e);
// Synchronous-procedure predicate enumeration is currently unsupported: the method always
// throws UnsupportedOperationException. The former implementation is kept below, commented out.
2343 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2345 throw new UnsupportedOperationException();
2347 // assert(subject != null);
2348 // assert(procedure != null);
2350 // final ListenerBase listener = getListenerBase(procedure);
2353 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2356 // public void execute(ReadGraphImpl graph, int i) {
2358 // procedure.execute(querySupport.getResource(i));
2359 // } catch (Throwable t2) {
2360 // Logger.defaultLogError(t2);
2365 // public void finished(ReadGraphImpl graph) {
2367 // procedure.finished();
2368 // } catch (Throwable t2) {
2369 // Logger.defaultLogError(t2);
2371 //// impl.state.barrier.dec();
2375 // public void exception(ReadGraphImpl graph, Throwable t) {
2377 // procedure.exception(t);
2378 // } catch (Throwable t2) {
2379 // Logger.defaultLogError(t2);
2381 //// impl.state.barrier.dec();
2385 // } catch (DatabaseException e) {
2386 // Logger.defaultLogError(e);
// Returns the predicate set of the given subject, resolved through the query cache with
// impl.parent as the parent entry and no listener (null).
2392 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2393 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
// Enumerates the statements (subject, predicate, *) into a synchronous MultiProcedure.
// Integer triples from the Statements query are converted to Statement objects through
// querySupport before being handed to the procedure. Anything the procedure throws is
// logged and not propagated; a DatabaseException from the query itself is logged as well.
// If the procedure is a ListenerBase it is registered as the listener of the query.
2397 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2398 final Resource predicate, final MultiProcedure<Statement> procedure) {
2400 assert(subject != null);
2401 assert(predicate != null);
2402 assert(procedure != null);
2404 final ListenerBase listener = getListenerBase(procedure);
2406 // impl.state.barrier.inc();
2409 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2412 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2414 procedure.execute(querySupport.getStatement(s, p, o));
2415 } catch (Throwable t2) {
2416 Logger.defaultLogError(t2);
2421 public void finished(ReadGraphImpl graph) {
2423 procedure.finished();
2424 } catch (Throwable t2) {
2425 Logger.defaultLogError(t2);
2427 // impl.state.barrier.dec();
2431 public void exception(ReadGraphImpl graph, Throwable t) {
2433 procedure.exception(t);
2434 } catch (Throwable t2) {
2435 Logger.defaultLogError(t2);
2437 // impl.state.barrier.dec();
2441 } catch (DatabaseException e) {
2442 Logger.defaultLogError(e);
// Enumerates the statements (subject, predicate, *) into an AsyncMultiProcedure.
// Each callback has two variants: one passes the callback's own graph, the other a
// restarted graph obtained from impl.newRestart(graph).
// NOTE(review): the choice is presumably driven by the `first` flag; the conditions are
// not visible here - confirm.
// Anything the procedure throws is logged; a DatabaseException from the query is logged too.
2448 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2449 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2451 assert(subject != null);
2452 assert(predicate != null);
2453 assert(procedure != null);
2455 final ListenerBase listener = getListenerBase(procedure);
2457 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2459 boolean first = true;
2462 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2465 procedure.execute(graph, querySupport.getStatement(s, p, o));
2467 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2469 } catch (Throwable t2) {
2470 Logger.defaultLogError(t2);
2475 public void finished(ReadGraphImpl graph) {
2480 procedure.finished(graph);
2481 // impl.state.barrier.dec(this);
2483 procedure.finished(impl.newRestart(graph));
2485 } catch (Throwable t2) {
2486 Logger.defaultLogError(t2);
2492 public void exception(ReadGraphImpl graph, Throwable t) {
2497 procedure.exception(graph, t);
2498 // impl.state.barrier.dec(this);
2500 procedure.exception(impl.newRestart(graph), t);
2502 } catch (Throwable t2) {
2503 Logger.defaultLogError(t2);
2510 int sId = querySupport.getId(subject);
2511 int pId = querySupport.getId(predicate);
2513 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2514 // else impl.state.barrier.inc(null, null);
2517 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2518 } catch (DatabaseException e) {
2519 Logger.defaultLogError(e);
// Enumerates the statements (subject, predicate, *) into a StatementProcedure, which
// receives the raw integer triple (s, p, o) rather than Statement objects.
// Each callback has two variants: one passes the callback's own graph, the other a
// restarted graph obtained from impl.newRestart(graph).
// NOTE(review): the choice is presumably driven by the `first` flag; the conditions are
// not visible here - confirm.
// Anything the procedure throws is logged; a DatabaseException from the query is logged too.
2525 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2526 final Resource predicate, final StatementProcedure procedure) {
2528 assert(subject != null);
2529 assert(predicate != null);
2530 assert(procedure != null);
2532 final ListenerBase listener = getListenerBase(procedure);
2534 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2536 boolean first = true;
2539 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2542 procedure.execute(graph, s, p, o);
2544 procedure.execute(impl.newRestart(graph), s, p, o);
2546 } catch (Throwable t2) {
2547 Logger.defaultLogError(t2);
2552 public void finished(ReadGraphImpl graph) {
2557 procedure.finished(graph);
2558 // impl.state.barrier.dec(this);
2560 procedure.finished(impl.newRestart(graph));
2562 } catch (Throwable t2) {
2563 Logger.defaultLogError(t2);
2569 public void exception(ReadGraphImpl graph, Throwable t) {
2574 procedure.exception(graph, t);
2575 // impl.state.barrier.dec(this);
2577 procedure.exception(impl.newRestart(graph), t);
2579 } catch (Throwable t2) {
2580 Logger.defaultLogError(t2);
2587 int sId = querySupport.getId(subject);
2588 int pId = querySupport.getId(predicate);
2590 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2591 // else impl.state.barrier.inc(null, null);
2594 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2595 } catch (DatabaseException e) {
2596 Logger.defaultLogError(e);
// Adapts a statement enumeration into set-style add/remove notifications for an
// AsyncSetListener. It wraps the listener in an AsyncMultiListener and passes it to
// forEachStatement(). The wrapper keeps two sets, `current` and `run`. A statement that
// cannot be removed from `current` is reported through procedure.add(), and whatever is
// left in `current` when the run finishes is reported through procedure.remove().
// NOTE(review): how `current` and `run` are exchanged between runs is not visible here.
2602 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2604 assert(subject != null);
2605 assert(predicate != null);
2606 assert(procedure != null);
2608 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2610 private Set<Statement> current = null;
2611 private Set<Statement> run = new HashSet<Statement>();
2614 public void execute(AsyncReadGraph graph, Statement result) {
2616 boolean found = false;
2618 if(current != null) {
// remove() doubles as the membership test: survivors are taken out of `current`.
2620 found = current.remove(result);
2624 if(!found) procedure.add(graph, result);
2631 public void finished(AsyncReadGraph graph) {
2633 if(current != null) {
2634 for(Statement r : current) procedure.remove(graph, r);
2639 run = new HashSet<Statement>();
2644 public void exception(AsyncReadGraph graph, Throwable t) {
2645 procedure.exception(graph, t);
// Disposal is decided by the wrapped set listener.
2649 public boolean isDisposed() {
2650 return procedure.isDisposed();
// Enumerates the asserted statements of (subject, predicate) into an AsyncMultiProcedure
// through QueryCache.runnerAssertedStatements. Integer triples are converted to Statement
// objects by querySupport. Anything the procedure throws is logged and not propagated;
// a DatabaseException from the query runner is logged as well.
2658 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2659 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2661 assert(subject != null);
2662 assert(predicate != null);
2663 assert(procedure != null);
2665 final ListenerBase listener = getListenerBase(procedure);
2667 // impl.state.barrier.inc();
2670 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2673 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2675 procedure.execute(graph, querySupport.getStatement(s, p, o));
2676 } catch (Throwable t2) {
2677 Logger.defaultLogError(t2);
2682 public void finished(ReadGraphImpl graph) {
2684 procedure.finished(graph);
2685 } catch (Throwable t2) {
2686 Logger.defaultLogError(t2);
2688 // impl.state.barrier.dec();
2692 public void exception(ReadGraphImpl graph, Throwable t) {
2694 procedure.exception(graph, t);
2695 } catch (Throwable t2) {
2696 Logger.defaultLogError(t2);
2698 // impl.state.barrier.dec();
2702 } catch (DatabaseException e) {
2703 Logger.defaultLogError(e);
/**
 * Returns {@code procedure} itself, cast to {@link ListenerBase}, when it implements that type.
 * NOTE(review): the result for a non-listener procedure is presumably {@code null}
 * (forTypes and forRelationInfo assert {@code listener == null}) - confirm.
 */
2708 private static ListenerBase getListenerBase(Object procedure) {
2709 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
/**
 * Runs {@code QueryCache.runnerObjects} for (subject, predicate) and forwards each object id
 * to the synchronous {@link MultiProcedure} as a {@link Resource}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure receiver of resources, finished and exception events, must not be null
 */
2714 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2716 assert(subject != null);
2717 assert(predicate != null);
2718 assert(procedure != null);
2720 final ListenerBase listener = getListenerBase(procedure);
2722 // impl.state.barrier.inc();
2725 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2728 public void execute(ReadGraphImpl graph, int i) {
2730 procedure.execute(querySupport.getResource(i));
2731 } catch (Throwable t2) {
2732 Logger.defaultLogError(t2);
2737 public void finished(ReadGraphImpl graph) {
2739 procedure.finished();
2740 } catch (Throwable t2) {
2741 Logger.defaultLogError(t2);
2743 // impl.state.barrier.dec();
2747 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): writes to stdout while every other path here uses Logger - looks like leftover debugging, confirm
2748 System.out.println("forEachObject exception " + t);
2750 procedure.exception(t);
2751 } catch (Throwable t2) {
2752 Logger.defaultLogError(t2);
2754 // impl.state.barrier.dec();
2758 } catch (DatabaseException e) {
2759 Logger.defaultLogError(e);
/**
 * Not supported: always throws {@link UnsupportedOperationException}.
 * The previous implementation based on {@code QueryCache.runnerDirectPredicates} is kept
 * below as commented-out code only.
 *
 * @throws UnsupportedOperationException always
 */
2765 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2767 throw new UnsupportedOperationException();
2769 // assert(subject != null);
2770 // assert(procedure != null);
2772 // final ListenerBase listener = getListenerBase(procedure);
2774 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
2776 // int sId = querySupport.getId(subject);
2779 // QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
2780 // } catch (DatabaseException e) {
2781 // Logger.defaultLogError(e);
/**
 * Delegates to {@code org.simantics.db.impl.query.DirectStatements.queryEach} for the id of
 * {@code subject}, passing this processor, {@code impl.parent} and the listener derived from
 * {@code procedure}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the {@link DirectStatements} result, must not be null
 */
2787 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
2789 assert(subject != null);
2790 assert(procedure != null);
2792 final ListenerBase listener = getListenerBase(procedure);
// fully qualified: the simple name DirectStatements is the imported org.simantics.db type
2794 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Asynchronous variant: delegates to
 * {@code org.simantics.db.impl.query.DirectStatements.queryEach} for the id of {@code subject}
 * and hands {@code ignoreVirtual} through unchanged.
 *
 * @param impl          graph the query is run in
 * @param subject       must not be null
 * @param procedure     receiver of the {@link DirectStatements} result, must not be null
 * @param ignoreVirtual forwarded as-is to {@code queryEach}
 */
2799 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2801 assert(subject != null);
2802 assert(procedure != null);
2804 final ListenerBase listener = getListenerBase(procedure);
2806 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel instance: forPossibleObject compares against it by identity and reports null when it is set.
2810 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Reduces the object stream of {@code forEachObject(impl, subject, predicate, ...)} to at most
 * one resource.
 * <p>
 * When the stream finishes, {@code procedure.execute} receives {@code null} if {@code single}
 * is still null or has been set to {@link #INVALID_RESOURCE}; otherwise it receives the
 * stored resource. Exceptions are forwarded to {@code procedure.exception}. All callbacks of
 * the inner procedure are {@code synchronized}.
 *
 * @param impl      graph the query is run in
 * @param subject   subject resource
 * @param predicate predicate resource
 * @param procedure receiver of the single resource or {@code null}
 */
2813 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2815 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2817 private Resource single = null;
2820 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2821 if(single == null) {
// NOTE(review): presumably reached when a second object arrives, marking the result as ambiguous - confirm
2824 single = INVALID_RESOURCE;
2829 public synchronized void finished(AsyncReadGraph graph) {
2830 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2831 else procedure.execute(graph, single);
2835 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2836 procedure.exception(graph, throwable);
/**
 * Lowest-level object query in this group: resolves the ids of {@code subject} and
 * {@code predicate} and runs {@code QueryCache.runnerObjects} with the given listener and
 * {@link IntProcedure}. A {@link DatabaseException} from the runner is logged, not rethrown.
 *
 * @param impl      graph the query is run in; {@code impl.parent} is passed as parent entry
 * @param subject   subject resource
 * @param predicate predicate resource
 * @param listener  listener handed to the runner as-is
 * @param procedure receiver of raw object ids
 */
2843 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2845 final int sId = querySupport.getId(subject);
2846 final int pId = querySupport.getId(predicate);
2849 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2850 } catch (DatabaseException e) {
2851 Logger.defaultLogError(e);
/**
 * {@link IntProcedure} that collects a single int result and an optional failure.
 * <p>
 * {@code single} starts at 0 and takes the first id passed to {@code execute}; {@code finished}
 * maps the value -1 back to 0. {@code get()} rethrows a stored {@code t} as a
 * {@link DatabaseException}, wrapping it when it is some other Throwable.
 * NOTE(review): -1 presumably marks "more than one result" - confirm.
 */
2856 static class Runner2Procedure implements IntProcedure {
// collected object id; 0 is the initial value
2858 public int single = 0;
// failure rethrown by get()
2859 public Throwable t = null;
2861 public void clear() {
2867 public void execute(ReadGraphImpl graph, int i) {
2868 if(single == 0) single = i;
2873 public void finished(ReadGraphImpl graph) {
2874 if(single == -1) single = 0;
2878 public void exception(ReadGraphImpl graph, Throwable throwable) {
2883 public int get() throws DatabaseException {
2885 if(t instanceof DatabaseException) throw (DatabaseException)t;
2886 else throw new DatabaseException(t);
/**
 * Runs {@code QueryCache.runnerObjects} for (subject, predicate) without a listener, collecting
 * the outcome in a fresh {@link Runner2Procedure}.
 *
 * @param impl      graph the query is run in
 * @param subject   subject resource
 * @param predicate predicate resource
 * @return an int object id; NOTE(review): presumably the value of {@code proc.get()} - confirm
 * @throws DatabaseException declared by this method; {@code QueryCache.runnerObjects} is called
 *         here without a surrounding catch
 */
2893 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2895 final int sId = querySupport.getId(subject);
2896 final int pId = querySupport.getId(predicate);
2898 Runner2Procedure proc = new Runner2Procedure();
2899 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
/**
 * Asynchronous object query: adapts {@code procedure} to an {@link IntProcedure} and delegates
 * to {@code forEachObject(impl, subject, predicate, listener, ip)}.
 * <p>
 * Two adapters exist. When {@code impl.parent != null} or the procedure is a listener, the
 * adapter tracks a {@code first} flag, calls the procedure either with {@code impl} or with
 * {@code impl.newRestart(graph)}, and logs Throwables raised by the procedure. Otherwise a
 * plain adapter forwards the callback graph directly without any try/catch.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure receiver of resources, finished and exception events
 */
2904 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2906 assert(subject != null);
2907 assert(predicate != null);
2909 final ListenerBase listener = getListenerBase(procedure);
2911 if(impl.parent != null || listener != null) {
2913 IntProcedure ip = new IntProcedure() {
// flipped to false by the first finished() call
2915 AtomicBoolean first = new AtomicBoolean(true);
2918 public void execute(ReadGraphImpl graph, int i) {
// NOTE(review): the choice between the next two calls presumably depends on `first` - confirm
2921 procedure.execute(impl, querySupport.getResource(i));
2923 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2925 } catch (Throwable t2) {
2926 Logger.defaultLogError(t2);
2932 public void finished(ReadGraphImpl graph) {
2934 if(first.compareAndSet(true, false)) {
2935 procedure.finished(impl);
2936 // impl.state.barrier.dec(this);
2938 procedure.finished(impl.newRestart(graph));
2940 } catch (Throwable t2) {
2941 Logger.defaultLogError(t2);
2946 public void exception(ReadGraphImpl graph, Throwable t) {
2948 procedure.exception(graph, t);
2949 } catch (Throwable t2) {
2950 Logger.defaultLogError(t2);
2952 // impl.state.barrier.dec(this);
2956 public String toString() {
2957 return "forEachObject with " + procedure;
2962 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2963 // else impl.state.barrier.inc(null, null);
2965 forEachObject(impl, subject, predicate, listener, ip);
// plain adapter: forwards the callback graph unchanged
2969 IntProcedure ip = new IntProcedure() {
2972 public void execute(ReadGraphImpl graph, int i) {
2973 procedure.execute(graph, querySupport.getResource(i));
2977 public void finished(ReadGraphImpl graph) {
2978 procedure.finished(graph);
2982 public void exception(ReadGraphImpl graph, Throwable t) {
2983 procedure.exception(graph, t);
2987 public String toString() {
2988 return "forEachObject with " + procedure;
2993 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Feeds an {@link AsyncSetListener} from the object stream of
 * {@code forEachObject(impl, subject, predicate, ...)}.
 * <p>
 * A result that cannot be removed from {@code current} is reported through
 * {@code procedure.add}; what is left in {@code current} at {@code finished} is reported
 * through {@code procedure.remove}. Exceptions and the disposed state are delegated to
 * {@code procedure}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure set listener, must not be null
 */
3000 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3002 assert(subject != null);
3003 assert(predicate != null);
3004 assert(procedure != null);
3006 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3008 private Set<Resource> current = null;
3009 private Set<Resource> run = new HashSet<Resource>();
3012 public void execute(AsyncReadGraph graph, Resource result) {
3014 boolean found = false;
3016 if(current != null) {
3018 found = current.remove(result);
3022 if(!found) procedure.add(graph, result);
3029 public void finished(AsyncReadGraph graph) {
3031 if(current != null) {
// leftovers were not seen in this round, so they are reported as removed
3032 for(Resource r : current) procedure.remove(graph, r);
3037 run = new HashSet<Resource>();
3042 public boolean isDisposed() {
3043 return procedure.isDisposed();
3047 public void exception(AsyncReadGraph graph, Throwable t) {
3048 procedure.exception(graph, t);
3052 public String toString() {
3053 return "forObjectSet " + procedure;
/**
 * Feeds an {@link AsyncSetListener} from the predicate stream of
 * {@code forEachPredicate(impl, subject, ...)}, using the same {@code current}/{@code run}
 * bookkeeping as the other {@code for...Set} methods: unknown results go to
 * {@code procedure.add}, leftovers in {@code current} go to {@code procedure.remove}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure set listener, must not be null
 */
3061 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3063 assert(subject != null);
3064 assert(procedure != null);
3066 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3068 private Set<Resource> current = null;
3069 private Set<Resource> run = new HashSet<Resource>();
3072 public void execute(AsyncReadGraph graph, Resource result) {
3074 boolean found = false;
3076 if(current != null) {
3078 found = current.remove(result);
3082 if(!found) procedure.add(graph, result);
3089 public void finished(AsyncReadGraph graph) {
3091 if(current != null) {
3092 for(Resource r : current) procedure.remove(graph, r);
3097 run = new HashSet<Resource>();
3102 public boolean isDisposed() {
3103 return procedure.isDisposed();
3107 public void exception(AsyncReadGraph graph, Throwable t) {
3108 procedure.exception(graph, t);
3112 public String toString() {
3113 return "forPredicateSet " + procedure;
/**
 * Feeds an {@link AsyncSetListener} from the stream of
 * {@code forEachPrincipalType(impl, subject, ...)}: results that cannot be removed from
 * {@code current} are reported through {@code procedure.add}, and what remains in
 * {@code current} at {@code finished} is reported through {@code procedure.remove}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure set listener, must not be null
 */
3121 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3123 assert(subject != null);
3124 assert(procedure != null);
3126 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3128 private Set<Resource> current = null;
3129 private Set<Resource> run = new HashSet<Resource>();
3132 public void execute(AsyncReadGraph graph, Resource result) {
3134 boolean found = false;
3136 if(current != null) {
3138 found = current.remove(result);
3142 if(!found) procedure.add(graph, result);
3149 public void finished(AsyncReadGraph graph) {
3151 if(current != null) {
3152 for(Resource r : current) procedure.remove(graph, r);
3157 run = new HashSet<Resource>();
3162 public boolean isDisposed() {
3163 return procedure.isDisposed();
3167 public void exception(AsyncReadGraph graph, Throwable t) {
3168 procedure.exception(graph, t);
3172 public String toString() {
3173 return "forPrincipalTypeSet " + procedure;
/**
 * Feeds an {@link AsyncSetListener} from the stream of
 * {@code forEachAssertedObject(impl, subject, predicate, ...)}: results that cannot be
 * removed from {@code current} are reported through {@code procedure.add}, and what remains
 * in {@code current} at {@code finished} is reported through {@code procedure.remove}.
 * <p>
 * The listener's {@code toString()} is labelled with this method's own name so that it can be
 * told apart from the listener created by {@code forObjectSet}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure set listener, must not be null
 */
3181 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3183 assert(subject != null);
3184 assert(predicate != null);
3185 assert(procedure != null);
3187 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3189 private Set<Resource> current = null;
3190 private Set<Resource> run = new HashSet<Resource>();
3193 public void execute(AsyncReadGraph graph, Resource result) {
3195 boolean found = false;
3197 if(current != null) {
3199 found = current.remove(result);
3203 if(!found) procedure.add(graph, result);
3210 public void finished(AsyncReadGraph graph) {
3212 if(current != null) {
3213 for(Resource r : current) procedure.remove(graph, r);
3218 run = new HashSet<Resource>();
3223 public boolean isDisposed() {
3224 return procedure.isDisposed();
3228 public void exception(AsyncReadGraph graph, Throwable t) {
3229 procedure.exception(graph, t);
3233 public String toString() {
// was "forObjectSet " (copy-paste from forObjectSet), which misidentified this listener in diagnostics
3234 return "forAssertedObjectSet " + procedure;
/**
 * Feeds an {@link AsyncSetListener} from the stream of
 * {@code forEachAssertedStatement(impl, subject, predicate, ...)}: statements that cannot be
 * removed from {@code current} are reported through {@code procedure.add}, and what remains
 * in {@code current} at {@code finished} is reported through {@code procedure.remove}.
 * <p>
 * The listener's {@code toString()} is labelled with this method's own name so that it can be
 * told apart from the listener created by {@code forStatementSet}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure set listener, must not be null
 */
3242 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3244 assert(subject != null);
3245 assert(predicate != null);
3246 assert(procedure != null);
3248 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3250 private Set<Statement> current = null;
3251 private Set<Statement> run = new HashSet<Statement>();
3254 public void execute(AsyncReadGraph graph, Statement result) {
3256 boolean found = false;
3258 if(current != null) {
3260 found = current.remove(result);
3264 if(!found) procedure.add(graph, result);
3271 public void finished(AsyncReadGraph graph) {
3273 if(current != null) {
3274 for(Statement s : current) procedure.remove(graph, s);
3279 run = new HashSet<Statement>();
3284 public boolean isDisposed() {
3285 return procedure.isDisposed();
3289 public void exception(AsyncReadGraph graph, Throwable t) {
3290 procedure.exception(graph, t);
3294 public String toString() {
// was "forStatementSet " (copy-paste from forStatementSet), which misidentified this listener in diagnostics
3295 return "forAssertedStatementSet " + procedure;
/**
 * Runs {@code QueryCache.runnerAssertedStatements} for (subject, predicate) and forwards only
 * the object id {@code o} of every (s, p, o) triple to {@code procedure}, converted with
 * {@code querySupport.getResource}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure receiver of object resources, must not be null
 */
3303 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3304 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3306 assert(subject != null);
3307 assert(predicate != null);
3308 assert(procedure != null);
3310 final ListenerBase listener = getListenerBase(procedure);
3313 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3316 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// s and p are ignored; only the object is of interest here
3318 procedure.execute(graph, querySupport.getResource(o));
3319 } catch (Throwable t2) {
3320 Logger.defaultLogError(t2);
3325 public void finished(ReadGraphImpl graph) {
3327 procedure.finished(graph);
3328 } catch (Throwable t2) {
3329 Logger.defaultLogError(t2);
3331 // impl.state.barrier.dec();
3335 public void exception(ReadGraphImpl graph, Throwable t) {
3337 procedure.exception(graph, t);
3338 } catch (Throwable t2) {
3339 Logger.defaultLogError(t2);
3341 // impl.state.barrier.dec();
3345 } catch (DatabaseException e) {
3346 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerPrincipalTypes} for the id of {@code subject} and forwards each
 * type id to the asynchronous {@code procedure} as a {@link Resource}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of type resources, must not be null
 */
3352 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3354 assert(subject != null);
3355 assert(procedure != null);
3357 final ListenerBase listener = getListenerBase(procedure);
3359 IntProcedure ip = new IntProcedure() {
3362 public void execute(ReadGraphImpl graph, int i) {
3364 procedure.execute(graph, querySupport.getResource(i));
3365 } catch (Throwable t2) {
3366 Logger.defaultLogError(t2);
3371 public void finished(ReadGraphImpl graph) {
3373 procedure.finished(graph);
3374 } catch (Throwable t2) {
3375 Logger.defaultLogError(t2);
3377 // impl.state.barrier.dec(this);
3381 public void exception(ReadGraphImpl graph, Throwable t) {
3383 procedure.exception(graph, t);
3384 } catch (Throwable t2) {
3385 Logger.defaultLogError(t2);
3387 // impl.state.barrier.dec(this);
3392 int sId = querySupport.getId(subject);
3394 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3395 // else impl.state.barrier.inc(null, null);
3398 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3399 } catch (DatabaseException e) {
3400 Logger.defaultLogError(e);
/**
 * Synchronous-procedure variant: runs {@code QueryCache.runnerPrincipalTypes} for the id of
 * {@code subject} and forwards each type id to the {@link MultiProcedure} as a
 * {@link Resource}. The callback graph is not passed on.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of type resources, must not be null
 */
3406 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3408 assert(subject != null);
3409 assert(procedure != null);
3411 final ListenerBase listener = getListenerBase(procedure);
3413 // impl.state.barrier.inc();
3416 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3419 public void execute(ReadGraphImpl graph, int i) {
3421 procedure.execute(querySupport.getResource(i));
3422 } catch (Throwable t2) {
3423 Logger.defaultLogError(t2);
3428 public void finished(ReadGraphImpl graph) {
3430 procedure.finished();
3431 } catch (Throwable t2) {
3432 Logger.defaultLogError(t2);
3434 // impl.state.barrier.dec();
3438 public void exception(ReadGraphImpl graph, Throwable t) {
3440 procedure.exception(t);
3441 } catch (Throwable t2) {
3442 Logger.defaultLogError(t2);
3444 // impl.state.barrier.dec();
3448 } catch (DatabaseException e) {
3449 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerTypes} for the id of {@code subject} and passes the resulting
 * {@code IntSet} straight to {@code procedure.execute}; exceptions are forwarded to
 * {@code procedure.exception}. A {@link DatabaseException} from the runner is logged.
 * <p>
 * Listener procedures are not accepted here: the method asserts that
 * {@code getListenerBase(procedure)} is null.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the type set, must not be null and not a {@link ListenerBase}
 */
3453 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3455 assert(subject != null);
3456 assert(procedure != null);
3458 final ListenerBase listener = getListenerBase(procedure);
3459 assert(listener == null);
3461 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3464 public void execute(final ReadGraphImpl graph, IntSet set) {
3465 procedure.execute(graph, set);
3469 public void exception(ReadGraphImpl graph, Throwable t) {
3470 procedure.exception(graph, t);
3475 int sId = querySupport.getId(subject);
3478 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3479 } catch (DatabaseException e) {
3480 Logger.defaultLogError(e);
/**
 * Returns the result of {@code QueryCacheBase.resultTypes} for the id of {@code subject},
 * with {@code impl.parent} as parent entry and no listener.
 *
 * @param impl    graph the query is run in
 * @param subject must not be null
 * @return the {@code IntSet} produced by {@code QueryCacheBase.resultTypes}
 * @throws Throwable whatever {@code QueryCacheBase.resultTypes} throws
 */
3486 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3488 assert(subject != null);
3490 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
/**
 * Runs {@code QueryCache.runnerRelationInfoQuery} for the id of {@code subject} and forwards
 * the {@link RelationInfo} result or the exception to {@code procedure} unchanged.
 * A {@link DatabaseException} from the runner is logged.
 * <p>
 * Listener procedures are not accepted here: the method asserts that
 * {@code getListenerBase(procedure)} is null.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the relation info, must not be null and not a {@link ListenerBase}
 */
3495 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
3497 assert(subject != null);
3498 assert(procedure != null);
3500 final ListenerBase listener = getListenerBase(procedure);
3501 assert(listener == null);
3505 QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
3508 public void execute(final ReadGraphImpl graph, RelationInfo set) {
3509 procedure.execute(graph, set);
3513 public void exception(ReadGraphImpl graph, Throwable t) {
3514 procedure.exception(graph, t);
3518 } catch (DatabaseException e) {
3519 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerSuperTypes} for the id of {@code subject} and passes the
 * resulting {@code IntSet} to {@code procedure}.
 * <p>
 * An {@link AtomicBoolean} {@code first} separates the first callback from later ones: the
 * first one uses the callback graph, later ones use {@code impl.newRestart(graph)}. This holds
 * for both {@code execute} and {@code exception}, which share the flag. Throwables raised by
 * {@code procedure} and a {@link DatabaseException} from the runner are logged.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the supertype set, must not be null
 */
3525 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3527 assert(subject != null);
3528 assert(procedure != null);
3530 final ListenerBase listener = getListenerBase(procedure);
3533 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3535 AtomicBoolean first = new AtomicBoolean(true);
3538 public void execute(final ReadGraphImpl graph, IntSet set) {
3539 // final HashSet<Resource> result = new HashSet<Resource>();
3540 // set.forEach(new TIntProcedure() {
3543 // public boolean execute(int type) {
3544 // result.add(querySupport.getResource(type));
// compareAndSet succeeds for exactly one caller, which takes the "first delivery" path
3550 if(first.compareAndSet(true, false)) {
3551 procedure.execute(graph, set);
3552 // impl.state.barrier.dec();
3554 procedure.execute(impl.newRestart(graph), set);
3556 } catch (Throwable t2) {
3557 Logger.defaultLogError(t2);
3562 public void exception(ReadGraphImpl graph, Throwable t) {
3564 if(first.compareAndSet(true, false)) {
3565 procedure.exception(graph, t);
3566 // impl.state.barrier.dec();
3568 procedure.exception(impl.newRestart(graph), t);
3570 } catch (Throwable t2) {
3571 Logger.defaultLogError(t2);
3576 } catch (DatabaseException e) {
3577 Logger.defaultLogError(e);
/**
 * Runs {@code QueryCache.runnerDirectSuperRelations} for the id of {@code subject} and forwards
 * each super relation id to {@code procedure} as a {@link Resource}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of super relation resources, must not be null
 */
3583 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3585 assert(subject != null);
3586 assert(procedure != null);
3588 final ListenerBase listener = getListenerBase(procedure);
3590 IntProcedure ip = new IntProcedureAdapter() {
3593 public void execute(final ReadGraphImpl graph, int superRelation) {
3595 procedure.execute(graph, querySupport.getResource(superRelation));
3596 } catch (Throwable t2) {
3597 Logger.defaultLogError(t2);
3602 public void finished(final ReadGraphImpl graph) {
3604 procedure.finished(graph);
3605 } catch (Throwable t2) {
3606 Logger.defaultLogError(t2);
3608 // impl.state.barrier.dec(this);
3613 public void exception(ReadGraphImpl graph, Throwable t) {
3615 procedure.exception(graph, t);
3616 } catch (Throwable t2) {
3617 Logger.defaultLogError(t2);
3619 // impl.state.barrier.dec(this);
3624 int sId = querySupport.getId(subject);
3626 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3627 // else impl.state.barrier.inc(null, null);
3630 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3631 } catch (DatabaseException e) {
3632 Logger.defaultLogError(e);
3635 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to {@code PossibleSuperRelation.queryEach} for the id of {@code subject}, passing
 * this processor, {@code impl.parent}, the listener derived from {@code procedure}, and
 * {@code procedure} itself.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the result, must not be null
 */
3640 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3642 assert(subject != null);
3643 assert(procedure != null);
3645 final ListenerBase listener = getListenerBase(procedure);
3647 // impl.state.barrier.inc();
3649 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs {@code QueryCache.runnerSuperRelations} for the id of {@code subject} and passes the
 * resulting {@code IntSet} to {@code procedure.execute}; exceptions go to
 * {@code procedure.exception}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the super relation set, must not be null
 */
3654 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3656 assert(subject != null);
3657 assert(procedure != null);
3659 final ListenerBase listener = getListenerBase(procedure);
3661 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3664 public void execute(final ReadGraphImpl graph, IntSet set) {
3665 // final HashSet<Resource> result = new HashSet<Resource>();
3666 // set.forEach(new TIntProcedure() {
3669 // public boolean execute(int type) {
3670 // result.add(querySupport.getResource(type));
3676 procedure.execute(graph, set);
3677 } catch (Throwable t2) {
3678 Logger.defaultLogError(t2);
3680 // impl.state.barrier.dec(this);
3684 public void exception(ReadGraphImpl graph, Throwable t) {
3686 procedure.exception(graph, t);
3687 } catch (Throwable t2) {
3688 Logger.defaultLogError(t2);
3690 // impl.state.barrier.dec(this);
3695 int sId = querySupport.getId(subject);
3697 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3698 // else impl.state.barrier.inc(null, null);
3701 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3702 } catch (DatabaseException e) {
3703 Logger.defaultLogError(e);
/**
 * Resource overload: resolves the id of {@code subject} and delegates to
 * {@code getValue(impl, int)}.
 *
 * @param impl    graph the query is run in
 * @param subject resource whose value is read
 * @return the byte array returned by the int overload
 * @throws DatabaseException propagated from the int overload
 */
3708 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3709 return getValue(impl, querySupport.getId(subject));
/**
 * Returns the result of {@code QueryCache.resultValueQuery} for the given subject id, with
 * {@code impl.parent} as parent entry and no listener.
 *
 * @param impl    graph the query is run in
 * @param subject id of the resource whose value is read
 * @return the byte array produced by {@code QueryCache.resultValueQuery}
 * @throws DatabaseException propagated from {@code QueryCache.resultValueQuery}
 */
3712 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3713 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
/**
 * Runs {@code QueryCache.runnerValueQuery} for the id of {@code subject} and passes the
 * resulting byte array to {@code procedure}.
 * <p>
 * An {@link AtomicBoolean} {@code first} separates the first callback from later ones: the
 * first one uses the callback graph, later ones use {@code impl.newRestart(graph)}. Throwables
 * raised by {@code procedure} are logged with {@code Logger.defaultLogError}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the value, must not be null
 * @throws IllegalStateException if the runner throws a {@link DatabaseException}; the
 *         original exception is attached as the cause
 */
3717 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3719 assert(subject != null);
3720 assert(procedure != null);
3722 int sId = querySupport.getId(subject);
3724 // if(procedure != null) {
3726 final ListenerBase listener = getListenerBase(procedure);
3728 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3730 AtomicBoolean first = new AtomicBoolean(true);
3733 public void execute(ReadGraphImpl graph, byte[] result) {
// compareAndSet succeeds for exactly one caller, which takes the "first delivery" path
3735 if(first.compareAndSet(true, false)) {
3736 procedure.execute(graph, result);
3737 // impl.state.barrier.dec(this);
3739 procedure.execute(impl.newRestart(graph), result);
3741 } catch (Throwable t2) {
3742 Logger.defaultLogError(t2);
3747 public void exception(ReadGraphImpl graph, Throwable t) {
3749 if(first.compareAndSet(true, false)) {
3750 procedure.exception(graph, t);
3751 // impl.state.barrier.dec(this);
3753 procedure.exception(impl.newRestart(graph), t);
3755 } catch (Throwable t2) {
3756 Logger.defaultLogError(t2);
3762 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3763 // else impl.state.barrier.inc(null, null);
3766 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3767 } catch (DatabaseException e) {
// keep the DatabaseException as the cause; previously it was dropped and the failure could not be diagnosed
3768 throw new IllegalStateException("Internal error", e);
3773 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3777 // throw new IllegalStateException("Internal error");
/**
 * Runs {@code QueryCache.runnerValueQuery} for the id of {@code subject} and passes the
 * resulting byte array to {@code procedure}.
 * <p>
 * Two adapters exist. When {@code impl.parent != null} or the procedure is a listener, the
 * adapter uses an {@link AtomicBoolean} {@code first}: the first callback uses the callback
 * graph, later ones use {@code impl.newRestart(graph)}, and Throwables raised by the procedure
 * are logged. Otherwise a plain adapter forwards result and exception with the callback graph.
 * In both branches a {@link DatabaseException} from the runner is logged, not rethrown.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the value, must not be null
 */
3782 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3784 assert(subject != null);
3785 assert(procedure != null);
3787 final ListenerBase listener = getListenerBase(procedure);
3789 if(impl.parent != null || listener != null) {
3791 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3793 AtomicBoolean first = new AtomicBoolean(true);
3796 public void execute(ReadGraphImpl graph, byte[] result) {
3798 if(first.compareAndSet(true, false)) {
3799 procedure.execute(graph, result);
3800 // impl.state.barrier.dec(this);
3802 procedure.execute(impl.newRestart(graph), result);
3804 } catch (Throwable t2) {
3805 Logger.defaultLogError(t2);
3810 public void exception(ReadGraphImpl graph, Throwable t) {
3812 if(first.compareAndSet(true, false)) {
3813 procedure.exception(graph, t);
3814 // impl.state.barrier.dec(this);
3816 procedure.exception(impl.newRestart(graph), t);
3818 } catch (Throwable t2) {
3819 Logger.defaultLogError(t2);
3825 int sId = querySupport.getId(subject);
3827 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3828 // else impl.state.barrier.inc(null, null);
3831 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3832 } catch (DatabaseException e) {
3833 Logger.defaultLogError(e);
// plain adapter: forwards the callback graph unchanged
3838 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3841 public void execute(ReadGraphImpl graph, byte[] result) {
3843 procedure.execute(graph, result);
3848 public void exception(ReadGraphImpl graph, Throwable t) {
3850 procedure.exception(graph, t);
3856 int sId = querySupport.getId(subject);
3859 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3860 } catch (DatabaseException e) {
3861 Logger.defaultLogError(e);
/**
 * Resolves the inverse of {@code relation} by running {@code QueryCache.runnerObjects} with
 * the predicate id returned by {@code getInverseOf()}.
 * <p>
 * The {@code done} flag makes sure {@code procedure} is notified at most once, whichever of
 * {@code finished}, {@code execute} or {@code exception} gets there first. {@code finished}
 * reports either a {@code NoInverseException} or the stored result; an {@code execute} call
 * that loses the {@code found} race reports a
 * {@link ManyObjectsForFunctionalRelationException} naming both the stored id and the newly
 * arrived one. Throwables raised by {@code procedure} and a {@link DatabaseException} from the
 * runner are logged.
 *
 * @param impl      graph the query is run in
 * @param relation  relation whose inverse is requested, must not be null
 * @param procedure receiver of the inverse or of the failure, must not be null
 */
3869 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3871 assert(relation != null);
3872 assert(procedure != null);
3874 final ListenerBase listener = getListenerBase(procedure);
3876 IntProcedure ip = new IntProcedure() {
3878 private int result = 0;
// found: an object id has been accepted; done: procedure has already been notified
3880 final AtomicBoolean found = new AtomicBoolean(false);
3881 final AtomicBoolean done = new AtomicBoolean(false);
3884 public void finished(ReadGraphImpl graph) {
3886 // Shall fire exactly once!
3887 if(done.compareAndSet(false, true)) {
3890 procedure.exception(graph, new NoInverseException(""));
3891 // impl.state.barrier.dec(this);
3893 procedure.execute(graph, querySupport.getResource(result));
3894 // impl.state.barrier.dec(this);
3896 } catch (Throwable t) {
3897 Logger.defaultLogError(t);
3904 public void execute(ReadGraphImpl graph, int i) {
3906 if(found.compareAndSet(false, true)) {
3909 // Shall fire exactly once!
3910 if(done.compareAndSet(false, true)) {
// report the incoming id i; the old message printed the same field twice ("this.result" and "result")
3912 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + i));
3913 // impl.state.barrier.dec(this);
3914 } catch (Throwable t) {
3915 Logger.defaultLogError(t);
3923 public void exception(ReadGraphImpl graph, Throwable t) {
3924 // Shall fire exactly once!
3925 if(done.compareAndSet(false, true)) {
3927 procedure.exception(graph, t);
3928 // impl.state.barrier.dec(this);
3929 } catch (Throwable t2) {
3930 Logger.defaultLogError(t2);
3937 int sId = querySupport.getId(relation);
3939 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3940 // else impl.state.barrier.inc(null, null);
3943 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3944 } catch (DatabaseException e) {
3945 Logger.defaultLogError(e);
/**
 * Adapts {@code procedure} to an {@code InternalProcedure<Integer>} that converts the integer
 * result to a {@link Resource} with {@code querySupport.getResource}, then delegates to
 * {@code forResource(impl, id, impl.parent, ip)}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError}.
 *
 * @param impl      graph the query is run in
 * @param id        identifier passed on unchanged to the delegate
 * @param procedure receiver of the resource or the exception, must not be null
 */
3951 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3954 assert(procedure != null);
3956 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3959 public void execute(ReadGraphImpl graph, Integer result) {
3961 procedure.execute(graph, querySupport.getResource(result));
3962 } catch (Throwable t2) {
3963 Logger.defaultLogError(t2);
3965 // impl.state.barrier.dec(this);
3969 public void exception(ReadGraphImpl graph, Throwable t) {
3972 procedure.exception(graph, t);
3973 } catch (Throwable t2) {
3974 Logger.defaultLogError(t2);
3976 // impl.state.barrier.dec(this);
3981 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3982 // else impl.state.barrier.inc(null, null);
3984 forResource(impl, id, impl.parent, ip);
/**
 * Delegates to {@code forBuiltin(impl, id, impl.parent, ...)} with an
 * {@code InternalProcedure<Integer>} that converts the integer result to a {@link Resource}
 * with {@code querySupport.getResource} before calling {@code procedure}.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the delegate is logged as well.
 *
 * @param impl      graph the query is run in
 * @param id        identifier passed on unchanged to the delegate
 * @param procedure receiver of the resource or the exception, must not be null
 */
3989 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3992 assert(procedure != null);
3994 // impl.state.barrier.inc();
3997 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
4000 public void execute(ReadGraphImpl graph, Integer result) {
4002 procedure.execute(graph, querySupport.getResource(result));
4003 } catch (Throwable t2) {
4004 Logger.defaultLogError(t2);
4006 // impl.state.barrier.dec();
4010 public void exception(ReadGraphImpl graph, Throwable t) {
4012 procedure.exception(graph, t);
4013 } catch (Throwable t2) {
4014 Logger.defaultLogError(t2);
4016 // impl.state.barrier.dec();
4020 } catch (DatabaseException e) {
4021 Logger.defaultLogError(e);
/**
 * Reports whether {@code subject} has any statement: fetches the {@code IntSet} from
 * {@code QueryCache.resultDirectPredicates} and calls {@code procedure.execute} with
 * {@code true} when that set is not empty. A {@link DatabaseException} is handed to
 * {@code procedure.exception}.
 *
 * @param impl      graph the query is run in; also the graph passed to {@code procedure}
 * @param subject   must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
4027 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4029 assert(subject != null);
4030 assert(procedure != null);
4032 final ListenerBase listener = getListenerBase(procedure);
4035 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4036 procedure.execute(impl, !result.isEmpty());
4037 } catch (DatabaseException e) {
4038 procedure.exception(impl, e);
/**
 * Reports whether {@code subject} has a statement with {@code predicate}, based on the object
 * stream of {@code forEachObject(impl, subject, predicate, ip)}.
 * <p>
 * The adapter keeps a {@code found} flag and passes its value to {@code procedure.execute}
 * when the stream finishes; exceptions go to {@code procedure.exception}. Throwables raised
 * by {@code procedure} are logged with {@code Logger.defaultLogError}.
 * NOTE(review): {@code execute} presumably sets {@code found} for any object - confirm.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
4044 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4046 assert(subject != null);
4047 assert(predicate != null);
4048 assert(procedure != null);
4050 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4052 boolean found = false;
4055 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4060 synchronized public void finished(AsyncReadGraph graph) {
4062 procedure.execute(graph, found);
4063 } catch (Throwable t2) {
4064 Logger.defaultLogError(t2);
4066 // impl.state.barrier.dec(this);
4070 public void exception(AsyncReadGraph graph, Throwable t) {
4072 procedure.exception(graph, t);
4073 } catch (Throwable t2) {
4074 Logger.defaultLogError(t2);
4076 // impl.state.barrier.dec(this);
4081 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4082 // else impl.state.barrier.inc(null, null);
4084 forEachObject(impl, subject, predicate, ip);
/**
 * Reports whether the statement (subject, predicate, object) exists, based on the object
 * stream of {@code forEachObject(impl, subject, predicate, ...)}.
 * <p>
 * {@code found} becomes true when a streamed resource {@code equals(object)}; its value is
 * passed to {@code procedure.execute} when the stream finishes. Exceptions go to
 * {@code procedure.exception}. Throwables raised by {@code procedure} are logged with
 * {@code Logger.defaultLogError}.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param predicate must not be null
 * @param object    object to look for; not asserted here
 * @param procedure receiver of the boolean answer, must not be null
 */
4089 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4091 assert(subject != null);
4092 assert(predicate != null);
4093 assert(procedure != null);
4095 // impl.state.barrier.inc();
4097 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4099 boolean found = false;
4102 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4103 if(resource.equals(object)) found = true;
4107 synchronized public void finished(AsyncReadGraph graph) {
4109 procedure.execute(graph, found);
4110 } catch (Throwable t2) {
4111 Logger.defaultLogError(t2);
4113 // impl.state.barrier.dec();
4117 public void exception(AsyncReadGraph graph, Throwable t) {
4119 procedure.exception(graph, t);
4120 } catch (Throwable t2) {
4121 Logger.defaultLogError(t2);
4123 // impl.state.barrier.dec();
/**
 * Reports whether {@code subject} has a value: runs {@code QueryCache.runnerValueQuery} and
 * calls {@code procedure.execute} with {@code true} when the returned byte array is not null.
 * <p>
 * Throwables raised by {@code procedure} are logged with {@code Logger.defaultLogError};
 * a {@link DatabaseException} from the runner is logged as well.
 *
 * @param impl      graph the query is run in
 * @param subject   must not be null
 * @param procedure receiver of the boolean answer, must not be null
 */
4131 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4133 assert(subject != null);
4134 assert(procedure != null);
4136 final ListenerBase listener = getListenerBase(procedure);
4138 // impl.state.barrier.inc();
4141 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4144 public void execute(ReadGraphImpl graph, byte[] object) {
4145 boolean result = object != null;
4147 procedure.execute(graph, result);
4148 } catch (Throwable t2) {
4149 Logger.defaultLogError(t2);
4151 // impl.state.barrier.dec();
4155 public void exception(ReadGraphImpl graph, Throwable t) {
4157 procedure.exception(graph, t);
4158 } catch (Throwable t2) {
4159 Logger.defaultLogError(t2);
4161 // impl.state.barrier.dec();
4165 } catch (DatabaseException e) {
4166 Logger.defaultLogError(e);
4172 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4174 assert(subject != null);
4175 assert(procedure != null);
4177 final ListenerBase listener = getListenerBase(procedure);
4181 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4184 public void exception(ReadGraphImpl graph, Throwable t) {
4186 procedure.exception(graph, t);
4187 } catch (Throwable t2) {
4188 Logger.defaultLogError(t2);
4190 // impl.state.barrier.dec();
4194 public void execute(ReadGraphImpl graph, int i) {
4196 procedure.execute(graph, querySupport.getResource(i));
4197 } catch (Throwable t2) {
4198 Logger.defaultLogError(t2);
4203 public void finished(ReadGraphImpl graph) {
4205 procedure.finished(graph);
4206 } catch (Throwable t2) {
4207 Logger.defaultLogError(t2);
4209 // impl.state.barrier.dec();
4213 } catch (DatabaseException e) {
4214 Logger.defaultLogError(e);
4220 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4222 // assert(request != null);
4223 // assert(procedure != null);
4225 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4230 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4232 // assert(graph != null);
4233 // assert(request != null);
4235 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4236 // if(entry != null && entry.isReady()) {
4237 // return (T)entry.get(graph, this, null);
4239 // return request.perform(graph);
4244 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4246 // assert(graph != null);
4247 // assert(request != null);
4249 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4250 // if(entry != null && entry.isReady()) {
4251 // if(entry.isExcepted()) {
4252 // Throwable t = (Throwable)entry.getResult();
4253 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4254 // else throw new DatabaseException(t);
4256 // return (T)entry.getResult();
4260 // final DataContainer<T> result = new DataContainer<T>();
4261 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4263 // request.register(graph, new Listener<T>() {
4266 // public void exception(Throwable t) {
4267 // exception.set(t);
4271 // public void execute(T t) {
4276 // public boolean isDisposed() {
4282 // Throwable t = exception.get();
4284 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4285 // else throw new DatabaseException(t);
4288 // return result.get();
4295 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4297 // assert(graph != null);
4298 // assert(request != null);
4300 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4301 // if(entry != null && entry.isReady()) {
4302 // if(entry.isExcepted()) {
4303 // procedure.exception(graph, (Throwable)entry.getResult());
4305 // procedure.execute(graph, (T)entry.getResult());
4308 // request.perform(graph, procedure);
4314 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4316 assert(request != null);
4317 assert(procedure != null);
4321 queryMultiRead(impl, request, parent, listener, procedure);
4323 } catch (DatabaseException e) {
4325 throw new IllegalStateException(e);
4332 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4334 assert(request != null);
4335 assert(procedure != null);
4337 // impl.state.barrier.inc();
4339 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4341 public void execute(AsyncReadGraph graph, T result) {
4344 procedure.execute(graph, result);
4345 } catch (Throwable t2) {
4346 Logger.defaultLogError(t2);
4351 public void finished(AsyncReadGraph graph) {
4354 procedure.finished(graph);
4355 } catch (Throwable t2) {
4356 Logger.defaultLogError(t2);
4359 // impl.state.barrier.dec();
4364 public String toString() {
4365 return procedure.toString();
4369 public void exception(AsyncReadGraph graph, Throwable t) {
4372 procedure.exception(graph, t);
4373 } catch (Throwable t2) {
4374 Logger.defaultLogError(t2);
4377 // impl.state.barrier.dec();
4386 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4388 // assert(request != null);
4389 // assert(procedure != null);
4393 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4396 // public String toString() {
4397 // return procedure.toString();
4401 // public void execute(AsyncReadGraph graph, T result) {
4403 // procedure.execute(result);
4404 // } catch (Throwable t2) {
4405 // Logger.defaultLogError(t2);
4410 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4412 // procedure.exception(throwable);
4413 // } catch (Throwable t2) {
4414 // Logger.defaultLogError(t2);
4420 // } catch (DatabaseException e) {
4422 // throw new IllegalStateException(e);
4429 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4431 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4436 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4438 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4443 public VirtualGraph getValueProvider(Resource subject) {
4445 return querySupport.getValueProvider(querySupport.getId(subject));
4449 public boolean resumeTasks(ReadGraphImpl graph) {
4451 return querySupport.resume(graph);
4455 public boolean isImmutable(int resourceId) {
4456 return querySupport.isImmutable(resourceId);
4459 public boolean isImmutable(Resource resource) {
4460 ResourceImpl impl = (ResourceImpl)resource;
4461 return isImmutable(impl.id);
// Stores the result of Layer0.getInstance(graph) in L0.
// NOTE(review): only the signature and this assignment are visible here; the
// return statement and any guard around the assignment (e.g. a null check that
// would make this a cached lookup) are not shown - confirm before relying on
// caching or thread-safety behaviour.
4466 public Layer0 getL0(ReadGraph graph) {
4468 L0 = Layer0.getInstance(graph);
4473 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4474 protected Integer initialValue() {