1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
37 import org.simantics.databoard.Bindings;
38 import org.simantics.db.AsyncReadGraph;
39 import org.simantics.db.DevelopmentKeys;
40 import org.simantics.db.DirectStatements;
41 import org.simantics.db.ReadGraph;
42 import org.simantics.db.RelationInfo;
43 import org.simantics.db.Resource;
44 import org.simantics.db.Session;
45 import org.simantics.db.Statement;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
48 import org.simantics.db.common.utils.Logger;
49 import org.simantics.db.debug.ListenerReport;
50 import org.simantics.db.exception.DatabaseException;
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
52 import org.simantics.db.exception.NoInverseException;
53 import org.simantics.db.exception.ResourceNotFoundException;
54 import org.simantics.db.impl.DebugPolicy;
55 import org.simantics.db.impl.ResourceImpl;
56 import org.simantics.db.impl.graph.MultiIntProcedure;
57 import org.simantics.db.impl.graph.ReadGraphImpl;
58 import org.simantics.db.impl.graph.ReadGraphSupport;
59 import org.simantics.db.impl.graph.WriteGraphImpl;
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;
61 import org.simantics.db.impl.procedure.InternalProcedure;
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
63 import org.simantics.db.impl.support.ResourceSupport;
64 import org.simantics.db.procedure.AsyncMultiListener;
65 import org.simantics.db.procedure.AsyncMultiProcedure;
66 import org.simantics.db.procedure.AsyncProcedure;
67 import org.simantics.db.procedure.AsyncSetListener;
68 import org.simantics.db.procedure.ListenerBase;
69 import org.simantics.db.procedure.MultiProcedure;
70 import org.simantics.db.procedure.Procedure;
71 import org.simantics.db.procedure.StatementProcedure;
72 import org.simantics.db.request.AsyncMultiRead;
73 import org.simantics.db.request.AsyncRead;
74 import org.simantics.db.request.ExternalRead;
75 import org.simantics.db.request.MultiRead;
76 import org.simantics.db.request.Read;
77 import org.simantics.db.request.RequestFlags;
78 import org.simantics.db.request.WriteTraits;
79 import org.simantics.layer0.Layer0;
80 import org.simantics.utils.DataContainer;
81 import org.simantics.utils.Development;
82 import org.simantics.utils.datastructures.Pair;
83 import org.simantics.utils.datastructures.collections.CollectionUtils;
84 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
86 import gnu.trove.procedure.TIntProcedure;
87 import gnu.trove.procedure.TLongProcedure;
88 import gnu.trove.procedure.TObjectProcedure;
89 import gnu.trove.set.hash.THashSet;
90 import gnu.trove.set.hash.TIntHashSet;
92 @SuppressWarnings({"rawtypes", "unchecked"})
93 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
// Counters; nothing in the visible part of this file reads or writes them.
95 public static int indent = 0;
100 public int boundQueries = 0;
// Builtin ids, assigned once in the constructor through core.getBuiltin(...).
// Where the constructor's fallback assignment is visible (functionalRelation,
// hasPredicateInverse) the id is 0 when the "http:/" builtin is not installed.
103 final private int functionalRelation;
105 final private int superrelationOf;
107 final private int instanceOf;
109 final private int inverseOf;
111 final private int asserts;
113 final private int hasPredicate;
115 final private int hasPredicateInverse;
117 final private int hasObject;
119 final private int inherits;
121 final private int subrelationOf;
123 final private int rootLibrary;
// NOTE(review): the next two lines are the interior of a Javadoc comment whose
// opening and closing delimiters are missing from this excerpt.
126 * A cache for the root library resource. Initialized in
127 * {@link #getRootLibraryResource()}.
129 private volatile ResourceImpl rootLibraryResource;
131 final private int library;
133 final private int consistsOf;
135 final private int hasName;
// Compared against THREADS in the constructor's start-up wait loop and
// decremented in schedule() when a task is queued for a thread in state SLEEP.
137 AtomicInteger sleepers = new AtomicInteger(0);
139 private boolean updating = false;
142 private boolean firingListeners = false;
// Collaborators initialised in the constructor.
144 final public QueryCache cache;
145 final public QuerySupport querySupport;
146 final public Session session;
147 final public ResourceSupport resourceSupport;
// Listener entries collected by scheduleListener().
149 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
// Per-thread structures; the constructor allocates each array below with
// THREADS slots (delayQueues with THREADS * THREADS).
151 QueryThread[] executors;
153 public ArrayList<SessionTask>[] queues;
// NOTE(review): constants of an enum whose header is missing from this excerpt;
// presumably ThreadState, given the ThreadState.INIT / ThreadState.SLEEP uses.
157 INIT, RUN, SLEEP, DISPOSED
161 public ThreadState[] threadStates;
162 public ReentrantLock[] threadLocks;
163 public Condition[] threadConditions;
165 public ArrayList<SessionTask>[] ownTasks;
167 public ArrayList<SessionTask>[] ownSyncTasks;
169 ArrayList<SessionTask>[] delayQueues;
171 public boolean synch = true;
// Obtained from core.getLock() in the constructor.
173 final Object querySupportLock;
// Incremented by releaseWrite(). NOTE(review): this is a boxed Long, so ++ is
// an unbox/increment/rebox sequence and is not atomic.
175 public Long modificationCounter = 0L;
177 public void close() {
/**
 * Appends {@code request} to the calling thread's own task list,
 * {@code ownTasks[caller]}.
 *
 * @param caller index into {@code ownTasks}
 * @param request task to append
 */
180 final public void scheduleOwn(int caller, SessionTask request) {
181 ownTasks[caller].add(request);
/**
 * Queues {@code request} either on the caller's own task list (when the task
 * targets the calling thread) or through {@link #schedule(int, SessionTask)}.
 * <p>
 * NOTE(review): the line separating the two branches is missing from this
 * excerpt; an {@code else} is presumed - confirm against the full file.
 *
 * @param caller index of the calling thread
 * @param request task to queue; its {@code thread} field names the performer
 */
184 final public void scheduleAlways(int caller, SessionTask request) {
186 int performer = request.thread;
187 if(caller == performer) {
188 ownTasks[caller].add(request);
190 schedule(caller, request);
/**
 * Adds {@code request} to the queue of its target thread
 * ({@code request.thread}) and signals that thread's condition when the queue
 * was empty before the add.
 * <p>
 * NOTE(review): several lines are missing from this excerpt, including whatever
 * follows the {@code caller == performer} test and any lock()/unlock() calls on
 * {@code queueLock}; do not infer the locking discipline from the lines below.
 *
 * @param caller index of the calling thread
 * @param request task to queue
 */
195 final public void schedule(int caller, SessionTask request) {
// Target thread index carried by the task itself.
197 int performer = request.thread;
199 if(DebugPolicy.SCHEDULE)
200 System.out.println("schedule " + request + " " + caller + " -> " + performer);
202 assert(performer >= 0);
// NOTE(review): request has already been dereferenced above, so a null request
// fails with a NullPointerException before this assertion is reached.
204 assert(request != null);
206 if(caller == performer) {
209 ReentrantLock queueLock = threadLocks[performer];
211 queues[performer].add(request);
212 // This thread could have been sleeping
// size() == 1 means the queue was empty before this add.
213 if(queues[performer].size() == 1) {
214 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
215 threadConditions[performer].signalAll();
// Set to (threads - 1) in the constructor; performForEach2 ANDs it with a
// query hash code for its debug output.
224 final public int THREAD_MASK;
// Thread group named "Query Thread Group"; its users are not visible in this excerpt.
226 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
228 public static abstract class SessionTask {
230 final public int thread;
231 final public int syncCaller;
232 final public Object object;
234 public SessionTask(WriteTraits object, int thread) {
235 this.thread = thread;
236 this.syncCaller = -1;
237 this.object = object;
240 public SessionTask(Object object, int thread, int syncCaller) {
241 this.thread = thread;
242 this.syncCaller = syncCaller;
243 this.object = object;
246 public abstract void run(int thread);
249 public String toString() {
250 return "SessionTask[" + object + "]";
255 public static abstract class SessionRead extends SessionTask {
257 final public Semaphore notify;
258 final public DataContainer<Throwable> throwable;
260 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
261 super(object, thread, thread);
262 this.throwable = throwable;
263 this.notify = notify;
266 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
267 super(object, thread, syncThread);
268 this.throwable = throwable;
269 this.notify = notify;
// Neither field is read or written in the visible part of this file.
274 long waitingTime = 0;
277 static int koss2 = 0;
/**
 * Delegates to {@code runSynchronized()} on the first executor thread and
 * returns its result. The {@code graph} argument is not used by the visible
 * line.
 */
279 public boolean resume(ReadGraphImpl graph) {
280 return executors[0].runSynchronized();
/**
 * Sets up the per-thread scheduling structures, creates and starts the query
 * threads, waits until {@code sleepers} equals {@code THREADS}, and finally
 * resolves the builtin resource ids from {@code core}.
 * <p>
 * NOTE(review): many lines of this constructor are missing from this excerpt
 * (among them the declarations/assignments of {@code THREADS},
 * {@code querySupport} and {@code index}, the body of the wait loop and most
 * {@code else} branches of the builtin lookups).
 *
 * @param threads number of query threads requested
 * @param core support object used for the cache, the lock and builtin lookups
 * @param threadSet receives every created query thread
 * @throws DatabaseException declared; the throwing call is not identifiable
 *         from the visible lines
 */
283 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
284 throws DatabaseException {
// NOTE(review): threads - 1 only behaves as a bit mask when threads is a power
// of two - confirm that callers guarantee this.
287 THREAD_MASK = threads - 1;
290 cache = new QueryCache(core, threads);
291 session = querySupport.getSession();
292 resourceSupport = querySupport.getSupport();
293 querySupportLock = core.getLock();
// One slot per query thread; delayQueues gets THREADS * THREADS slots.
295 executors = new QueryThread[THREADS];
296 queues = new ArrayList[THREADS];
297 threadLocks = new ReentrantLock[THREADS];
298 threadConditions = new Condition[THREADS];
299 threadStates = new ThreadState[THREADS];
300 ownTasks = new ArrayList[THREADS];
301 ownSyncTasks = new ArrayList[THREADS];
302 delayQueues = new ArrayList[THREADS * THREADS];
304 // freeSchedule = new AtomicInteger(0);
306 for (int i = 0; i < THREADS * THREADS; i++) {
307 delayQueues[i] = new ArrayList<SessionTask>();
// Every thread starts in state INIT with its own lock and a condition created from that lock.
310 for (int i = 0; i < THREADS; i++) {
312 // tasks[i] = new ArrayList<Runnable>();
313 ownTasks[i] = new ArrayList<SessionTask>();
314 ownSyncTasks[i] = new ArrayList<SessionTask>();
315 queues[i] = new ArrayList<SessionTask>();
316 threadLocks[i] = new ReentrantLock();
317 threadConditions[i] = threadLocks[i].newCondition();
318 // limits[i] = false;
319 threadStates[i] = ThreadState.INIT;
// Create the query threads and publish them through threadSet.
323 for (int i = 0; i < THREADS; i++) {
327 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
329 threadSet.add(executors[i]);
334 for (int i = 0; i < THREADS; i++) {
335 executors[i].start();
338 // Make sure that query threads are up and running
339 while(sleepers.get() != THREADS) {
// NOTE(review): the loop body and the handler body are missing from this
// excerpt; check that the interrupt is not silently swallowed.
342 } catch (InterruptedException e) {
// A non-zero id for "http:/" is taken as the sign that builtins are installed.
347 rootLibrary = core.getBuiltin("http:/");
348 boolean builtinsInstalled = rootLibrary != 0;
// Each lookup below runs only when builtins are installed and asserts a
// non-zero id; the visible fallbacks assign 0.
350 if (builtinsInstalled) {
351 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
352 assert (functionalRelation != 0);
354 functionalRelation = 0;
356 if (builtinsInstalled) {
357 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
358 assert (instanceOf != 0);
362 if (builtinsInstalled) {
363 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
364 assert (inverseOf != 0);
369 if (builtinsInstalled) {
370 inherits = core.getBuiltin(Layer0.URIs.Inherits);
371 assert (inherits != 0);
375 if (builtinsInstalled) {
376 asserts = core.getBuiltin(Layer0.URIs.Asserts);
377 assert (asserts != 0);
381 if (builtinsInstalled) {
382 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
383 assert (hasPredicate != 0);
387 if (builtinsInstalled) {
388 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
389 assert (hasPredicateInverse != 0);
391 hasPredicateInverse = 0;
393 if (builtinsInstalled) {
394 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
395 assert (hasObject != 0);
399 if (builtinsInstalled) {
400 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
401 assert (subrelationOf != 0);
405 if (builtinsInstalled) {
406 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
407 assert (superrelationOf != 0);
411 if (builtinsInstalled) {
412 library = core.getBuiltin(Layer0.URIs.Library);
413 assert (library != 0);
417 if (builtinsInstalled) {
418 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
419 assert (consistsOf != 0);
423 if (builtinsInstalled) {
424 hasName = core.getBuiltin(Layer0.URIs.HasName);
425 assert (hasName != 0);
/**
 * Runs {@code performDirtyUpdates(graph)} and then bumps
 * {@code modificationCounter}.
 * NOTE(review): the counter is a boxed {@code Long}; the increment is not atomic.
 */
431 final public void releaseWrite(ReadGraphImpl graph) {
432 performDirtyUpdates(graph);
433 modificationCounter++;
/**
 * Returns the integer id of {@code r} as reported by
 * {@code querySupport.getId(r)}.
 */
436 final public int getId(final Resource r) {
437 return querySupport.getId(r);
440 public QuerySupport getCore() {
/**
 * @return the id stored in {@code functionalRelation}; the constructor sets it
 *         from {@code Layer0.URIs.FunctionalRelation}, or to 0 in its fallback
 */
444 public int getFunctionalRelation() {
445 return functionalRelation;
448 public int getInherits() {
452 public int getInstanceOf() {
456 public int getInverseOf() {
/**
 * @return the id stored in {@code subrelationOf}, which the constructor
 *         resolves from {@code Layer0.URIs.SubrelationOf}
 */
460 public int getSubrelationOf() {
461 return subrelationOf;
/**
 * @return the id stored in {@code superrelationOf}, which the constructor
 *         resolves from {@code Layer0.URIs.SuperrelationOf}
 */
464 public int getSuperrelationOf() {
465 return superrelationOf;
468 public int getAsserts() {
472 public int getHasPredicate() {
/**
 * @return the id stored in {@code hasPredicateInverse}; the constructor sets it
 *         from {@code Layer0.URIs.HasPredicateInverse}, or to 0 in its fallback
 */
476 public int getHasPredicateInverse() {
477 return hasPredicateInverse;
480 public int getHasObject() {
484 public int getRootLibrary() {
/**
 * Returns the root library as a {@link Resource}. On first use a
 * {@link ResourceImpl} is built from {@code querySupport.getSupport()} and the
 * id from {@link #getRootLibrary()} and cached in the volatile field
 * {@code rootLibraryResource}.
 *
 * @throws UnsupportedOperationException with the message below; the guarding
 *         condition sits on a line missing from this excerpt (presumably
 *         {@code root == 0} - confirm)
 */
488 public Resource getRootLibraryResource() {
489 if (rootLibraryResource == null) {
490 // Synchronization is not needed here, it doesn't matter if multiple
491 // threads simultaneously set rootLibraryResource once.
492 int root = getRootLibrary();
494 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
495 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
497 return rootLibraryResource;
500 public int getLibrary() {
504 public int getConsistsOf() {
508 public int getHasName() {
/**
 * Resolves the resource id for {@code id} through
 * {@code QueryCache.runnerURIToResource} and relays the outcome to
 * {@code procedure}: a non-null, non-zero result is passed to
 * {@code procedure.execute}, and exceptions reported by the query are passed to
 * {@code procedure.exception}. A {@link DatabaseException} thrown by the call
 * itself is logged with {@code Logger.defaultLogError}.
 * <p>
 * NOTE(review): lines are missing from this excerpt between the visible
 * statements, so the conditions guarding the second
 * {@code procedure.execute} and the {@link ResourceNotFoundException} path
 * cannot be read from here.
 *
 * @param graph graph handed on to the query and the procedure
 * @param id identifier to resolve
 * @param parent parent cache entry handed on to the query
 * @param procedure receiver of the result or the failure
 */
512 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
// No listener is registered for this query (the fourth argument is null).
516 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
519 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
521 if (result != null && result != 0) {
522 procedure.execute(graph, result);
526 // Fall back to using the fixed builtins.
527 // result = querySupport.getBuiltin(id);
528 // if (result != 0) {
529 // procedure.execute(graph, result);
534 // result = querySupport.getRandomAccessReference(id);
535 // } catch (ResourceNotFoundException e) {
536 // procedure.exception(graph, e);
541 procedure.execute(graph, result);
543 procedure.exception(graph, new ResourceNotFoundException(id));
549 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
550 procedure.exception(graph, t);
554 } catch (DatabaseException e) {
555 Logger.defaultLogError(e);
/**
 * Looks {@code id} up with {@code querySupport.getBuiltin(id)} and reports
 * either the id ({@code procedure.execute}) or a
 * {@link ResourceNotFoundException} ({@code procedure.exception}).
 * <p>
 * NOTE(review): the condition choosing between the two calls is on a line
 * missing from this excerpt. {@code parent} is not used by the visible lines.
 */
560 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
562 Integer result = querySupport.getBuiltin(id);
564 procedure.execute(graph, result);
566 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Forwards to {@code QueryCache.runnerMultiReadEntry} and rethrows a
 * {@link DatabaseException} as an {@link IllegalStateException} with the
 * original as cause. {@code cached} and {@code provider} are not used by the
 * visible lines.
 */
571 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
574 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
575 } catch (DatabaseException e) {
576 throw new IllegalStateException(e);
/**
 * Forwards to {@code QueryCache.runnerAsyncMultiReadEntry} and rethrows a
 * {@link DatabaseException} as an {@link IllegalStateException} with the
 * original as cause.
 */
581 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
585 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
586 } catch (DatabaseException e) {
587 throw new IllegalStateException(e);
/**
 * Forwards to {@code QueryCache.runnerExternalReadEntry}. {@code cached} and
 * {@code provider} are not used by the visible line.
 *
 * @throws DatabaseException propagated from the forwarded call
 */
592 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
593 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
597 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
599 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
/**
 * Forwards to {@code QueryCache.runnerMultiReadEntry} with the same arguments.
 *
 * @throws DatabaseException propagated from the forwarded call
 */
603 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
605 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
/**
 * Forwards to {@code QueryCache.runnerExternalReadEntry} with the same
 * arguments.
 *
 * @throws DatabaseException propagated from the forwarded call
 */
609 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
611 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
615 boolean isBound(ExternalReadEntry<?> entry) {
616 if(entry.hasParents()) return true;
617 else if(hasListener(entry)) return true;
/**
 * Records {@code parent} as a parent of {@code child} (unless the dependency
 * is inferred or the child reports itself immutable) and, when a listener is
 * given, registers it for {@code child}.
 * <p>
 * NOTE(review): the return statement for the no-listener case is on a line
 * missing from this excerpt.
 *
 * @return the result of {@link #registerListener} when {@code listener} is
 *         non-null
 */
621 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
623 if (parent != null && !inferred) {
625 if(!child.isImmutable(graph))
626 child.addParent(parent);
// A failing isImmutable check is only logged; registration carries on.
627 } catch (DatabaseException e) {
628 Logger.defaultLogError(e);
630 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
633 if (listener != null) {
634 return registerListener(child, listener, procedure);
/**
 * Procedure implementation used as a placeholder; performForEach2 passes the
 * shared {@code dummy} instance to {@code computeForEach}. No statements are
 * visible inside any of the four methods in this excerpt.
 */
642 static class Dummy implements InternalProcedure<Object>, IntProcedure {
645 public void execute(ReadGraphImpl graph, int i) {
649 public void finished(ReadGraphImpl graph) {
653 public void execute(ReadGraphImpl graph, Object result) {
657 public void exception(ReadGraphImpl graph, Throwable throwable) {
// Shared Dummy instance; cast to the caller's procedure type in performForEach2.
662 private static final Dummy dummy = new Dummy();
/**
 * Registers the dependency/listener for {@code query} and returns its value
 * through {@code query.get(...)}. When the query is not ready it is first
 * computed with the shared {@code dummy} procedure.
 * <p>
 * NOTE(review): lines are missing from this excerpt around both
 * {@code query.get} calls, so the exact branch structure (and where
 * {@code collecting} is declared) cannot be read from here.
 */
665 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
667 if (DebugPolicy.PERFORM)
668 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
671 assert (!collecting);
673 assert(query.assertNotDiscarded());
675 registerDependencies(graph, query, parent, listener, procedure, false);
677 // FRESH, REFUTED, EXCEPTED go here
678 if (!query.isReady()) {
// Compute with the no-op procedure, then read the result without a procedure.
683 query.computeForEach(graph, this, (Procedure)dummy, true);
684 return query.get(graph, this, null);
690 return query.get(graph, this, procedure);
698 interface QueryCollectorSupport {
699 public CacheCollectionResult allCaches();
700 public Collection<CacheEntry> getRootList();
701 public int getCurrentSize();
702 public int calculateCurrentSize();
703 public CacheEntryBase iterate(int level);
704 public void remove();
705 public void setLevel(CacheEntryBase entry, int level);
706 public boolean start(boolean flush);
/**
 * Collector invoked by {@code QueryProcessor.gc(int, int)}, which passes both
 * of its arguments through unchanged.
 */
interface QueryCollector {

    /**
     * Runs one collection round.
     *
     * @param youngTarget forwarded from {@code gc}; presumably a target for
     *        young entries - confirm against the implementation
     * @param allowedTimeInMs forwarded from {@code gc}; presumably a time
     *        budget in milliseconds - confirm against the implementation
     */
    void collect(int youngTarget, int allowedTimeInMs);

}
/**
 * {@link QueryCollectorSupport} backed by the enclosing processor's query
 * cache. It keeps a {@link CacheCollectionResult} snapshot ({@code iteration})
 * that is walked level by level and refreshed by {@code restart}.
 * <p>
 * NOTE(review): many lines of this class are missing from this excerpt (return
 * statements, branch headers, several method bodies); the comments below only
 * describe the visible statements.
 */
715 class QueryCollectorSupportImpl implements QueryCollectorSupport {
717 private static final boolean DEBUG = false;
// Relative size change passed to restart() by start() (unless flushing) and by iterate().
718 private static final double ITERATION_RATIO = 0.2;
720 private CacheCollectionResult iteration = new CacheCollectionResult();
721 private boolean fresh = true;
722 private boolean needDataInStart = true;
724 QueryCollectorSupportImpl() {
// Builds a new snapshot by letting the enclosing processor fill a fresh result.
728 public CacheCollectionResult allCaches() {
729 CacheCollectionResult result = new CacheCollectionResult();
730 QueryProcessor.this.allCaches(result);
735 public boolean start(boolean flush) {
736 // We need new data from query maps
738 if(needDataInStart || flush) {
739 // Last run ended after processing all queries => refresh data
// A flush uses ratio 0.0, which makes every size difference count as dirty.
740 restart(flush ? 0.0 : ITERATION_RATIO);
742 // continue with previous big data
744 // Notify caller about iteration situation
745 return iteration.isAtStart();
748 private void restart(double targetRatio) {
750 needDataInStart = true;
752 long start = System.nanoTime();
755 // We need new data from query maps
// The +1 keeps the denominator of the ratio below from being zero.
757 int iterationSize = iteration.size()+1;
758 int diff = calculateCurrentSize()-iterationSize;
760 double ratio = (double)diff / (double)iterationSize;
761 boolean dirty = Math.abs(ratio) >= targetRatio;
764 iteration = allCaches();
766 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
767 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
768 System.err.print(" " + iteration.levels[i].size());
769 System.err.println("");
776 needDataInStart = false;
778 // We are returning here within the same GC round - reuse the cache table
787 public CacheEntryBase iterate(int level) {
789 CacheEntryBase entry = iteration.next(level);
791 restart(ITERATION_RATIO);
// Skip over entries that have been discarded in the meantime.
795 while(entry != null && entry.isDiscarded()) {
796 entry = iteration.next(level);
804 public void remove() {
809 public void setLevel(CacheEntryBase entry, int level) {
810 iteration.setLevel(entry, level);
813 public Collection<CacheEntry> getRootList() {
814 return cache.getRootList();
818 public int calculateCurrentSize() {
819 return cache.calculateCurrentSize();
823 public int getCurrentSize() {
828 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
// Collector wiring: gc() delegates to collector, which is constructed with this
// processor and the support object above it.
830 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
831 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
833 public int querySize() {
/**
 * Runs one collection round by delegating to
 * {@code collector.collect(youngTarget, allowedTimeInMs)}.
 */
837 public void gc(int youngTarget, int allowedTimeInMs) {
839 collector.collect(youngTarget, allowedTimeInMs);
/**
 * Registers {@code base} as a listener of {@code entry} through
 * {@link #addListener}.
 * <p>
 * NOTE(review): the statement executed when {@code base.isDisposed()} is true
 * is on a line missing from this excerpt.
 *
 * @return the result of {@code addListener} on the visible path
 */
843 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
845 assert (entry != null);
847 if (base.isDisposed())
850 return addListener(entry, base, procedure);
/** Stores {@code result} as the last known value of {@code entry}. */
854 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
855 entry.setLastKnown(result);
/**
 * Adds a {@link ListenerEntry} for ({@code entry}, {@code base},
 * {@code procedure}) to the listener list kept in {@code cache.listeners}.
 * If an equal entry already exists and its listener is not disposed,
 * {@code null} is returned; if it is disposed, the old entry is replaced.
 * <p>
 * NOTE(review): the lines that append a new entry and return the result are
 * missing from this excerpt.
 */
858 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
860 assert (entry != null);
861 assert (procedure != null);
863 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
865 list = new ArrayList<ListenerEntry>(1);
866 cache.listeners.put(entry, list);
869 ListenerEntry result = new ListenerEntry(entry, base, procedure);
// indexOf relies on ListenerEntry.equals, which is defined outside this excerpt.
870 int currentIndex = list.indexOf(result);
871 // There was already a listener
872 if(currentIndex > -1) {
873 ListenerEntry current = list.get(currentIndex);
874 if(!current.base.isDisposed()) return null;
875 list.set(currentIndex, result);
880 if(DebugPolicy.LISTENER) {
881 new Exception().printStackTrace();
882 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
/** Adds {@code entry} to the {@code scheduledListeners} set. */
889 private void scheduleListener(ListenerEntry entry) {
890 assert (entry != null);
891 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
892 scheduledListeners.add(entry);
/**
 * Removes {@code entry} from the listener list of its cache entry; does nothing
 * when no list is registered.
 * <p>
 * NOTE(review): the condition guarding {@code cache.listeners.remove} and any
 * use of {@code success} are on lines missing from this excerpt.
 */
895 private void removeListener(ListenerEntry entry) {
896 assert (entry != null);
897 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
898 if(list == null) return;
899 boolean success = list.remove(entry);
902 cache.listeners.remove(entry.entry);
905 private boolean hasListener(CacheEntry entry) {
906 if(cache.listeners.get(entry) != null) return true;
/**
 * Checks the listeners of {@code entry}, collecting those whose base is
 * disposed, and drops the map entry when the listener list ends up empty.
 * <p>
 * NOTE(review): the statements that fill {@code list}, remove the collected
 * entries and produce the return value are on lines missing from this excerpt.
 */
910 boolean hasListenerAfterDisposing(CacheEntry entry) {
911 if(cache.listeners.get(entry) != null) {
912 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
// Allocated lazily, only once a disposed listener is found.
913 ArrayList<ListenerEntry> list = null;
914 for (ListenerEntry e : entries) {
915 if (e.base.isDisposed()) {
916 if(list == null) list = new ArrayList<ListenerEntry>();
921 for (ListenerEntry e : list) {
925 if (entries.isEmpty()) {
926 cache.listeners.remove(entry);
934 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
935 hasListenerAfterDisposing(entry);
936 if(cache.listeners.get(entry) != null)
937 return cache.listeners.get(entry);
939 return Collections.emptyList();
/**
 * Fills {@code workarea} with, for each cache entry, a set of listeners built
 * from the entry's own listener entries plus the sets of all its parents
 * (computed recursively). Entries already present in {@code workarea} are not
 * processed again.
 * <p>
 * NOTE(review): the loop body that adds the entry's own listeners to
 * {@code ls} is on a line missing from this excerpt.
 */
942 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
944 if(!workarea.containsKey(entry)) {
946 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
947 for(ListenerEntry e : getListenerEntries(entry))
// Stored before recursing, so the containsKey check above stops repeated visits.
950 workarea.put(entry, ls);
952 for(CacheEntry parent : entry.getParents(this)) {
953 processListenerReport(parent, workarea);
954 ls.addAll(workarea.get(parent));
/**
 * Builds a {@link ListenerReport} over all cache entries: disposed listeners
 * are cleaned up first, then {@link #processListenerReport} fills the report's
 * work area for every entry.
 * <p>
 * NOTE(review): the return statement is on a line missing from this excerpt.
 */
961 public synchronized ListenerReport getListenerReport() throws IOException {
963 class ListenerReportImpl implements ListenerReport {
965 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener: the number of work-area sets containing it, then the listener.
968 public void print(PrintStream b) {
969 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
970 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
971 for(ListenerBase l : e.getValue()) {
972 Integer i = hist.get(l);
// Counts are kept negated and flipped back when printed; presumably so that
// valueSortedEntries lists the largest counts first - confirm.
973 hist.put(l, i != null ? i-1 : -1);
977 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
978 b.print("" + -p.second + " " + p.first + "\n");
986 ListenerReportImpl result = new ListenerReportImpl();
988 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
989 for(CacheEntryBase entry : all) {
990 hasListenerAfterDisposing(entry);
992 for(CacheEntryBase entry : all) {
993 processListenerReport(entry, result.workarea);
/**
 * Opens {@code file} for writing, obtains the listener report and returns a
 * completion message.
 * <p>
 * NOTE(review): the lines that print the report and close the stream are
 * missing from this excerpt; verify that the stream is closed on every path
 * (e.g. with try-with-resources).
 *
 * @throws IOException if the file cannot be opened or the report fails
 */
1000 public synchronized String reportListeners(File file) throws IOException {
1005 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1006 ListenerReport report = getListenerReport();
1009 return "Done reporting listeners.";
/**
 * Recursively records the non-discarded parents of {@code entry} in
 * {@code workarea}. Discarded entries and entries already present in
 * {@code workarea} are skipped.
 * <p>
 * NOTE(review): the statement that adds a parent to {@code ps} is on a line
 * missing from this excerpt.
 */
1013 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1015 if(entry.isDiscarded()) return;
1016 if(workarea.containsKey(entry)) return;
1018 Iterable<CacheEntry> parents = entry.getParents(this);
1019 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1020 for(CacheEntry e : parents) {
1021 if(e.isDiscarded()) continue;
1023 processParentReport(e, workarea);
1025 workarea.put(entry, ps);
/**
 * Writes the entries of {@code Development.histogram} to {@code file}, one
 * {@code name: count} line each, in the reverse of the order returned by
 * {@code valueSortedEntries}, and then clears the histogram.
 * <p>
 * NOTE(review): the lines that close the stream and return are missing from
 * this excerpt.
 *
 * @throws IOException if the file cannot be opened
 */
1029 public synchronized String reportQueryActivity(File file) throws IOException {
// NOTE(review): the log prefix says "reportQueries" although this is
// reportQueryActivity - looks like a copy/paste leftover.
1031 System.err.println("reportQueries " + file.getAbsolutePath());
1036 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1038 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1039 Collections.reverse(entries);
1041 for(Pair<String,Integer> entry : entries) {
1042 b.println(entry.first + ": " + entry.second);
1047 Development.histogram.clear();
/**
 * Dumps a report about the cached queries to {@code file}: entry counts per
 * query class, a per-entry "retained set" figure, the parents of each entry
 * with its bind status (0=bound, 1=free, 2=unknown, as stated in the report
 * header) and the children of each entry.
 * <p>
 * NOTE(review): a large number of lines of this method are missing from this
 * excerpt (loop headers, branch conditions, several declarations such as
 * {@code loops}, {@code iter}, {@code newStatus} and {@code listeners}, and the
 * stream close). The comments below describe only the visible statements.
 *
 * @return a summary string with the number of dumped queries
 * @throws IOException if the file cannot be opened
 */
1053 public synchronized String reportQueries(File file) throws IOException {
1055 System.err.println("reportQueries " + file.getAbsolutePath());
1060 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1062 long start = System.nanoTime();
1064 // ArrayList<CacheEntry> all = ;
// Phase 1: map every non-discarded entry to its non-discarded parents.
1066 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1067 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1068 for(CacheEntryBase entry : caches) {
1069 processParentReport(entry, workarea);
1072 // for(CacheEntry e : all) System.err.println("entry: " + e);
1074 long duration = System.nanoTime() - start;
1075 System.err.println("Query root set in " + 1e-9*duration + "s.");
1077 start = System.nanoTime();
// Phase 2: initial bind status per entry (0, 1 or 2); the conditions selecting
// 0 and 2 are on lines missing from this excerpt.
1079 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1083 for(CacheEntry entry : workarea.keySet()) {
1084 boolean listener = hasListenerAfterDisposing(entry);
1085 boolean hasParents = entry.getParents(this).iterator().hasNext();
1088 flagMap.put(entry, 0);
1089 } else if (!hasParents) {
1091 flagMap.put(entry, 1);
1094 flagMap.put(entry, 2);
1096 // // Write leaf bit
1097 // entry.flags |= 4;
// Phase 3: repeated passes that derive the status of "unknown" entries from
// their parents; bounded by the loop condition further down.
1100 boolean done = true;
1107 long start2 = System.nanoTime();
1109 int boundCounter = 0;
1110 int unboundCounter = 0;
1111 int unknownCounter = 0;
1113 for(CacheEntry<?> entry : workarea.keySet()) {
1115 //System.err.println("process " + entry);
1117 int flags = flagMap.get(entry);
1118 int bindStatus = flags & 3;
1120 if(bindStatus == 0) boundCounter++;
1121 else if(bindStatus == 1) unboundCounter++;
1122 else if(bindStatus == 2) unknownCounter++;
// Only entries still marked unknown (2) are examined further.
1124 if(bindStatus < 2) continue;
1127 for(CacheEntry parent : entry.getParents(this)) {
1129 if(parent.isDiscarded()) flagMap.put(parent, 1);
1131 int flags2 = flagMap.get(parent);
1132 int bindStatus2 = flags2 & 3;
1133 // Parent is bound => child is bound
1134 if(bindStatus2 == 0) {
1138 // Parent is unknown => child is unknown
1139 else if (bindStatus2 == 2) {
1146 flagMap.put(entry, newStatus);
1150 duration = System.nanoTime() - start2;
1151 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1152 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
// At most 20 further passes are made while done stays false.
1154 } while(!done && loops++ < 20);
1158 for(CacheEntry entry : workarea.keySet()) {
1160 int bindStatus = flagMap.get(entry);
1161 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1167 duration = System.nanoTime() - start;
1168 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Phase 4: count entries per class; request-backed entries are counted under
// the class of their request.
1170 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1172 for(CacheEntry entry : workarea.keySet()) {
1173 Class<?> clazz = entry.getClass();
1174 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1175 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1176 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1177 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1178 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
// Counts are stored negated and flipped back when printed below.
1179 Integer c = counts.get(clazz);
1180 if(c == null) counts.put(clazz, -1);
1181 else counts.put(clazz, c-1);
// Report header.
1184 b.print("// Simantics DB client query report file\n");
1185 b.print("// This file contains the following information\n");
1186 b.print("// -The amount of cached query instances per query class\n");
1187 b.print("// -The sizes of retained child sets\n");
1188 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1189 b.print("// -Followed by status, where\n");
1190 b.print("// -0=bound\n");
1191 b.print("// -1=free\n");
1192 b.print("// -2=unknown\n");
1193 b.print("// -L=has listener\n");
1194 b.print("// -List of children for each query (search for 'C <query name>')\n");
1196 b.print("----------------------------------------\n");
1198 b.print("// Queries by class\n");
1199 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1200 b.print(-p.second + " " + p.first.getName() + "\n");
// Phase 5: iterate the per-entry histogram, for at most 50 rounds, while it keeps changing.
1203 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1204 for(CacheEntry e : workarea.keySet())
1207 boolean changed = true;
1209 while(changed && iter++<50) {
1213 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1214 for(CacheEntry e : workarea.keySet())
// Each entry's current value is added to the new value of each of its parents.
1217 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1218 Integer c = hist.get(e.getKey());
1219 for(CacheEntry p : e.getValue()) {
1220 Integer i = newHist.get(p);
1221 newHist.put(p, i+c);
1224 for(CacheEntry e : workarea.keySet()) {
1225 Integer value = newHist.get(e);
1226 Integer old = hist.get(e);
1227 if(!value.equals(old)) {
1229 // System.err.println("hist " + e + ": " + old + " => " + value);
1234 System.err.println("Retained set iteration " + iter);
1238 b.print("// Queries by retained set\n");
1239 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1240 b.print("" + -p.second + " " + p.first + "\n");
// Phase 6: print parents per entry and build the inverse (children) map on the way.
1243 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1245 b.print("// Entry parent listing\n");
1246 for(CacheEntry entry : workarea.keySet()) {
1247 int status = flagMap.get(entry);
1248 boolean hasListener = hasListenerAfterDisposing(entry);
1249 b.print("Q " + entry.toString());
1251 b.print(" (L" + status + ")");
1254 b.print(" (" + status + ")");
1257 for(CacheEntry parent : workarea.get(entry)) {
1258 Collection<CacheEntry> inv = inverse.get(parent);
1260 inv = new ArrayList<CacheEntry>();
1261 inverse.put(parent, inv);
1264 b.print(" " + parent.toString());
1269 b.print("// Entry child listing\n");
1270 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1271 b.print("C " + entry.getKey().toString());
1273 for(CacheEntry child : entry.getValue()) {
1274 Integer h = hist.get(child);
1278 b.print(" <no children>");
1280 b.print(" " + child.toString());
1285 b.print("#queries: " + workarea.keySet().size() + "\n");
1286 b.print("#listeners: " + listeners + "\n");
1290 return "Dumped " + workarea.keySet().size() + " queries.";
// NOTE(review): fragment of the UpdateEntry class (named by the constructor
// below); the class header and the declaration of indent are missing from this
// excerpt. updateQuery reads e.entry and e.indent from instances of it.
1296 public CacheEntry caller;
1298 public CacheEntry entry;
1302 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1303 this.caller = caller;
1305 this.indent = indent;
/**
 * Removes a cache entry from its query.
 *
 * @param entry the cache entry to remove
 * @return false when the entry was already discarded; presumably true otherwise - TODO confirm
 */
1310 boolean removeQuery(CacheEntry entry) {
1312 // This entry has been removed before. No need to do anything here.
1313 if(entry.isDiscarded()) return false;
// Redundant with the guard above; only evaluated when assertions are enabled.
1315 assert (!entry.isDiscarded());
1317 Query query = entry.getQuery();
// The query removes its own entry; this processor is passed as the argument.
1319 query.removeEntry(this);
// Entries whose GC status carries the HAS_BEEN_BOUND bit get additional bookkeeping.
1324 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1335 * @return true if this entry is being listened
1337 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
// Processes one work item of the update traversal: traces the entry's state in development
// mode, records IMMEDIATE_UPDATE queries in 'immediates', schedules the entry's listeners and
// queues the parents of INVALIDATE queries onto 'todo'.
1341 CacheEntry entry = e.entry;
1343 System.err.println("updateQuery " + entry);
1346 * If the dependency graph forms a DAG, some entries are inserted in the
1347 * todo list many times. They only need to be processed once though.
// Discarded entry: traced with a "D" prefix when QUERYPROCESSOR_UPDATE tracing is on.
1349 if (entry.isDiscarded()) {
1350 if (Development.DEVELOPMENT) {
1351 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1352 System.out.print("D");
// One space per indent level so the trace mirrors the traversal depth.
1353 for (int i = 0; i < e.indent; i++)
1354 System.out.print(" ");
1355 System.out.println(entry.getQuery());
1358 // System.err.println(" => DISCARDED");
// Refuted entry: traced with an "R" prefix.
1362 if (entry.isRefuted()) {
1363 if (Development.DEVELOPMENT) {
1364 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1365 System.out.print("R");
1366 for (int i = 0; i < e.indent; i++)
1367 System.out.print(" ");
1368 System.out.println(entry.getQuery());
// Excepted entry: traced with an "E" prefix.
1374 if (entry.isExcepted()) {
1375 if (Development.DEVELOPMENT) {
1376 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1377 System.out.print("E");
// Pending entry: traced with a "P" prefix.
1382 if (entry.isPending()) {
1383 if (Development.DEVELOPMENT) {
1384 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1385 System.out.print("P");
// Trace line for the update itself: "U ", the indentation and the query.
1392 if (Development.DEVELOPMENT) {
1393 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1394 System.out.print("U ");
1395 for (int i = 0; i < e.indent; i++)
1396 System.out.print(" ");
1397 System.out.print(entry.getQuery());
1401 Query query = entry.getQuery();
1402 int type = query.type();
1404 boolean hasListener = hasListener(entry);
// Finish the trace line, marking entries that have a listener with " (L)".
1406 if (Development.DEVELOPMENT) {
1407 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1408 if(hasListener(entry)) {
1409 System.out.println(" (L)");
1411 System.out.println("");
// Pending or excepted entries: queries flagged IMMEDIATE_UPDATE are collected into
// 'immediates', which update() re-evaluates after the traversal.
1416 if(entry.isPending() || entry.isExcepted()) {
1419 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1421 immediates.put(entry, entry);
// The same IMMEDIATE_UPDATE collection for the remaining entries.
1436 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1438 immediates.put(entry, entry);
1452 // System.err.println(" => FOO " + type);
// Every listener registered for this entry is scheduled for firing.
1455 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1456 if(entries != null) {
1457 for (ListenerEntry le : entries) {
1458 scheduleListener(le);
1463 // If invalid, update parents
1464 if (type == RequestFlags.INVALIDATE) {
1465 updateParents(e.indent, entry, todo);
1472 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1474 Iterable<CacheEntry> oldParents = entry.getParents(this);
1475 for (CacheEntry parent : oldParents) {
1476 // System.err.println("updateParents " + entry + " => " + parent);
1477 if(!parent.isDiscarded())
1478 todo.push(new UpdateEntry(entry, parent, indent + 2));
1483 private boolean pruneListener(ListenerEntry entry) {
1484 if (entry.base.isDisposed()) {
1485 removeListener(entry);
1493 * @param av1 an array (guaranteed)
1494 * @param av2 any object
1495 * @return <code>true</code> if the two arrays are equal
1497 private final boolean arrayEquals(Object av1, Object av2) {
1500 Class<?> c1 = av1.getClass().getComponentType();
1501 Class<?> c2 = av2.getClass().getComponentType();
1502 if (c2 == null || !c1.equals(c2))
1504 boolean p1 = c1.isPrimitive();
1505 boolean p2 = c2.isPrimitive();
1509 return Arrays.equals((Object[]) av1, (Object[]) av2);
1510 if (boolean.class.equals(c1))
1511 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1512 else if (byte.class.equals(c1))
1513 return Arrays.equals((byte[]) av1, (byte[]) av2);
1514 else if (int.class.equals(c1))
1515 return Arrays.equals((int[]) av1, (int[]) av2);
1516 else if (long.class.equals(c1))
1517 return Arrays.equals((long[]) av1, (long[]) av2);
1518 else if (float.class.equals(c1))
1519 return Arrays.equals((float[]) av1, (float[]) av2);
1520 else if (double.class.equals(c1))
1521 return Arrays.equals((double[]) av1, (double[]) av2);
1522 throw new RuntimeException("??? Contact application querySupport.");
/**
 * Recomputes the query behind {@code entry} and compares the fresh result with {@code oldValue}.
 *
 * @param graph graph used for the recomputation; the query runs on {@code graph.withParent(entry)}
 * @param entry the cache entry to recompute
 * @param oldValue the previously known result, or ListenerEntry.NO_VALUE when none is known
 * @return the new result when it differs from {@code oldValue}, ListenerEntry.NOT_CHANGED when
 *         it is equal, and ListenerEntry.NO_VALUE when the entry is excepted after the
 *         recomputation or the recomputation threw
 */
1527 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1531 Query query = entry.getQuery();
1533 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1535 entry.prepareRecompute(querySupport);
// Run the recomputation with this entry as the parent of the graph.
1537 ReadGraphImpl parentGraph = graph.withParent(entry);
1539 query.recompute(parentGraph);
1541 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1543 Object newValue = entry.getResult();
// No previous value to compare against; presumably the new value is reported as a change - TODO confirm.
1545 if (ListenerEntry.NO_VALUE == oldValue) {
1546 if(DebugPolicy.CHANGES) {
1547 System.out.println("C " + query);
1548 System.out.println("- " + oldValue);
1549 System.out.println("- " + newValue);
1554 boolean changed = false;
// Arrays are compared by content via arrayEquals(), everything else with equals().
1556 if (newValue != null) {
1557 if (newValue.getClass().isArray()) {
1558 changed = !arrayEquals(newValue, oldValue);
1560 changed = !newValue.equals(oldValue);
// A null new value counts as a change only when the old value was non-null.
1563 changed = (oldValue != null);
1565 if(DebugPolicy.CHANGES && changed) {
1566 System.out.println("C " + query);
1567 System.out.println("- " + oldValue);
1568 System.out.println("- " + newValue);
1571 return changed ? newValue : ListenerEntry.NOT_CHANGED;
// Any failure during recomputation is logged and reported as NO_VALUE.
1573 } catch (Throwable t) {
1575 Logger.defaultLogError(t);
1577 return ListenerEntry.NO_VALUE;
1583 public boolean hasScheduledUpdates() {
1584 return !scheduledListeners.isEmpty();
/**
 * Fires the scheduled listeners whose query result has changed.
 * <p>
 * Works in rounds until no listener is scheduled any more: each round takes over the current
 * set of scheduled listeners, prunes disposed ones, recomputes the remaining entries with
 * compareTo() and then fires those whose value is not NOT_CHANGED from the cache.
 *
 * @param graph the write graph used for recomputation and for firing the listeners
 */
1587 public void performScheduledUpdates(WriteGraphImpl graph) {
1590 assert (!cache.collecting);
// Guards against re-entrant firing; the flag is cleared again at the end of this method.
1591 assert (!firingListeners);
1593 firingListeners = true;
1597 // Performing may cause further events to be scheduled.
1598 while (!scheduledListeners.isEmpty()) {
1601 // graph.state.barrier.inc();
1603 // Clone current events to make new entries possible during
// Swap in a fresh set so that listeners scheduled during this round land in the next one.
1605 THashSet<ListenerEntry> entries = scheduledListeners;
1606 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners to fire in this round, in the order they were found changed.
1608 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1610 for (ListenerEntry listenerEntry : entries) {
1612 if (pruneListener(listenerEntry)) {
1613 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1617 final CacheEntry entry = listenerEntry.entry;
1618 assert (entry != null);
// Recompute and compare against the value this listener saw last.
1620 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1622 if (newValue != ListenerEntry.NOT_CHANGED) {
1623 if(DebugPolicy.LISTENER)
1624 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1625 schedule.add(listenerEntry);
1626 listenerEntry.setLastKnown(entry.getResult());
// Second pass: deliver the cached results to the changed listeners.
1631 for(ListenerEntry listenerEntry : schedule) {
1632 final CacheEntry entry = listenerEntry.entry;
1633 if(DebugPolicy.LISTENER)
1634 System.out.println("Firing " + listenerEntry.procedure);
1636 if(DebugPolicy.LISTENER)
1637 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1638 entry.performFromCache(graph, listenerEntry.procedure);
// A failing listener only prints its stack trace.
1639 } catch (Throwable t) {
1640 t.printStackTrace();
1644 // graph.state.barrier.dec();
1645 // graph.waitAsync(null);
1646 // graph.state.barrier.assertReady();
1651 firingListeners = false;
1658 * @return true if this entry still has listeners
1660 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
// Propagates a change of 'entry' through the dependency structure. Alternates between walking
// the work list with updateQuery() and re-evaluating the collected immediate-update entries,
// whose changed results queue their parents in turn.
1662 assert (!cache.collecting);
// hadListeners accumulates the results of updateQuery(); listenersUnknown is set when an
// immediate entry turned out unchanged.
1666 boolean hadListeners = false;
1667 boolean listenersUnknown = false;
1671 assert(entry != null);
1672 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Identity-based map, so entries are de-duplicated by reference rather than equals().
1673 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
// Root work item: no caller, indentation 0.
1674 todo.add(new UpdateEntry(null, entry, 0));
1678 // Walk the tree and collect immediate updates
1679 while (!todo.isEmpty()) {
1680 UpdateEntry e = todo.pop();
1681 hadListeners |= updateQuery(e, todo, immediates);
1684 if(immediates.isEmpty()) break;
1686 // Evaluate all immediate updates and collect parents to update
1687 for(CacheEntry immediate : immediates.values()) {
1689 if(immediate.isDiscarded()) {
// Excepted entries have no usable old value, so they are compared against NO_VALUE.
1693 if(immediate.isExcepted()) {
1695 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1696 if (newValue != ListenerEntry.NOT_CHANGED)
1697 updateParents(0, immediate, todo);
// Otherwise remember the old result, recompute, and propagate only real changes.
1701 Object oldValue = immediate.getResult();
1702 Object newValue = compareTo(graph, immediate, oldValue);
1704 if (newValue != ListenerEntry.NOT_CHANGED) {
1705 updateParents(0, immediate, todo);
1707 // If not changed, keep the old value
1708 immediate.setResult(oldValue);
1709 listenersUnknown = true;
1719 } catch (Throwable t) {
1720 Logger.defaultLogError(t);
// Non-short-circuit OR of two booleans; same result as || here.
1726 return hadListeners | listenersUnknown;
1730 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1731 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1732 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1733 // Maybe use a mutex from util.concurrent?
1734 private Object primitiveUpdateLock = new Object();
1735 private THashSet scheduledPrimitiveUpdates = new THashSet();
/**
 * Applies all scheduled statement, value, invalidation and primitive updates to the
 * cached queries by calling update() on every affected cache entry.
 * <p>
 * Two fast paths handle the common cases of exactly one scheduled statement update or
 * exactly one scheduled value update; the general path walks all scheduled sets.
 *
 * @param graph the graph handed to update() for recomputation
 */
1737 public void performDirtyUpdates(final ReadGraphImpl graph) {
1739 cache.dirty = false;
1742 if (Development.DEVELOPMENT) {
1743 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1744 System.err.println("== Query update ==");
1748 // Special case - one statement
1749 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1751 long arg0 = scheduledObjectUpdates.getFirst();
// The packed key holds the subject in the high 32 bits and the predicate in the low 32 bits
// (see updateStatements()).
1753 final int subject = (int)(arg0 >>> 32);
1754 final int predicate = (int)(arg0 & 0xffffffff);
1756 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1757 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1758 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
// Changes to type or relation hierarchy statements also affect the type queries of the subject.
1760 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1761 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1762 if(principalTypes != null) update(graph, principalTypes);
1763 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1764 if(types != null) update(graph, types);
1767 if(predicate == subrelationOf) {
1768 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1769 if(superRelations != null) update(graph, superRelations);
1772 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1773 if(dp != null) update(graph, dp);
// The ordered-set query is looked up by the predicate id.
1774 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1775 if(os != null) update(graph, os);
1777 scheduledObjectUpdates.clear();
1782 // Special case - one value
1783 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1785 int arg0 = scheduledValueUpdates.getFirst();
1787 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1788 if(valueQuery != null) update(graph, valueQuery);
1790 scheduledValueUpdates.clear();
// General path. Despite its name, 'predicates' collects subject/resource ids whose
// Objects, DirectObjects, Statements and DirectPredicates queries are updated below;
// 'orderedSets' collects the predicate ids of the scheduled statement updates.
1795 final TIntHashSet predicates = new TIntHashSet();
1796 final TIntHashSet orderedSets = new TIntHashSet();
// Take over the primitive updates under the lock; updatePrimitive() may be adding concurrently.
1798 THashSet primitiveUpdates;
1799 synchronized (primitiveUpdateLock) {
1800 primitiveUpdates = scheduledPrimitiveUpdates;
1801 scheduledPrimitiveUpdates = new THashSet();
1804 primitiveUpdates.forEach(new TObjectProcedure() {
1807 public boolean execute(Object arg0) {
1809 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1810 if (query != null) {
1811 boolean listening = update(graph, query);
// Drop external read entries that nobody listens to and nobody depends on.
1812 if (!listening && !query.hasParents()) {
1813 cache.externalReadEntryMap.remove(arg0);
1822 scheduledValueUpdates.forEach(new TIntProcedure() {
1825 public boolean execute(int arg0) {
1826 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1827 if(valueQuery != null) update(graph, valueQuery);
// Invalidated resources: update the value, type and super relation queries and queue the
// resource for the per-subject pass below.
1833 scheduledInvalidates.forEach(new TIntProcedure() {
1836 public boolean execute(int resource) {
1838 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1839 if(valueQuery != null) update(graph, valueQuery);
1841 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1842 if(principalTypes != null) update(graph, principalTypes);
1843 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1844 if(types != null) update(graph, types);
1846 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1847 if(superRelations != null) update(graph, superRelations);
1849 predicates.add(resource);
1856 scheduledObjectUpdates.forEach(new TLongProcedure() {
1859 public boolean execute(long arg0) {
// Same key layout as in the single-statement fast path above.
1861 final int subject = (int)(arg0 >>> 32);
1862 final int predicate = (int)(arg0 & 0xffffffff);
1864 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1865 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1866 if(principalTypes != null) update(graph, principalTypes);
1867 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1868 if(types != null) update(graph, types);
1871 if(predicate == subrelationOf) {
1872 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1873 if(superRelations != null) update(graph, superRelations);
1876 predicates.add(subject);
1877 orderedSets.add(predicate);
// Per-subject pass over everything collected above.
1885 predicates.forEach(new TIntProcedure() {
1888 public boolean execute(final int subject) {
1890 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1891 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1892 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1894 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1895 if(entry != null) update(graph, entry);
1903 orderedSets.forEach(new TIntProcedure() {
1906 public boolean execute(int orderedSet) {
1908 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1909 if(entry != null) update(graph, entry);
1917 // for (Integer subject : predicates) {
1918 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1919 // if(entry != null) update(graph, entry);
1923 if (Development.DEVELOPMENT) {
1924 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1925 System.err.println("== Query update ends ==");
// Everything scheduled has been processed.
1929 scheduledValueUpdates.clear();
1930 scheduledObjectUpdates.clear();
1931 scheduledInvalidates.clear();
/**
 * Schedules the value query of a resource for the next performDirtyUpdates() pass.
 *
 * @param resource id of the resource whose value changed
 */
1935 public void updateValue(final int resource) {
1936 scheduledValueUpdates.add(resource);
1940 public void updateStatements(final int resource, final int predicate) {
1941 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
// Resource id of the most recent invalidateResource() call that was not short-circuited.
// NOTE(review): the initial value 0 means a first call with resource id 0 is ignored - confirm 0 is never a valid id.
1945 private int lastInvalidate = 0;
/**
 * Schedules a resource for update after it has been invalidated.
 *
 * @param resource id of the invalidated resource
 */
1947 public void invalidateResource(final int resource) {
// Cheap de-duplication of consecutive calls for the same resource.
// NOTE(review): performDirtyUpdates() clears the scheduled sets but does not appear to reset
// lastInvalidate; if so, re-invalidating the same resource after a flush is silently dropped - confirm.
1948 if(lastInvalidate == resource) return;
// NOTE(review): this adds to scheduledValueUpdates although a dedicated scheduledInvalidates set
// exists and is processed by performDirtyUpdates() - confirm which set is intended.
1949 scheduledValueUpdates.add(resource);
1950 lastInvalidate = resource;
1954 public void updatePrimitive(final ExternalRead primitive) {
1956 // External reads may be updated from arbitrary threads.
1957 // Synchronize to prevent race-conditions.
1958 synchronized (primitiveUpdateLock) {
1959 scheduledPrimitiveUpdates.add(primitive);
1961 querySupport.dirtyPrimitives();
1966 public synchronized String toString() {
1967 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
/**
 * Shuts down the query executor threads: asks each one to dispose itself, polls for them to
 * finish, interrupts those that are still alive and finally joins each thread for up to five
 * seconds before clearing its slot.
 */
1971 protected void doDispose() {
// Step 1: request an orderly shutdown from every executor.
1973 for(int index = 0; index < THREADS; index++) {
1974 executors[index].dispose();
// Step 2: poll (at most 100 rounds) whether any executor is still alive.
1978 for(int i=0;i<100;i++) {
1980 boolean alive = false;
1981 for(int index = 0; index < THREADS; index++) {
1982 alive |= executors[index].isAlive();
// NOTE(review): the interrupt is only logged, the thread's interrupt status is not restored - confirm intended.
1987 } catch (InterruptedException e) {
1988 Logger.defaultLogError(e);
1993 // Then start interrupting
1994 for(int i=0;i<100;i++) {
1996 boolean alive = false;
1997 for(int index = 0; index < THREADS; index++) {
1998 alive |= executors[index].isAlive();
2001 for(int index = 0; index < THREADS; index++) {
2002 executors[index].interrupt();
2006 // // Then just destroy
2007 // for(int index = 0; index < THREADS; index++) {
2008 // executors[index].destroy();
// Step 4: wait up to 5000 ms per thread, then drop the reference regardless of the outcome.
2011 for(int index = 0; index < THREADS; index++) {
2013 executors[index].join(5000);
2014 } catch (InterruptedException e) {
2015 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2017 executors[index] = null;
2022 public int getHits() {
2026 public int getMisses() {
2027 return cache.misses;
2030 public int getSize() {
2034 public Set<Long> getReferencedClusters() {
2035 HashSet<Long> result = new HashSet<Long>();
2036 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2037 Objects query = (Objects) entry.getQuery();
2038 result.add(querySupport.getClusterId(query.r1()));
2040 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2041 DirectPredicates query = (DirectPredicates) entry.getQuery();
2042 result.add(querySupport.getClusterId(query.id));
2044 for (CacheEntry entry : cache.valueQueryMap.values()) {
2045 ValueQuery query = (ValueQuery) entry.getQuery();
2046 result.add(querySupport.getClusterId(query.id));
2051 public void assertDone() {
2054 CacheCollectionResult allCaches(CacheCollectionResult result) {
2056 return cache.allCaches(result);
2060 public void printDiagnostics() {
2063 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2064 querySupport.requestCluster(graph, clusterId, runnable);
/**
 * Runs the query collector with the arguments (0, Integer.MAX_VALUE).
 *
 * @return an int whose meaning is not evident from the collector call - TODO document
 */
2067 public int clean() {
2068 collector.collect(0, Integer.MAX_VALUE);
/**
 * Runs a collection restricted to the cache entries of the given external reads.
 * <p>
 * A one-off QueryCollectorSupport is built around {@code requests} and handed to a new
 * QueryCollectorImpl2, which is run with the arguments (0, Integer.MAX_VALUE).
 *
 * @param requests the external reads whose cache entries are candidates for collection
 */
2072 public void clean(final Collection<ExternalRead<?>> requests) {
2073 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
// Cursor over the requests; re-created once it has been exhausted (see iterate()).
2074 Iterator<ExternalRead<?>> iterator = requests.iterator();
// Not supported by this restricted collector support.
2076 public CacheCollectionResult allCaches() {
2077 throw new UnsupportedOperationException();
// Returns the cache entry of the next request that has one, skipping requests without an entry.
2080 public CacheEntryBase iterate(int level) {
2081 if(iterator.hasNext()) {
2082 ExternalRead<?> request = iterator.next();
2083 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2084 if (entry != null) return entry;
2085 else return iterate(level);
// Exhausted: rewind so that a later pass starts from the first request again.
2087 iterator = requests.iterator();
2092 public void remove() {
2093 throw new UnsupportedOperationException();
2096 public void setLevel(CacheEntryBase entry, int level) {
2097 throw new UnsupportedOperationException();
// Root list: looks up the cache entry of each request.
2100 public Collection<CacheEntry> getRootList() {
2101 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2102 for (ExternalRead<?> request : requests) {
2103 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2110 public int getCurrentSize() {
2114 public int calculateCurrentSize() {
2115 // This tells the collector to attempt collecting everything.
2116 return Integer.MAX_VALUE;
2119 public boolean start(boolean flush) {
2123 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
2126 public void scanPending() {
2128 cache.scanPending();
2132 public ReadGraphImpl graphForVirtualRequest() {
2133 return ReadGraphImpl.createAsync(this);
2137 private HashMap<Resource, Class<?>> builtinValues;
2139 public Class<?> getBuiltinValue(Resource r) {
2140 if(builtinValues == null) initBuiltinValues();
2141 return builtinValues.get(r);
2144 Exception callerException = null;
2146 public interface AsyncBarrier {
2149 // public void inc(String debug);
2150 // public void dec(String debug);
2153 // final public QueryProcessor processor;
2154 // final public QuerySupport support;
2156 // boolean disposed = false;
2158 private void initBuiltinValues() {
2160 Layer0 b = getSession().peekService(Layer0.class);
2161 if(b == null) return;
2163 builtinValues = new HashMap<Resource, Class<?>>();
2165 builtinValues.put(b.String, String.class);
2166 builtinValues.put(b.Double, Double.class);
2167 builtinValues.put(b.Float, Float.class);
2168 builtinValues.put(b.Long, Long.class);
2169 builtinValues.put(b.Integer, Integer.class);
2170 builtinValues.put(b.Byte, Byte.class);
2171 builtinValues.put(b.Boolean, Boolean.class);
2173 builtinValues.put(b.StringArray, String[].class);
2174 builtinValues.put(b.DoubleArray, double[].class);
2175 builtinValues.put(b.FloatArray, float[].class);
2176 builtinValues.put(b.LongArray, long[].class);
2177 builtinValues.put(b.IntegerArray, int[].class);
2178 builtinValues.put(b.ByteArray, byte[].class);
2179 builtinValues.put(b.BooleanArray, boolean[].class);
2183 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2185 // if (null == provider2) {
2186 // this.processor = null;
2190 // this.processor = provider2;
2191 // support = provider2.getCore();
2192 // initBuiltinValues();
2196 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2197 // return new ReadGraphSupportImpl(impl.processor);
2201 final public Session getSession() {
2205 final public ResourceSupport getResourceSupport() {
2206 return resourceSupport;
/**
 * Not supported: always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
2210 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
// The former implementation is kept below as commented-out code.
2212 throw new UnsupportedOperationException();
2214 // assert(subject != null);
2215 // assert(procedure != null);
2217 // final ListenerBase listener = getListenerBase(procedure);
2219 // IntProcedure ip = new IntProcedure() {
2221 // AtomicBoolean first = new AtomicBoolean(true);
2224 // public void execute(ReadGraphImpl graph, int i) {
2226 // if(first.get()) {
2227 // procedure.execute(graph, querySupport.getResource(i));
2229 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2231 // } catch (Throwable t2) {
2232 // Logger.defaultLogError(t2);
2237 // public void finished(ReadGraphImpl graph) {
2239 // if(first.compareAndSet(true, false)) {
2240 // procedure.finished(graph);
2241 //// impl.state.barrier.dec(this);
2243 // procedure.finished(impl.newRestart(graph));
2246 // } catch (Throwable t2) {
2247 // Logger.defaultLogError(t2);
2252 // public void exception(ReadGraphImpl graph, Throwable t) {
2254 // if(first.compareAndSet(true, false)) {
2255 // procedure.exception(graph, t);
2257 // procedure.exception(impl.newRestart(graph), t);
2259 // } catch (Throwable t2) {
2260 // Logger.defaultLogError(t2);
2266 // int sId = querySupport.getId(subject);
2269 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2270 // } catch (DatabaseException e) {
2271 // Logger.defaultLogError(e);
/**
 * Not supported: always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
2277 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
// The former implementation is kept below as commented-out code.
2279 throw new UnsupportedOperationException();
2281 // assert(subject != null);
2282 // assert(procedure != null);
2284 // final ListenerBase listener = getListenerBase(procedure);
2287 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2290 // public void execute(ReadGraphImpl graph, int i) {
2292 // procedure.execute(querySupport.getResource(i));
2293 // } catch (Throwable t2) {
2294 // Logger.defaultLogError(t2);
2299 // public void finished(ReadGraphImpl graph) {
2301 // procedure.finished();
2302 // } catch (Throwable t2) {
2303 // Logger.defaultLogError(t2);
2305 //// impl.state.barrier.dec();
2309 // public void exception(ReadGraphImpl graph, Throwable t) {
2311 // procedure.exception(t);
2312 // } catch (Throwable t2) {
2313 // Logger.defaultLogError(t2);
2315 //// impl.state.barrier.dec();
2319 // } catch (DatabaseException e) {
2320 // Logger.defaultLogError(e);
2326 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2328 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null, null);
2334 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2335 final Resource predicate, final MultiProcedure<Statement> procedure) {
2337 assert(subject != null);
2338 assert(predicate != null);
2339 assert(procedure != null);
2341 final ListenerBase listener = getListenerBase(procedure);
2343 // impl.state.barrier.inc();
2346 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2349 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2351 procedure.execute(querySupport.getStatement(s, p, o));
2352 } catch (Throwable t2) {
2353 Logger.defaultLogError(t2);
2358 public void finished(ReadGraphImpl graph) {
2360 procedure.finished();
2361 } catch (Throwable t2) {
2362 Logger.defaultLogError(t2);
2364 // impl.state.barrier.dec();
2368 public void exception(ReadGraphImpl graph, Throwable t) {
2370 procedure.exception(t);
2371 } catch (Throwable t2) {
2372 Logger.defaultLogError(t2);
2374 // impl.state.barrier.dec();
2378 } catch (DatabaseException e) {
2379 Logger.defaultLogError(e);
/**
 * Reports every statement of (subject, predicate) to an asynchronous multi-procedure.
 * <p>
 * Raw (s, p, o) id triples are converted to Statement objects. The adapter calls the
 * procedure either with the graph it was given or with {@code impl.newRestart(graph)};
 * presumably the choice depends on the adapter's {@code first} flag - TODO confirm.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource, must not be null
 * @param predicate predicate resource, must not be null
 * @param procedure receiver of the statements, must not be null; registered as listener
 *        when it is a ListenerBase
 */
2385 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2386 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2388 assert(subject != null);
2389 assert(predicate != null);
2390 assert(procedure != null);
2392 final ListenerBase listener = getListenerBase(procedure);
2394 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// Starts out true; selects between the two delivery variants seen in each callback below.
2396 boolean first = true;
2399 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2402 procedure.execute(graph, querySupport.getStatement(s, p, o));
2404 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
// Failures inside the caller's procedure are logged, not propagated.
2406 } catch (Throwable t2) {
2407 Logger.defaultLogError(t2);
2412 public void finished(ReadGraphImpl graph) {
2417 procedure.finished(graph);
2418 // impl.state.barrier.dec(this);
2420 procedure.finished(impl.newRestart(graph));
2422 } catch (Throwable t2) {
2423 Logger.defaultLogError(t2);
2429 public void exception(ReadGraphImpl graph, Throwable t) {
2434 procedure.exception(graph, t);
2435 // impl.state.barrier.dec(this);
2437 procedure.exception(impl.newRestart(graph), t);
2439 } catch (Throwable t2) {
2440 Logger.defaultLogError(t2);
2447 int sId = querySupport.getId(subject);
2448 int pId = querySupport.getId(predicate);
2450 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2451 // else impl.state.barrier.inc(null, null);
2454 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
// A failing query is logged; the caller is not notified through the procedure here.
2455 } catch (DatabaseException e) {
2456 Logger.defaultLogError(e);
/**
 * Reports every statement of (subject, predicate) to a StatementProcedure as raw
 * (s, p, o) id triples, without creating Statement objects.
 * <p>
 * The adapter calls the procedure either with the graph it was given or with
 * {@code impl.newRestart(graph)}; presumably the choice depends on the adapter's
 * {@code first} flag - TODO confirm.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource, must not be null
 * @param predicate predicate resource, must not be null
 * @param procedure receiver of the id triples, must not be null; registered as listener
 *        when it is a ListenerBase
 */
2462 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2463 final Resource predicate, final StatementProcedure procedure) {
2465 assert(subject != null);
2466 assert(predicate != null);
2467 assert(procedure != null);
2469 final ListenerBase listener = getListenerBase(procedure);
2471 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// Starts out true; selects between the two delivery variants seen in each callback below.
2473 boolean first = true;
2476 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2479 procedure.execute(graph, s, p, o);
2481 procedure.execute(impl.newRestart(graph), s, p, o);
// Failures inside the caller's procedure are logged, not propagated.
2483 } catch (Throwable t2) {
2484 Logger.defaultLogError(t2);
2489 public void finished(ReadGraphImpl graph) {
2494 procedure.finished(graph);
2495 // impl.state.barrier.dec(this);
2497 procedure.finished(impl.newRestart(graph));
2499 } catch (Throwable t2) {
2500 Logger.defaultLogError(t2);
2506 public void exception(ReadGraphImpl graph, Throwable t) {
2511 procedure.exception(graph, t);
2512 // impl.state.barrier.dec(this);
2514 procedure.exception(impl.newRestart(graph), t);
2516 } catch (Throwable t2) {
2517 Logger.defaultLogError(t2);
2524 int sId = querySupport.getId(subject);
2525 int pId = querySupport.getId(predicate);
2527 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2528 // else impl.state.barrier.inc(null, null);
2531 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2532 } catch (DatabaseException e) {
2533 Logger.defaultLogError(e);
/**
 * Adapts a set listener to the statement stream of (subject, predicate).
 * <p>
 * The statements are requested through forEachStatement() with an AsyncMultiListener that
 * turns the reported statements into add/remove notifications on {@code procedure}.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource, must not be null
 * @param predicate predicate resource, must not be null
 * @param procedure set listener receiving add, remove and exception notifications; the
 *        adapter reports itself disposed when this listener is disposed
 */
2539 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2541 assert(subject != null);
2542 assert(predicate != null);
2543 assert(procedure != null);
2545 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// 'current' is null until set elsewhere in this listener; presumably it holds the statements
// of the previous pass while 'run' collects those of the ongoing pass - TODO confirm.
2547 private Set<Statement> current = null;
2548 private Set<Statement> run = new HashSet<Statement>();
2551 public void execute(AsyncReadGraph graph, Statement result) {
2553 boolean found = false;
2555 if(current != null) {
// Removing marks the statement as still present; what remains in 'current' is reported as removed in finished().
2557 found = current.remove(result);
// Statements not found in 'current' are reported as additions.
2561 if(!found) procedure.add(graph, result);
2568 public void finished(AsyncReadGraph graph) {
2570 if(current != null) {
2571 for(Statement r : current) procedure.remove(graph, r);
// Start a fresh collection for the next pass.
2576 run = new HashSet<Statement>();
2581 public void exception(AsyncReadGraph graph, Throwable t) {
2582 procedure.exception(graph, t);
2586 public boolean isDisposed() {
2587 return procedure.isDisposed();
2595 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2596 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2598 assert(subject != null);
2599 assert(predicate != null);
2600 assert(procedure != null);
2602 final ListenerBase listener = getListenerBase(procedure);
2604 // impl.state.barrier.inc();
2607 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2610 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2612 procedure.execute(graph, querySupport.getStatement(s, p, o));
2613 } catch (Throwable t2) {
2614 Logger.defaultLogError(t2);
2619 public void finished(ReadGraphImpl graph) {
2621 procedure.finished(graph);
2622 } catch (Throwable t2) {
2623 Logger.defaultLogError(t2);
2625 // impl.state.barrier.dec();
2629 public void exception(ReadGraphImpl graph, Throwable t) {
2631 procedure.exception(graph, t);
2632 } catch (Throwable t2) {
2633 Logger.defaultLogError(t2);
2635 // impl.state.barrier.dec();
2639 } catch (DatabaseException e) {
2640 Logger.defaultLogError(e);
2645 private static ListenerBase getListenerBase(Object procedure) {
2646 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2651 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2653 assert(subject != null);
2654 assert(predicate != null);
2655 assert(procedure != null);
2657 final ListenerBase listener = getListenerBase(procedure);
2659 // impl.state.barrier.inc();
2662 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2665 public void execute(ReadGraphImpl graph, int i) {
2667 procedure.execute(querySupport.getResource(i));
2668 } catch (Throwable t2) {
2669 Logger.defaultLogError(t2);
2674 public void finished(ReadGraphImpl graph) {
2676 procedure.finished();
2677 } catch (Throwable t2) {
2678 Logger.defaultLogError(t2);
2680 // impl.state.barrier.dec();
2684 public void exception(ReadGraphImpl graph, Throwable t) {
2685 System.out.println("forEachObject exception " + t);
2687 procedure.exception(t);
2688 } catch (Throwable t2) {
2689 Logger.defaultLogError(t2);
2691 // impl.state.barrier.dec();
2695 } catch (DatabaseException e) {
2696 Logger.defaultLogError(e);
/**
 * Currently unsupported: always throws UnsupportedOperationException.
 *
 * The commented-out lines below are a previous implementation based on
 * QueryCache.runnerDirectPredicates.
 *
 * @throws UnsupportedOperationException on every call
 */
2702 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2704 throw new UnsupportedOperationException();
2706 // assert(subject != null);
2707 // assert(procedure != null);
2709 // final ListenerBase listener = getListenerBase(procedure);
2711 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
2713 // int sId = querySupport.getId(subject);
2716 // QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
2717 // } catch (DatabaseException e) {
2718 // Logger.defaultLogError(e);
/**
 * Delegates to org.simantics.db.impl.query.DirectStatements.queryEach for the given subject,
 * delivering the result to a synchronous Procedure.
 *
 * @param impl graph whose parent is passed to the query
 * @param subject subject resource, asserted non-null
 * @param procedure result receiver, asserted non-null; also serves as listener when it is a ListenerBase
 */
2724 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
2726 assert(subject != null);
2727 assert(procedure != null);
2729 final ListenerBase listener = getListenerBase(procedure);
// Fully qualified because the imported org.simantics.db.DirectStatements shares the simple name.
2731 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Asynchronous variant: delegates to org.simantics.db.impl.query.DirectStatements.queryEach,
 * passing the ignoreVirtual flag through unchanged.
 *
 * @param impl graph whose parent is passed to the query
 * @param subject subject resource, asserted non-null
 * @param procedure result receiver, asserted non-null; also serves as listener when it is a ListenerBase
 * @param ignoreVirtual forwarded to queryEach (NOTE(review): presumably skips virtual graphs - confirm)
 */
2736 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2738 assert(subject != null);
2739 assert(procedure != null);
2741 final ListenerBase listener = getListenerBase(procedure);
2743 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel resource (null support, id Integer.MIN_VALUE). forPossibleObject compares against it
// by identity and reports null to its procedure when the sentinel is the collected value.
2747 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Reports at most one object of (subject, predicate) to the procedure.
 *
 * Objects are collected through forEachObject. On completion the procedure receives null when
 * nothing was stored or when the stored value is INVALID_RESOURCE, otherwise the stored
 * resource. All three callbacks are synchronized on the adapter.
 *
 * NOTE(review): some lines of execute() are missing from this listing; presumably the first
 * result is stored and any further result switches to INVALID_RESOURCE - confirm.
 */
2750 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2752 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2754 private Resource single = null;
2757 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2758 if(single == null) {
2761 single = INVALID_RESOURCE;
2766 public synchronized void finished(AsyncReadGraph graph) {
// Identity comparison against the sentinel is intentional.
2767 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2768 else procedure.execute(graph, single);
2772 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2773 procedure.exception(graph, throwable);
/**
 * Lowest-level object query: resolves subject and predicate to ids and runs
 * QueryCache.runnerObjects with the supplied listener and IntProcedure.
 * A DatabaseException from the query is logged through Logger.defaultLogError.
 *
 * @param listener listener passed to the query as given (other overloads pass the result of getListenerBase)
 * @param procedure receives raw object ids
 */
2780 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2782 final int sId = querySupport.getId(subject);
2783 final int pId = querySupport.getId(predicate);
2786 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2787 } catch (DatabaseException e) {
2788 Logger.defaultLogError(e);
/**
 * IntProcedure that collects a single object id plus an optional failure.
 *
 * Visible behaviour: 'single' starts at 0 and takes the first id delivered while it is still 0;
 * finished() turns the value -1 back into 0; get() rethrows the stored Throwable, wrapping it in
 * DatabaseException unless it already is one.
 *
 * NOTE(review): the bodies of clear() and exception(), the other branch of execute() and the
 * normal return of get() are missing from this listing. Presumably -1 marks "more than one
 * result" and get() returns 'single' when no failure was stored - confirm.
 */
2793 static class Runner2Procedure implements IntProcedure {
2795 public int single = 0;
2796 public Throwable t = null;
2798 public void clear() {
2804 public void execute(ReadGraphImpl graph, int i) {
2805 if(single == 0) single = i;
2810 public void finished(ReadGraphImpl graph) {
2811 if(single == -1) single = 0;
2815 public void exception(ReadGraphImpl graph, Throwable throwable) {
2820 public int get() throws DatabaseException {
2822 if(t instanceof DatabaseException) throw (DatabaseException)t;
2823 else throw new DatabaseException(t);
/**
 * Runs the objects query for (subject, predicate) with a fresh Runner2Procedure and no listener.
 *
 * NOTE(review): the return statement is missing from this listing; presumably the value of
 * proc.get() - confirm.
 *
 * @throws DatabaseException declared; raised by QueryCache.runnerObjects
 */
2830 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2832 final int sId = querySupport.getId(subject);
2833 final int pId = querySupport.getId(predicate);
2835 Runner2Procedure proc = new Runner2Procedure();
2836 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
/**
 * Streams the objects of (subject, predicate) to an asynchronous multi-procedure.
 *
 * Two adapters exist. When impl has a parent or the procedure is a listener, the adapter keeps
 * a 'first' flag: the first completion is reported with impl itself, later ones with
 * impl.newRestart(graph), and Throwables from the procedure are logged. Otherwise a plain
 * adapter forwards the callbacks with the graph it receives and without any try/catch.
 *
 * NOTE(review): the condition selecting between the two procedure.execute calls in the first
 * adapter is missing from this listing; presumably it reads the 'first' flag - confirm.
 */
2841 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2843 assert(subject != null);
2844 assert(predicate != null);
2846 final ListenerBase listener = getListenerBase(procedure);
2848 if(impl.parent != null || listener != null) {
2850 IntProcedure ip = new IntProcedure() {
2852 AtomicBoolean first = new AtomicBoolean(true);
2855 public void execute(ReadGraphImpl graph, int i) {
2858 procedure.execute(impl, querySupport.getResource(i));
2860 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2862 } catch (Throwable t2) {
2863 Logger.defaultLogError(t2);
2869 public void finished(ReadGraphImpl graph) {
// compareAndSet flips the flag exactly once, so only the first completion uses impl.
2871 if(first.compareAndSet(true, false)) {
2872 procedure.finished(impl);
2873 // impl.state.barrier.dec(this);
2875 procedure.finished(impl.newRestart(graph));
2877 } catch (Throwable t2) {
2878 Logger.defaultLogError(t2);
2883 public void exception(ReadGraphImpl graph, Throwable t) {
2885 procedure.exception(graph, t);
2886 } catch (Throwable t2) {
2887 Logger.defaultLogError(t2);
2889 // impl.state.barrier.dec(this);
2893 public String toString() {
2894 return "forEachObject with " + procedure;
2899 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2900 // else impl.state.barrier.inc(null, null);
2902 forEachObject(impl, subject, predicate, listener, ip);
// No parent and no listener: forward the callbacks directly.
2906 IntProcedure ip = new IntProcedure() {
2909 public void execute(ReadGraphImpl graph, int i) {
2910 procedure.execute(graph, querySupport.getResource(i));
2914 public void finished(ReadGraphImpl graph) {
2915 procedure.finished(graph);
2919 public void exception(ReadGraphImpl graph, Throwable t) {
2920 procedure.exception(graph, t);
2924 public String toString() {
2925 return "forEachObject with " + procedure;
2930 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Presents the objects of (subject, predicate) to an AsyncSetListener as add/remove events.
 *
 * The adapter listens through forEachObject. For each delivered object it tries to remove the
 * object from 'current'; objects not found there are reported with procedure.add. On
 * completion everything still left in 'current' is reported with procedure.remove and 'run'
 * is replaced by a fresh set. Disposal is delegated to the procedure.
 *
 * NOTE(review): the lines that fill 'run' and assign 'current' are missing from this listing;
 * presumably 'run' records the objects of the ongoing evaluation and becomes 'current'
 * afterwards - confirm.
 */
2937 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2939 assert(subject != null);
2940 assert(predicate != null);
2941 assert(procedure != null);
2943 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2945 private Set<Resource> current = null;
2946 private Set<Resource> run = new HashSet<Resource>();
2949 public void execute(AsyncReadGraph graph, Resource result) {
2951 boolean found = false;
2953 if(current != null) {
2955 found = current.remove(result);
2959 if(!found) procedure.add(graph, result);
2966 public void finished(AsyncReadGraph graph) {
2968 if(current != null) {
2969 for(Resource r : current) procedure.remove(graph, r);
2974 run = new HashSet<Resource>();
2979 public boolean isDisposed() {
2980 return procedure.isDisposed();
2984 public void exception(AsyncReadGraph graph, Throwable t) {
2985 procedure.exception(graph, t);
2989 public String toString() {
2990 return "forObjectSet " + procedure;
/**
 * Presents the predicates of a subject to an AsyncSetListener as add/remove events.
 *
 * The adapter listens through forEachPredicate. Delivered predicates that cannot be removed
 * from 'current' are reported with procedure.add; on completion everything still left in
 * 'current' is reported with procedure.remove and 'run' is replaced by a fresh set.
 *
 * NOTE(review): the lines that fill 'run' and assign 'current' are missing from this listing -
 * confirm against the full file.
 */
2998 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3000 assert(subject != null);
3001 assert(procedure != null);
3003 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3005 private Set<Resource> current = null;
3006 private Set<Resource> run = new HashSet<Resource>();
3009 public void execute(AsyncReadGraph graph, Resource result) {
3011 boolean found = false;
3013 if(current != null) {
3015 found = current.remove(result);
3019 if(!found) procedure.add(graph, result);
3026 public void finished(AsyncReadGraph graph) {
3028 if(current != null) {
3029 for(Resource r : current) procedure.remove(graph, r);
3034 run = new HashSet<Resource>();
3039 public boolean isDisposed() {
3040 return procedure.isDisposed();
3044 public void exception(AsyncReadGraph graph, Throwable t) {
3045 procedure.exception(graph, t);
3049 public String toString() {
3050 return "forPredicateSet " + procedure;
/**
 * Presents the principal types of a subject to an AsyncSetListener as add/remove events.
 *
 * The adapter listens through forEachPrincipalType. Delivered types that cannot be removed
 * from 'current' are reported with procedure.add; on completion everything still left in
 * 'current' is reported with procedure.remove and 'run' is replaced by a fresh set.
 *
 * NOTE(review): the lines that fill 'run' and assign 'current' are missing from this listing -
 * confirm against the full file.
 */
3058 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3060 assert(subject != null);
3061 assert(procedure != null);
3063 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3065 private Set<Resource> current = null;
3066 private Set<Resource> run = new HashSet<Resource>();
3069 public void execute(AsyncReadGraph graph, Resource result) {
3071 boolean found = false;
3073 if(current != null) {
3075 found = current.remove(result);
3079 if(!found) procedure.add(graph, result);
3086 public void finished(AsyncReadGraph graph) {
3088 if(current != null) {
3089 for(Resource r : current) procedure.remove(graph, r);
3094 run = new HashSet<Resource>();
3099 public boolean isDisposed() {
3100 return procedure.isDisposed();
3104 public void exception(AsyncReadGraph graph, Throwable t) {
3105 procedure.exception(graph, t);
3109 public String toString() {
3110 return "forPrincipalTypeSet " + procedure;
/**
 * Presents the asserted objects of (subject, predicate) to an AsyncSetListener as add/remove
 * events.
 *
 * The adapter listens through forEachAssertedObject. Delivered objects that cannot be removed
 * from 'current' are reported with procedure.add; on completion everything still left in
 * 'current' is reported with procedure.remove and 'run' is replaced by a fresh set.
 *
 * NOTE(review): the lines that fill 'run' and assign 'current' are missing from this listing -
 * confirm against the full file.
 */
3118 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3120 assert(subject != null);
3121 assert(predicate != null);
3122 assert(procedure != null);
3124 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3126 private Set<Resource> current = null;
3127 private Set<Resource> run = new HashSet<Resource>();
3130 public void execute(AsyncReadGraph graph, Resource result) {
3132 boolean found = false;
3134 if(current != null) {
3136 found = current.remove(result);
3140 if(!found) procedure.add(graph, result);
3147 public void finished(AsyncReadGraph graph) {
3149 if(current != null) {
3150 for(Resource r : current) procedure.remove(graph, r);
3155 run = new HashSet<Resource>();
3160 public boolean isDisposed() {
3161 return procedure.isDisposed();
3165 public void exception(AsyncReadGraph graph, Throwable t) {
3166 procedure.exception(graph, t);
3170 public String toString() {
// NOTE(review): the label reads "forObjectSet" although this is forAssertedObjectSet - looks like a copy-paste leftover.
3171 return "forObjectSet " + procedure;
/**
 * Presents the asserted statements of (subject, predicate) to an AsyncSetListener as
 * add/remove events.
 *
 * The adapter listens through forEachAssertedStatement. Delivered statements that cannot be
 * removed from 'current' are reported with procedure.add; on completion everything still left
 * in 'current' is reported with procedure.remove and 'run' is replaced by a fresh set.
 *
 * NOTE(review): the lines that fill 'run' and assign 'current' are missing from this listing -
 * confirm against the full file.
 */
3179 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3181 assert(subject != null);
3182 assert(predicate != null);
3183 assert(procedure != null);
3185 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3187 private Set<Statement> current = null;
3188 private Set<Statement> run = new HashSet<Statement>();
3191 public void execute(AsyncReadGraph graph, Statement result) {
3193 boolean found = false;
3195 if(current != null) {
3197 found = current.remove(result);
3201 if(!found) procedure.add(graph, result);
3208 public void finished(AsyncReadGraph graph) {
3210 if(current != null) {
3211 for(Statement s : current) procedure.remove(graph, s);
3216 run = new HashSet<Statement>();
3221 public boolean isDisposed() {
3222 return procedure.isDisposed();
3226 public void exception(AsyncReadGraph graph, Throwable t) {
3227 procedure.exception(graph, t);
3231 public String toString() {
// NOTE(review): the label reads "forStatementSet" although this is forAssertedStatementSet.
3232 return "forStatementSet " + procedure;
/**
 * Streams the objects of the asserted statements of (subject, predicate) to an asynchronous
 * multi-procedure.
 *
 * Runs QueryCache.runnerAssertedStatements and forwards only the object id of each triple,
 * converted with querySupport.getResource. Throwables raised by the procedure callbacks, and a
 * DatabaseException from the query, are logged through Logger.defaultLogError.
 */
3240 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3241 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3243 assert(subject != null);
3244 assert(predicate != null);
3245 assert(procedure != null);
3247 final ListenerBase listener = getListenerBase(procedure);
3250 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3253 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// Subject and predicate ids are ignored; only the object is reported.
3255 procedure.execute(graph, querySupport.getResource(o));
3256 } catch (Throwable t2) {
3257 Logger.defaultLogError(t2);
3262 public void finished(ReadGraphImpl graph) {
3264 procedure.finished(graph);
3265 } catch (Throwable t2) {
3266 Logger.defaultLogError(t2);
3268 // impl.state.barrier.dec();
3272 public void exception(ReadGraphImpl graph, Throwable t) {
3274 procedure.exception(graph, t);
3275 } catch (Throwable t2) {
3276 Logger.defaultLogError(t2);
3278 // impl.state.barrier.dec();
3282 } catch (DatabaseException e) {
3283 Logger.defaultLogError(e);
/**
 * Streams the principal types of a subject to an asynchronous multi-procedure.
 *
 * Runs QueryCache.runnerPrincipalTypes with an adapter that converts each id with
 * querySupport.getResource. Throwables raised by the procedure callbacks, and a
 * DatabaseException from the query, are logged through Logger.defaultLogError.
 */
3289 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3291 assert(subject != null);
3292 assert(procedure != null);
3294 final ListenerBase listener = getListenerBase(procedure);
3296 IntProcedure ip = new IntProcedure() {
3299 public void execute(ReadGraphImpl graph, int i) {
3301 procedure.execute(graph, querySupport.getResource(i));
3302 } catch (Throwable t2) {
3303 Logger.defaultLogError(t2);
3308 public void finished(ReadGraphImpl graph) {
3310 procedure.finished(graph);
3311 } catch (Throwable t2) {
3312 Logger.defaultLogError(t2);
3314 // impl.state.barrier.dec(this);
3318 public void exception(ReadGraphImpl graph, Throwable t) {
3320 procedure.exception(graph, t);
3321 } catch (Throwable t2) {
3322 Logger.defaultLogError(t2);
3324 // impl.state.barrier.dec(this);
3329 int sId = querySupport.getId(subject);
3331 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3332 // else impl.state.barrier.inc(null, null);
3335 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3336 } catch (DatabaseException e) {
3337 Logger.defaultLogError(e);
/**
 * Streams the principal types of a subject to a synchronous MultiProcedure.
 *
 * Same query as the asynchronous overload, but the procedure callbacks take no graph argument.
 * Throwables raised by the procedure callbacks, and a DatabaseException from the query, are
 * logged through Logger.defaultLogError.
 */
3343 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3345 assert(subject != null);
3346 assert(procedure != null);
3348 final ListenerBase listener = getListenerBase(procedure);
3350 // impl.state.barrier.inc();
3353 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3356 public void execute(ReadGraphImpl graph, int i) {
3358 procedure.execute(querySupport.getResource(i));
3359 } catch (Throwable t2) {
3360 Logger.defaultLogError(t2);
3365 public void finished(ReadGraphImpl graph) {
3367 procedure.finished();
3368 } catch (Throwable t2) {
3369 Logger.defaultLogError(t2);
3371 // impl.state.barrier.dec();
3375 public void exception(ReadGraphImpl graph, Throwable t) {
3377 procedure.exception(t);
3378 } catch (Throwable t2) {
3379 Logger.defaultLogError(t2);
3381 // impl.state.barrier.dec();
3385 } catch (DatabaseException e) {
3386 Logger.defaultLogError(e);
/**
 * Delivers the type set of a subject to an asynchronous procedure.
 *
 * Runs QueryCache.runnerTypes and hands the resulting IntSet to the procedure unchanged.
 * Listener procedures are rejected by assertion. Unlike most sibling adapters, the procedure
 * callbacks are not wrapped in try/catch here; only a DatabaseException from the query is logged.
 */
3390 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3392 assert(subject != null);
3393 assert(procedure != null);
3395 final ListenerBase listener = getListenerBase(procedure);
// Listening is not supported for this query.
3396 assert(listener == null);
3398 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3401 public void execute(final ReadGraphImpl graph, IntSet set) {
3402 procedure.execute(graph, set);
3406 public void exception(ReadGraphImpl graph, Throwable t) {
3407 procedure.exception(graph, t);
3412 int sId = querySupport.getId(subject);
3415 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3416 } catch (DatabaseException e) {
3417 Logger.defaultLogError(e);
/**
 * Returns the type set of a subject as computed by QueryCacheBase.resultTypes, called with
 * impl.parent and with neither a listener nor a procedure.
 *
 * @param subject subject resource, asserted non-null
 * @throws Throwable declared; whatever QueryCacheBase.resultTypes raises
 */
3423 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3425 assert(subject != null);
3427 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null, null);
/**
 * Delivers the RelationInfo of a subject to an asynchronous procedure.
 *
 * Runs QueryCache.runnerRelationInfoQuery and forwards result or failure unchanged. Listener
 * procedures are rejected by assertion. A DatabaseException from the query is logged through
 * Logger.defaultLogError.
 */
3432 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
3434 assert(subject != null);
3435 assert(procedure != null);
3437 final ListenerBase listener = getListenerBase(procedure);
// Listening is not supported for this query.
3438 assert(listener == null);
3442 QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
3445 public void execute(final ReadGraphImpl graph, RelationInfo set) {
3446 procedure.execute(graph, set);
3450 public void exception(ReadGraphImpl graph, Throwable t) {
3451 procedure.exception(graph, t);
3455 } catch (DatabaseException e) {
3456 Logger.defaultLogError(e);
/**
 * Delivers the supertype set of a subject to an asynchronous procedure.
 *
 * Runs QueryCache.runnerSuperTypes. The adapter's 'first' flag is shared by execute and
 * exception: the first callback of either kind is forwarded with the graph it received, every
 * later one with impl.newRestart(graph). Throwables raised by the procedure, and a
 * DatabaseException from the query, are logged through Logger.defaultLogError.
 */
3462 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3464 assert(subject != null);
3465 assert(procedure != null);
3467 final ListenerBase listener = getListenerBase(procedure);
3470 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3472 AtomicBoolean first = new AtomicBoolean(true);
3475 public void execute(final ReadGraphImpl graph, IntSet set) {
3476 // final HashSet<Resource> result = new HashSet<Resource>();
3477 // set.forEach(new TIntProcedure() {
3480 // public boolean execute(int type) {
3481 // result.add(querySupport.getResource(type));
// The IntSet is handed over as is; the explicit conversion above is disabled.
3487 if(first.compareAndSet(true, false)) {
3488 procedure.execute(graph, set);
3489 // impl.state.barrier.dec();
3491 procedure.execute(impl.newRestart(graph), set);
3493 } catch (Throwable t2) {
3494 Logger.defaultLogError(t2);
3499 public void exception(ReadGraphImpl graph, Throwable t) {
3501 if(first.compareAndSet(true, false)) {
3502 procedure.exception(graph, t);
3503 // impl.state.barrier.dec();
3505 procedure.exception(impl.newRestart(graph), t);
3507 } catch (Throwable t2) {
3508 Logger.defaultLogError(t2);
3513 } catch (DatabaseException e) {
3514 Logger.defaultLogError(e);
/**
 * Streams the direct super relations of a subject to an asynchronous multi-procedure.
 *
 * Runs QueryCache.runnerDirectSuperRelations with an adapter that converts each id with
 * querySupport.getResource. Throwables raised by the procedure callbacks, and a
 * DatabaseException from the query, are logged through Logger.defaultLogError.
 */
3520 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3522 assert(subject != null);
3523 assert(procedure != null);
3525 final ListenerBase listener = getListenerBase(procedure);
3527 IntProcedure ip = new IntProcedureAdapter() {
3530 public void execute(final ReadGraphImpl graph, int superRelation) {
3532 procedure.execute(graph, querySupport.getResource(superRelation));
3533 } catch (Throwable t2) {
3534 Logger.defaultLogError(t2);
3539 public void finished(final ReadGraphImpl graph) {
3541 procedure.finished(graph);
3542 } catch (Throwable t2) {
3543 Logger.defaultLogError(t2);
3545 // impl.state.barrier.dec(this);
3550 public void exception(ReadGraphImpl graph, Throwable t) {
3552 procedure.exception(graph, t);
3553 } catch (Throwable t2) {
3554 Logger.defaultLogError(t2);
3556 // impl.state.barrier.dec(this);
3561 int sId = querySupport.getId(subject);
3563 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3564 // else impl.state.barrier.inc(null, null);
3567 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3568 } catch (DatabaseException e) {
3569 Logger.defaultLogError(e);
3572 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to PossibleSuperRelation.queryEach for the given subject, passing the procedure
 * through unchanged.
 *
 * @param subject subject resource, asserted non-null
 * @param procedure result receiver, asserted non-null; also serves as listener when it is a ListenerBase
 */
3577 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3579 assert(subject != null);
3580 assert(procedure != null);
3582 final ListenerBase listener = getListenerBase(procedure);
3584 // impl.state.barrier.inc();
3586 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Delivers the super relation set of a subject to an asynchronous procedure.
 *
 * Runs QueryCache.runnerSuperRelations and hands the resulting IntSet to the procedure
 * unchanged. Throwables raised by the procedure callbacks, and a DatabaseException from the
 * query, are logged through Logger.defaultLogError.
 */
3591 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3593 assert(subject != null);
3594 assert(procedure != null);
3596 final ListenerBase listener = getListenerBase(procedure);
3598 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3601 public void execute(final ReadGraphImpl graph, IntSet set) {
3602 // final HashSet<Resource> result = new HashSet<Resource>();
3603 // set.forEach(new TIntProcedure() {
3606 // public boolean execute(int type) {
3607 // result.add(querySupport.getResource(type));
// The IntSet is handed over as is; the explicit conversion above is disabled.
3613 procedure.execute(graph, set);
3614 } catch (Throwable t2) {
3615 Logger.defaultLogError(t2);
3617 // impl.state.barrier.dec(this);
3621 public void exception(ReadGraphImpl graph, Throwable t) {
3623 procedure.exception(graph, t);
3624 } catch (Throwable t2) {
3625 Logger.defaultLogError(t2);
3627 // impl.state.barrier.dec(this);
3632 int sId = querySupport.getId(subject);
3634 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3635 // else impl.state.barrier.inc(null, null);
3638 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3639 } catch (DatabaseException e) {
3640 Logger.defaultLogError(e);
/**
 * Returns the value bytes of a subject by resolving its id and delegating to the int overload.
 *
 * @throws DatabaseException propagated from the int overload
 */
3645 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3646 return getValue(impl, querySupport.getId(subject));
/**
 * Returns the result of QueryCache.resultValueQuery for the given subject id, called with
 * impl.parent and with neither a listener nor a procedure.
 *
 * @throws DatabaseException propagated from the value query
 */
3649 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3650 return QueryCache.resultValueQuery(impl, subject, impl.parent, null, null);
/**
 * Queries the value bytes of a subject, optionally notifying an asynchronous procedure.
 *
 * With a non-null procedure the query runs through QueryCacheBase.resultValueQuery with an
 * adapter whose 'first' flag is shared by execute and exception: the first callback is
 * forwarded with the graph it received, later ones with impl.newRestart(graph). Without a
 * procedure the query runs with no listener and no procedure. A DatabaseException from either
 * query is only logged, after which control reaches the final throw.
 *
 * @param procedure optional receiver; may be null
 * @return the value returned by the value query
 * @throws IllegalStateException ("Internal error") when the query failed with a DatabaseException
 */
3654 final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3656 assert(subject != null);
3658 int sId = querySupport.getId(subject);
3660 if(procedure != null) {
3662 final ListenerBase listener = getListenerBase(procedure);
3664 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3666 AtomicBoolean first = new AtomicBoolean(true);
3669 public void execute(ReadGraphImpl graph, byte[] result) {
3671 if(first.compareAndSet(true, false)) {
3672 procedure.execute(graph, result);
3673 // impl.state.barrier.dec(this);
3675 procedure.execute(impl.newRestart(graph), result);
3677 } catch (Throwable t2) {
3678 Logger.defaultLogError(t2);
3683 public void exception(ReadGraphImpl graph, Throwable t) {
3685 if(first.compareAndSet(true, false)) {
3686 procedure.exception(graph, t);
3687 // impl.state.barrier.dec(this);
3689 procedure.exception(impl.newRestart(graph), t);
3691 } catch (Throwable t2) {
3692 Logger.defaultLogError(t2);
3698 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3699 // else impl.state.barrier.inc(null, null);
3702 return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, listener, ip);
3703 } catch (DatabaseException e) {
3704 Logger.defaultLogError(e);
// No procedure supplied: plain query without listener.
3711 return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, null, null);
3712 } catch (DatabaseException e) {
3713 Logger.defaultLogError(e);
// Reached only when a query above failed; NOTE(review): the logged cause is not attached to this exception.
3718 throw new IllegalStateException("Internal error");
/**
 * Delivers the value bytes of a subject to an asynchronous procedure via
 * QueryCache.runnerValueQuery.
 *
 * Two adapters exist. When impl has a parent or the procedure is a listener, the adapter keeps
 * a 'first' flag shared by execute and exception: the first callback is forwarded with the
 * graph it received, later ones with impl.newRestart(graph). Otherwise a plain adapter
 * forwards the callbacks. A DatabaseException from the query is logged through
 * Logger.defaultLogError.
 */
3723 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3725 assert(subject != null);
3726 assert(procedure != null);
3728 final ListenerBase listener = getListenerBase(procedure);
3730 if(impl.parent != null || listener != null) {
3732 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3734 AtomicBoolean first = new AtomicBoolean(true);
3737 public void execute(ReadGraphImpl graph, byte[] result) {
3739 if(first.compareAndSet(true, false)) {
3740 procedure.execute(graph, result);
3741 // impl.state.barrier.dec(this);
3743 procedure.execute(impl.newRestart(graph), result);
3745 } catch (Throwable t2) {
3746 Logger.defaultLogError(t2);
3751 public void exception(ReadGraphImpl graph, Throwable t) {
3753 if(first.compareAndSet(true, false)) {
3754 procedure.exception(graph, t);
3755 // impl.state.barrier.dec(this);
3757 procedure.exception(impl.newRestart(graph), t);
3759 } catch (Throwable t2) {
3760 Logger.defaultLogError(t2);
3766 int sId = querySupport.getId(subject);
3768 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3769 // else impl.state.barrier.inc(null, null);
3772 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3773 } catch (DatabaseException e) {
3774 Logger.defaultLogError(e);
// No parent and no listener: forward the callbacks directly.
3779 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3782 public void execute(ReadGraphImpl graph, byte[] result) {
3784 procedure.execute(graph, result);
3789 public void exception(ReadGraphImpl graph, Throwable t) {
3791 procedure.exception(graph, t);
3797 int sId = querySupport.getId(subject);
3800 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3801 } catch (DatabaseException e) {
3802 Logger.defaultLogError(e);
/**
 * Resolves the inverse of a relation and reports it to an asynchronous procedure.
 *
 * Runs the objects query for (relation, getInverseOf()). The 'done' flag guards every report
 * to the procedure so that it fires at most once: completion reports either a
 * NoInverseException or the collected resource, a conflicting result reports
 * ManyObjectsForFunctionalRelationException, and a query failure is forwarded as is.
 * Throwables raised by the procedure are logged through Logger.defaultLogError.
 *
 * NOTE(review): the lines that store the first result and choose between the two outcomes in
 * finished() are missing from this listing; presumably result == 0 means "no inverse" - confirm.
 */
3810 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3812 assert(relation != null);
3813 assert(procedure != null);
3815 final ListenerBase listener = getListenerBase(procedure);
3817 IntProcedure ip = new IntProcedure() {
3819 private int result = 0;
3821 final AtomicBoolean found = new AtomicBoolean(false);
3822 final AtomicBoolean done = new AtomicBoolean(false);
3825 public void finished(ReadGraphImpl graph) {
3827 // Shall fire exactly once!
3828 if(done.compareAndSet(false, true)) {
3831 procedure.exception(graph, new NoInverseException(""));
3832 // impl.state.barrier.dec(this);
3834 procedure.execute(graph, querySupport.getResource(result));
3835 // impl.state.barrier.dec(this);
3837 } catch (Throwable t) {
3838 Logger.defaultLogError(t);
3845 public void execute(ReadGraphImpl graph, int i) {
3847 if(found.compareAndSet(false, true)) {
3850 // Shall fire exactly once!
3851 if(done.compareAndSet(false, true)) {
// NOTE(review): the message concatenates this.result with result, which look like the same field here - presumably 'i' was intended; confirm.
3853 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3854 // impl.state.barrier.dec(this);
3855 } catch (Throwable t) {
3856 Logger.defaultLogError(t);
3864 public void exception(ReadGraphImpl graph, Throwable t) {
3865 // Shall fire exactly once!
3866 if(done.compareAndSet(false, true)) {
3868 procedure.exception(graph, t);
3869 // impl.state.barrier.dec(this);
3870 } catch (Throwable t2) {
3871 Logger.defaultLogError(t2);
3878 int sId = querySupport.getId(relation);
3880 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3881 // else impl.state.barrier.inc(null, null);
3884 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3885 } catch (DatabaseException e) {
3886 Logger.defaultLogError(e);
/**
 * Resolves a string id to a Resource and reports it to an asynchronous procedure.
 *
 * Delegates to the internal forResource overload with impl.parent; the Integer result is
 * converted with querySupport.getResource. Throwables raised by the procedure callbacks are
 * logged through Logger.defaultLogError.
 *
 * @param id identifier passed to the internal lookup (NOTE(review): presumably a URI - confirm)
 */
3892 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3895 assert(procedure != null);
3897 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3900 public void execute(ReadGraphImpl graph, Integer result) {
3902 procedure.execute(graph, querySupport.getResource(result));
3903 } catch (Throwable t2) {
3904 Logger.defaultLogError(t2);
3906 // impl.state.barrier.dec(this);
3910 public void exception(ReadGraphImpl graph, Throwable t) {
3913 procedure.exception(graph, t);
3914 } catch (Throwable t2) {
3915 Logger.defaultLogError(t2);
3917 // impl.state.barrier.dec(this);
3922 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3923 // else impl.state.barrier.inc(null, null);
3925 forResource(impl, id, impl.parent, ip);
/**
 * Resolves a builtin identifier to a Resource and reports it to an asynchronous procedure.
 *
 * Delegates to the internal forBuiltin overload with impl.parent; the Integer result is
 * converted with querySupport.getResource. Throwables raised by the procedure callbacks, and a
 * DatabaseException from the lookup, are logged through Logger.defaultLogError.
 */
3930 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3933 assert(procedure != null);
3935 // impl.state.barrier.inc();
3938 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3941 public void execute(ReadGraphImpl graph, Integer result) {
3943 procedure.execute(graph, querySupport.getResource(result));
3944 } catch (Throwable t2) {
3945 Logger.defaultLogError(t2);
3947 // impl.state.barrier.dec();
3951 public void exception(ReadGraphImpl graph, Throwable t) {
3953 procedure.exception(graph, t);
3954 } catch (Throwable t2) {
3955 Logger.defaultLogError(t2);
3957 // impl.state.barrier.dec();
3961 } catch (DatabaseException e) {
3962 Logger.defaultLogError(e);
/**
 * Reports whether the subject has any direct predicate.
 *
 * Reads the IntSet from QueryCache.resultDirectPredicates and passes the negation of its
 * isEmpty() to the procedure. A DatabaseException from the query is forwarded to
 * procedure.exception instead of being logged.
 */
3968 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3970 assert(subject != null);
3971 assert(procedure != null);
3973 final ListenerBase listener = getListenerBase(procedure);
3976 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener,QueryCache.emptyProcedureDirectPredicates);
3977 procedure.execute(impl, !result.isEmpty());
3978 } catch (DatabaseException e) {
3979 procedure.exception(impl, e);
/**
 * Reports whether the subject has any statement with the given predicate.
 *
 * Enumerates objects through forEachObject and, on completion, passes the adapter's 'found'
 * flag to the procedure. Throwables raised by the procedure are logged through
 * Logger.defaultLogError.
 *
 * NOTE(review): the body of execute() is missing from this listing; presumably it sets
 * 'found' to true - confirm.
 */
3985 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
3987 assert(subject != null);
3988 assert(predicate != null);
3989 assert(procedure != null);
3991 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
3993 boolean found = false;
3996 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4001 synchronized public void finished(AsyncReadGraph graph) {
4003 procedure.execute(graph, found);
4004 } catch (Throwable t2) {
4005 Logger.defaultLogError(t2);
4007 // impl.state.barrier.dec(this);
4011 public void exception(AsyncReadGraph graph, Throwable t) {
4013 procedure.exception(graph, t);
4014 } catch (Throwable t2) {
4015 Logger.defaultLogError(t2);
4017 // impl.state.barrier.dec(this);
4022 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4023 // else impl.state.barrier.inc(null, null);
4025 forEachObject(impl, subject, predicate, ip);
/**
 * Reports whether the statement (subject, predicate, object) exists.
 *
 * Enumerates objects through forEachObject, sets 'found' when a delivered resource equals the
 * given object, and passes the flag to the procedure on completion. Throwables raised by the
 * procedure are logged through Logger.defaultLogError.
 *
 * @param object resource compared with equals(); not asserted non-null here
 */
4030 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4032 assert(subject != null);
4033 assert(predicate != null);
4034 assert(procedure != null);
4036 // impl.state.barrier.inc();
4038 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4040 boolean found = false;
4043 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4044 if(resource.equals(object)) found = true;
4048 synchronized public void finished(AsyncReadGraph graph) {
4050 procedure.execute(graph, found);
4051 } catch (Throwable t2) {
4052 Logger.defaultLogError(t2);
4054 // impl.state.barrier.dec();
4058 public void exception(AsyncReadGraph graph, Throwable t) {
4060 procedure.exception(graph, t);
4061 } catch (Throwable t2) {
4062 Logger.defaultLogError(t2);
4064 // impl.state.barrier.dec();
/**
 * Reports whether the subject has a value.
 *
 * Runs QueryCache.runnerValueQuery and passes true to the procedure when the delivered byte
 * array is non-null. Throwables raised by the procedure callbacks, and a DatabaseException
 * from the query, are logged through Logger.defaultLogError.
 */
4072 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4074 assert(subject != null);
4075 assert(procedure != null);
4077 final ListenerBase listener = getListenerBase(procedure);
4079 // impl.state.barrier.inc();
4082 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4085 public void execute(ReadGraphImpl graph, byte[] object) {
4086 boolean result = object != null;
4088 procedure.execute(graph, result);
4089 } catch (Throwable t2) {
4090 Logger.defaultLogError(t2);
4092 // impl.state.barrier.dec();
4096 public void exception(ReadGraphImpl graph, Throwable t) {
4098 procedure.exception(graph, t);
4099 } catch (Throwable t2) {
4100 Logger.defaultLogError(t2);
4102 // impl.state.barrier.dec();
4106 } catch (DatabaseException e) {
4107 Logger.defaultLogError(e);
/**
 * Streams the result of QueryCache.runnerOrderedSet for the subject to an asynchronous
 * multi-procedure.
 *
 * Each delivered id is converted with querySupport.getResource. Throwables raised by the
 * procedure callbacks, and a DatabaseException from the query, are logged through
 * Logger.defaultLogError.
 */
4113 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4115 assert(subject != null);
4116 assert(procedure != null);
4118 final ListenerBase listener = getListenerBase(procedure);
4122 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4125 public void exception(ReadGraphImpl graph, Throwable t) {
4127 procedure.exception(graph, t);
4128 } catch (Throwable t2) {
4129 Logger.defaultLogError(t2);
4131 // impl.state.barrier.dec();
4135 public void execute(ReadGraphImpl graph, int i) {
4137 procedure.execute(graph, querySupport.getResource(i));
4138 } catch (Throwable t2) {
4139 Logger.defaultLogError(t2);
4144 public void finished(ReadGraphImpl graph) {
4146 procedure.finished(graph);
4147 } catch (Throwable t2) {
4148 Logger.defaultLogError(t2);
4150 // impl.state.barrier.dec();
4154 } catch (DatabaseException e) {
4155 Logger.defaultLogError(e);
4161 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4163 // assert(request != null);
4164 // assert(procedure != null);
4166 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4171 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4173 // assert(graph != null);
4174 // assert(request != null);
4176 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4177 // if(entry != null && entry.isReady()) {
4178 // return (T)entry.get(graph, this, null);
4180 // return request.perform(graph);
4185 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4187 // assert(graph != null);
4188 // assert(request != null);
4190 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4191 // if(entry != null && entry.isReady()) {
4192 // if(entry.isExcepted()) {
4193 // Throwable t = (Throwable)entry.getResult();
4194 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4195 // else throw new DatabaseException(t);
4197 // return (T)entry.getResult();
4201 // final DataContainer<T> result = new DataContainer<T>();
4202 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4204 // request.register(graph, new Listener<T>() {
4207 // public void exception(Throwable t) {
4208 // exception.set(t);
4212 // public void execute(T t) {
4217 // public boolean isDisposed() {
4223 // Throwable t = exception.get();
4225 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4226 // else throw new DatabaseException(t);
4229 // return result.get();
4236 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4238 // assert(graph != null);
4239 // assert(request != null);
4241 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4242 // if(entry != null && entry.isReady()) {
4243 // if(entry.isExcepted()) {
4244 // procedure.exception(graph, (Throwable)entry.getResult());
4246 // procedure.execute(graph, (T)entry.getResult());
4249 // request.perform(graph, procedure);
4255 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4257 assert(request != null);
4258 assert(procedure != null);
4262 queryMultiRead(impl, request, parent, listener, procedure);
4264 } catch (DatabaseException e) {
4266 throw new IllegalStateException(e);
4273 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4275 assert(request != null);
4276 assert(procedure != null);
4278 // impl.state.barrier.inc();
4280 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4282 public void execute(AsyncReadGraph graph, T result) {
4285 procedure.execute(graph, result);
4286 } catch (Throwable t2) {
4287 Logger.defaultLogError(t2);
4292 public void finished(AsyncReadGraph graph) {
4295 procedure.finished(graph);
4296 } catch (Throwable t2) {
4297 Logger.defaultLogError(t2);
4300 // impl.state.barrier.dec();
4305 public String toString() {
4306 return procedure.toString();
4310 public void exception(AsyncReadGraph graph, Throwable t) {
4313 procedure.exception(graph, t);
4314 } catch (Throwable t2) {
4315 Logger.defaultLogError(t2);
4318 // impl.state.barrier.dec();
4327 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
4329 assert(request != null);
4330 assert(procedure != null);
4334 queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4337 public String toString() {
4338 return procedure.toString();
4342 public void execute(AsyncReadGraph graph, T result) {
4344 procedure.execute(result);
4345 } catch (Throwable t2) {
4346 Logger.defaultLogError(t2);
4351 public void exception(AsyncReadGraph graph, Throwable throwable) {
4353 procedure.exception(throwable);
4354 } catch (Throwable t2) {
4355 Logger.defaultLogError(t2);
4361 } catch (DatabaseException e) {
4363 throw new IllegalStateException(e);
4370 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4372 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4377 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4379 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4384 public VirtualGraph getValueProvider(Resource subject) {
4386 return querySupport.getValueProvider(querySupport.getId(subject));
4390 public boolean resumeTasks(ReadGraphImpl graph) {
4392 return querySupport.resume(graph);
4396 public boolean isImmutable(int resourceId) {
4397 return querySupport.isImmutable(resourceId);
4400 public boolean isImmutable(Resource resource) {
4401 ResourceImpl impl = (ResourceImpl)resource;
4402 return isImmutable(impl.id);
4407 public Layer0 getL0(ReadGraph graph) {
4409 L0 = Layer0.getInstance(graph);