1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.simantics.databoard.Bindings;
36 import org.simantics.db.AsyncReadGraph;
37 import org.simantics.db.DevelopmentKeys;
38 import org.simantics.db.DirectStatements;
39 import org.simantics.db.ReadGraph;
40 import org.simantics.db.RelationInfo;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
46 import org.simantics.db.common.utils.Logger;
47 import org.simantics.db.debug.ListenerReport;
48 import org.simantics.db.exception.DatabaseException;
49 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
50 import org.simantics.db.exception.NoInverseException;
51 import org.simantics.db.exception.ResourceNotFoundException;
52 import org.simantics.db.impl.DebugPolicy;
53 import org.simantics.db.impl.ResourceImpl;
54 import org.simantics.db.impl.graph.ReadGraphImpl;
55 import org.simantics.db.impl.graph.ReadGraphSupport;
56 import org.simantics.db.impl.graph.WriteGraphImpl;
57 import org.simantics.db.impl.procedure.IntProcedureAdapter;
58 import org.simantics.db.impl.procedure.InternalProcedure;
59 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
60 import org.simantics.db.impl.support.ResourceSupport;
61 import org.simantics.db.procedure.AsyncMultiListener;
62 import org.simantics.db.procedure.AsyncMultiProcedure;
63 import org.simantics.db.procedure.AsyncProcedure;
64 import org.simantics.db.procedure.AsyncSetListener;
65 import org.simantics.db.procedure.ListenerBase;
66 import org.simantics.db.procedure.MultiProcedure;
67 import org.simantics.db.procedure.StatementProcedure;
68 import org.simantics.db.procedure.SyncMultiProcedure;
69 import org.simantics.db.request.AsyncMultiRead;
70 import org.simantics.db.request.ExternalRead;
71 import org.simantics.db.request.MultiRead;
72 import org.simantics.db.request.RequestFlags;
73 import org.simantics.layer0.Layer0;
74 import org.simantics.utils.DataContainer;
75 import org.simantics.utils.Development;
76 import org.simantics.utils.datastructures.Pair;
77 import org.simantics.utils.datastructures.collections.CollectionUtils;
78 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
80 import gnu.trove.procedure.TIntProcedure;
81 import gnu.trove.procedure.TLongProcedure;
82 import gnu.trove.procedure.TObjectProcedure;
83 import gnu.trove.set.hash.THashSet;
84 import gnu.trove.set.hash.TIntHashSet;
86 @SuppressWarnings({"rawtypes", "unchecked"})
87 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
89 public static int indent = 0;
94 public int boundQueries = 0;
97 final private int functionalRelation;
99 final private int superrelationOf;
101 final private int instanceOf;
103 final private int inverseOf;
105 final private int asserts;
107 final private int hasPredicate;
109 final private int hasPredicateInverse;
111 final private int hasObject;
113 final private int inherits;
115 final private int subrelationOf;
117 final private int rootLibrary;
120 * A cache for the root library resource. Initialized in
121 * {@link #getRootLibraryResource()}.
123 private volatile ResourceImpl rootLibraryResource;
125 final private int library;
127 final private int consistsOf;
129 final private int hasName;
131 AtomicInteger sleepers = new AtomicInteger(0);
133 private boolean updating = false;
136 private boolean firingListeners = false;
138 final public QueryCache cache;
139 final public QuerySupport querySupport;
140 final public Session session;
141 final public ResourceSupport resourceSupport;
143 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
145 QueryThread[] executors;
147 // public ArrayList<SessionTask>[] queues;
149 public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
153 INIT, RUN, SLEEP, DISPOSED
157 public ThreadState[] threadStates;
158 // public ReentrantLock[] threadLocks;
159 // public Condition[] threadConditions;
161 //public ArrayList<SessionTask>[] ownTasks;
163 //public ArrayList<SessionTask>[] ownSyncTasks;
165 //ArrayList<SessionTask>[] delayQueues;
167 final Object querySupportLock;
169 public Long modificationCounter = 0L;
171 public void close() {
174 SessionTask getOwnTask(int thread) {
175 synchronized(querySupportLock) {
177 while(index < freeScheduling.size()) {
178 SessionTask task = freeScheduling.get(index);
179 if(task.thread == thread && !task.systemCall)
180 return freeScheduling.remove(index);
187 public boolean performPending(int thread) {
188 SessionTask task = getOwnTask(thread);
197 // final public void scheduleOwn(int caller, SessionTask request) {
198 // ownTasks[caller].add(request);
201 final public void schedule(SessionTask request) {
203 int performer = request.thread;
205 if(DebugPolicy.SCHEDULE)
206 System.out.println("schedule " + request + " " + " -> " + performer);
208 //assert(performer >= 0);
210 assert(request != null);
212 // if(caller == performer) {
213 // request.run(caller);
216 // if(performer == THREADS) {
218 synchronized(querySupportLock) {
220 //new Exception().printStackTrace();
222 freeScheduling.add(request);
224 querySupportLock.notifyAll();
226 //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
228 // for(int i=0;i<THREADS;i++) {
229 // ReentrantLock queueLock = threadLocks[i];
231 // //queues[performer].add(request);
232 // //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
233 // threadConditions[i].signalAll();
234 // queueLock.unlock();
243 // ReentrantLock queueLock = threadLocks[performer];
245 // queues[performer].add(request);
246 // // This thread could have been sleeping
247 // if(queues[performer].size() == 1) {
248 // //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
249 // threadConditions[performer].signalAll();
251 // queueLock.unlock();
258 final public int THREAD_MASK;
260 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
262 public static abstract class SessionTask {
264 final public int thread;
265 final public boolean systemCall;
266 // final public int syncCaller;
267 //final public Object object;
269 public SessionTask(boolean systemCall) {
270 this.thread = QueryProcessor.thread.get();
271 this.systemCall = systemCall;
272 // this.syncCaller = -1;
273 //this.object = object;
276 // public SessionTask(Object object, int syncCaller) {
277 // this.thread = QueryProcessor.thread.get();
278 // this.syncCaller = syncCaller;
279 // this.object = object;
282 public abstract void run(int thread);
285 public String toString() {
286 return "SessionTask[" + super.toString() + "]";
291 public static abstract class SessionRead extends SessionTask {
293 final public Semaphore notify;
294 final public DataContainer<Throwable> throwable;
296 public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
298 this.throwable = throwable;
299 this.notify = notify;
304 long waitingTime = 0;
307 static int koss2 = 0;
309 public boolean resume(ReadGraphImpl graph) {
310 return executors[0].runSynchronized();
313 //private WeakReference<GarbageTracker> garbageTracker;
315 private class GarbageTracker {
318 protected void finalize() throws Throwable {
320 // System.err.println("GarbageTracker");
322 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
330 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
331 throws DatabaseException {
333 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
336 THREAD_MASK = threads - 1;
339 cache = new QueryCache(core, threads);
340 session = querySupport.getSession();
341 resourceSupport = querySupport.getSupport();
342 querySupportLock = core.getLock();
344 executors = new QueryThread[THREADS];
345 // queues = new ArrayList[THREADS];
346 // threadLocks = new ReentrantLock[THREADS];
347 // threadConditions = new Condition[THREADS];
348 threadStates = new ThreadState[THREADS];
349 // ownTasks = new ArrayList[THREADS];
350 // ownSyncTasks = new ArrayList[THREADS];
351 // delayQueues = new ArrayList[THREADS * THREADS];
353 // freeSchedule = new AtomicInteger(0);
355 // for (int i = 0; i < THREADS * THREADS; i++) {
356 // delayQueues[i] = new ArrayList<SessionTask>();
359 for (int i = 0; i < THREADS; i++) {
361 // tasks[i] = new ArrayList<Runnable>();
362 // ownTasks[i] = new ArrayList<SessionTask>();
363 // ownSyncTasks[i] = new ArrayList<SessionTask>();
364 // queues[i] = new ArrayList<SessionTask>();
365 // threadLocks[i] = new ReentrantLock();
366 // threadConditions[i] = threadLocks[i].newCondition();
367 // limits[i] = false;
368 threadStates[i] = ThreadState.INIT;
372 for (int i = 0; i < THREADS; i++) {
376 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
378 threadSet.add(executors[i]);
383 for (int i = 0; i < THREADS; i++) {
384 executors[i].start();
387 // Make sure that query threads are up and running
388 while(sleepers.get() != THREADS) {
391 } catch (InterruptedException e) {
396 rootLibrary = core.getBuiltin("http:/");
397 boolean builtinsInstalled = rootLibrary != 0;
399 if (builtinsInstalled) {
400 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
401 assert (functionalRelation != 0);
403 functionalRelation = 0;
405 if (builtinsInstalled) {
406 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
407 assert (instanceOf != 0);
411 if (builtinsInstalled) {
412 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
413 assert (inverseOf != 0);
418 if (builtinsInstalled) {
419 inherits = core.getBuiltin(Layer0.URIs.Inherits);
420 assert (inherits != 0);
424 if (builtinsInstalled) {
425 asserts = core.getBuiltin(Layer0.URIs.Asserts);
426 assert (asserts != 0);
430 if (builtinsInstalled) {
431 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
432 assert (hasPredicate != 0);
436 if (builtinsInstalled) {
437 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
438 assert (hasPredicateInverse != 0);
440 hasPredicateInverse = 0;
442 if (builtinsInstalled) {
443 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
444 assert (hasObject != 0);
448 if (builtinsInstalled) {
449 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
450 assert (subrelationOf != 0);
454 if (builtinsInstalled) {
455 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
456 assert (superrelationOf != 0);
460 if (builtinsInstalled) {
461 library = core.getBuiltin(Layer0.URIs.Library);
462 assert (library != 0);
466 if (builtinsInstalled) {
467 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
468 assert (consistsOf != 0);
472 if (builtinsInstalled) {
473 hasName = core.getBuiltin(Layer0.URIs.HasName);
474 assert (hasName != 0);
480 final public void releaseWrite(ReadGraphImpl graph) {
481 performDirtyUpdates(graph);
482 modificationCounter++;
485 final public int getId(final Resource r) {
486 return querySupport.getId(r);
489 public QuerySupport getCore() {
493 public int getFunctionalRelation() {
494 return functionalRelation;
497 public int getInherits() {
501 public int getInstanceOf() {
505 public int getInverseOf() {
509 public int getSubrelationOf() {
510 return subrelationOf;
513 public int getSuperrelationOf() {
514 return superrelationOf;
517 public int getAsserts() {
521 public int getHasPredicate() {
525 public int getHasPredicateInverse() {
526 return hasPredicateInverse;
529 public int getHasObject() {
533 public int getRootLibrary() {
537 public Resource getRootLibraryResource() {
538 if (rootLibraryResource == null) {
539 // Synchronization is not needed here, it doesn't matter if multiple
540 // threads simultaneously set rootLibraryResource once.
541 int root = getRootLibrary();
543 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
544 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
546 return rootLibraryResource;
549 public int getLibrary() {
553 public int getConsistsOf() {
557 public int getHasName() {
561 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
565 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
568 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
570 if (result != null && result != 0) {
571 procedure.execute(graph, result);
575 // Fall back to using the fixed builtins.
576 // result = querySupport.getBuiltin(id);
577 // if (result != 0) {
578 // procedure.execute(graph, result);
583 // result = querySupport.getRandomAccessReference(id);
584 // } catch (ResourceNotFoundException e) {
585 // procedure.exception(graph, e);
590 procedure.execute(graph, result);
592 procedure.exception(graph, new ResourceNotFoundException(id));
598 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
599 procedure.exception(graph, t);
603 } catch (DatabaseException e) {
607 procedure.exception(graph, e);
609 } catch (DatabaseException e1) {
611 Logger.defaultLogError(e1);
619 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
621 Integer result = querySupport.getBuiltin(id);
623 procedure.execute(graph, result);
625 procedure.exception(graph, new ResourceNotFoundException(id));
630 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
633 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
634 } catch (DatabaseException e) {
635 throw new IllegalStateException(e);
640 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
644 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
645 } catch (DatabaseException e) {
646 throw new IllegalStateException(e);
651 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
652 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
656 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
658 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
662 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
664 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
668 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
670 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
674 boolean isBound(ExternalReadEntry<?> entry) {
675 if(entry.hasParents()) return true;
676 else if(hasListener(entry)) return true;
680 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
682 if (parent != null && !inferred) {
684 if(!child.isImmutable(graph))
685 child.addParent(parent);
686 } catch (DatabaseException e) {
687 Logger.defaultLogError(e);
689 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
692 if (listener != null) {
693 return registerListener(child, listener, procedure);
701 static class Dummy implements InternalProcedure<Object>, IntProcedure {
704 public void execute(ReadGraphImpl graph, int i) {
708 public void finished(ReadGraphImpl graph) {
712 public void execute(ReadGraphImpl graph, Object result) {
716 public void exception(ReadGraphImpl graph, Throwable throwable) {
721 private static final Dummy dummy = new Dummy();
724 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
726 if (DebugPolicy.PERFORM)
727 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
730 assert (!collecting);
732 assert(query.assertNotDiscarded());
734 registerDependencies(graph, query, parent, listener, procedure, false);
736 // FRESH, REFUTED, EXCEPTED go here
737 if (!query.isReady()) {
742 query.computeForEach(graph, this, (Procedure)dummy, true);
743 return query.get(graph, this, null);
749 return query.get(graph, this, procedure);
757 interface QueryCollectorSupport {
758 public CacheCollectionResult allCaches();
759 public Collection<CacheEntry> getRootList();
760 public int getCurrentSize();
761 public int calculateCurrentSize();
762 public CacheEntryBase iterate(int level);
763 public void remove();
764 public void setLevel(CacheEntryBase entry, int level);
765 public boolean start(boolean flush);
768 interface QueryCollector {
770 public void collect(int youngTarget, int allowedTimeInMs);
774 class QueryCollectorSupportImpl implements QueryCollectorSupport {
776 private static final boolean DEBUG = false;
777 private static final double ITERATION_RATIO = 0.2;
779 private CacheCollectionResult iteration = new CacheCollectionResult();
780 private boolean fresh = true;
781 private boolean needDataInStart = true;
783 QueryCollectorSupportImpl() {
787 public CacheCollectionResult allCaches() {
788 CacheCollectionResult result = new CacheCollectionResult();
789 QueryProcessor.this.allCaches(result);
794 public boolean start(boolean flush) {
795 // We need new data from query maps
797 if(needDataInStart || flush) {
798 // Last run ended after processing all queries => refresh data
799 restart(flush ? 0.0 : ITERATION_RATIO);
801 // continue with previous big data
803 // Notify caller about iteration situation
804 return iteration.isAtStart();
807 private void restart(double targetRatio) {
809 needDataInStart = true;
811 long start = System.nanoTime();
814 // We need new data from query maps
816 int iterationSize = iteration.size()+1;
817 int diff = calculateCurrentSize()-iterationSize;
819 double ratio = (double)diff / (double)iterationSize;
820 boolean dirty = Math.abs(ratio) >= targetRatio;
823 iteration = allCaches();
825 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
826 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
827 System.err.print(" " + iteration.levels[i].size());
828 System.err.println("");
835 needDataInStart = false;
837 // We are returning here within the same GC round - reuse the cache table
846 public CacheEntryBase iterate(int level) {
848 CacheEntryBase entry = iteration.next(level);
850 restart(ITERATION_RATIO);
854 while(entry != null && entry.isDiscarded()) {
855 entry = iteration.next(level);
863 public void remove() {
868 public void setLevel(CacheEntryBase entry, int level) {
869 iteration.setLevel(entry, level);
872 public Collection<CacheEntry> getRootList() {
873 return cache.getRootList();
877 public int calculateCurrentSize() {
878 return cache.calculateCurrentSize();
882 public int getCurrentSize() {
887 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
889 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
890 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
892 public int querySize() {
896 public void gc(int youngTarget, int allowedTimeInMs) {
898 collector.collect(youngTarget, allowedTimeInMs);
902 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
904 assert (entry != null);
906 if (base.isDisposed())
909 return addListener(entry, base, procedure);
913 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
914 entry.setLastKnown(result);
917 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
919 assert (entry != null);
920 assert (procedure != null);
922 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
924 list = new ArrayList<ListenerEntry>(1);
925 cache.listeners.put(entry, list);
928 ListenerEntry result = new ListenerEntry(entry, base, procedure);
929 int currentIndex = list.indexOf(result);
930 // There was already a listener
931 if(currentIndex > -1) {
932 ListenerEntry current = list.get(currentIndex);
933 if(!current.base.isDisposed()) return null;
934 list.set(currentIndex, result);
939 if(DebugPolicy.LISTENER) {
940 new Exception().printStackTrace();
941 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
948 private void scheduleListener(ListenerEntry entry) {
949 assert (entry != null);
950 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
951 scheduledListeners.add(entry);
954 private void removeListener(ListenerEntry entry) {
955 assert (entry != null);
956 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
957 if(list == null) return;
958 boolean success = list.remove(entry);
961 cache.listeners.remove(entry.entry);
964 private boolean hasListener(CacheEntry entry) {
965 if(cache.listeners.get(entry) != null) return true;
969 boolean hasListenerAfterDisposing(CacheEntry entry) {
970 if(cache.listeners.get(entry) != null) {
971 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
972 ArrayList<ListenerEntry> list = null;
973 for (ListenerEntry e : entries) {
974 if (e.base.isDisposed()) {
975 if(list == null) list = new ArrayList<ListenerEntry>();
980 for (ListenerEntry e : list) {
984 if (entries.isEmpty()) {
985 cache.listeners.remove(entry);
993 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
994 hasListenerAfterDisposing(entry);
995 if(cache.listeners.get(entry) != null)
996 return cache.listeners.get(entry);
998 return Collections.emptyList();
1001 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1003 if(!workarea.containsKey(entry)) {
1005 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1006 for(ListenerEntry e : getListenerEntries(entry))
1009 workarea.put(entry, ls);
1011 for(CacheEntry parent : entry.getParents(this)) {
1012 processListenerReport(parent, workarea);
1013 ls.addAll(workarea.get(parent));
1020 public synchronized ListenerReport getListenerReport() throws IOException {
1022 class ListenerReportImpl implements ListenerReport {
1024 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1027 public void print(PrintStream b) {
1028 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1029 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1030 for(ListenerBase l : e.getValue()) {
1031 Integer i = hist.get(l);
1032 hist.put(l, i != null ? i-1 : -1);
1036 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1037 b.print("" + -p.second + " " + p.first + "\n");
1045 ListenerReportImpl result = new ListenerReportImpl();
1047 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1048 for(CacheEntryBase entry : all) {
1049 hasListenerAfterDisposing(entry);
1051 for(CacheEntryBase entry : all) {
1052 processListenerReport(entry, result.workarea);
1059 public synchronized String reportListeners(File file) throws IOException {
1064 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1065 ListenerReport report = getListenerReport();
1068 return "Done reporting listeners.";
1072 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1074 if(entry.isDiscarded()) return;
1075 if(workarea.containsKey(entry)) return;
1077 Iterable<CacheEntry> parents = entry.getParents(this);
1078 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1079 for(CacheEntry e : parents) {
1080 if(e.isDiscarded()) continue;
1082 processParentReport(e, workarea);
1084 workarea.put(entry, ps);
1088 public synchronized String reportQueryActivity(File file) throws IOException {
1090 System.err.println("reportQueries " + file.getAbsolutePath());
1095 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1097 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1098 Collections.reverse(entries);
1100 for(Pair<String,Integer> entry : entries) {
1101 b.println(entry.first + ": " + entry.second);
1106 Development.histogram.clear();
1112 public synchronized String reportQueries(File file) throws IOException {
1114 System.err.println("reportQueries " + file.getAbsolutePath());
1119 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1121 long start = System.nanoTime();
1123 // ArrayList<CacheEntry> all = ;
1125 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1126 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1127 for(CacheEntryBase entry : caches) {
1128 processParentReport(entry, workarea);
1131 // for(CacheEntry e : all) System.err.println("entry: " + e);
1133 long duration = System.nanoTime() - start;
1134 System.err.println("Query root set in " + 1e-9*duration + "s.");
1136 start = System.nanoTime();
1138 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1142 for(CacheEntry entry : workarea.keySet()) {
1143 boolean listener = hasListenerAfterDisposing(entry);
1144 boolean hasParents = entry.getParents(this).iterator().hasNext();
1147 flagMap.put(entry, 0);
1148 } else if (!hasParents) {
1150 flagMap.put(entry, 1);
1153 flagMap.put(entry, 2);
1155 // // Write leaf bit
1156 // entry.flags |= 4;
1159 boolean done = true;
1166 long start2 = System.nanoTime();
1168 int boundCounter = 0;
1169 int unboundCounter = 0;
1170 int unknownCounter = 0;
1172 for(CacheEntry<?> entry : workarea.keySet()) {
1174 //System.err.println("process " + entry);
1176 int flags = flagMap.get(entry);
1177 int bindStatus = flags & 3;
1179 if(bindStatus == 0) boundCounter++;
1180 else if(bindStatus == 1) unboundCounter++;
1181 else if(bindStatus == 2) unknownCounter++;
1183 if(bindStatus < 2) continue;
1186 for(CacheEntry parent : entry.getParents(this)) {
1188 if(parent.isDiscarded()) flagMap.put(parent, 1);
1190 int flags2 = flagMap.get(parent);
1191 int bindStatus2 = flags2 & 3;
1192 // Parent is bound => child is bound
1193 if(bindStatus2 == 0) {
1197 // Parent is unknown => child is unknown
1198 else if (bindStatus2 == 2) {
1205 flagMap.put(entry, newStatus);
1209 duration = System.nanoTime() - start2;
1210 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1211 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1213 } while(!done && loops++ < 20);
1217 for(CacheEntry entry : workarea.keySet()) {
1219 int bindStatus = flagMap.get(entry);
1220 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1226 duration = System.nanoTime() - start;
1227 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1229 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1231 for(CacheEntry entry : workarea.keySet()) {
1232 Class<?> clazz = entry.getClass();
1233 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1234 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1235 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1236 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1237 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1238 Integer c = counts.get(clazz);
1239 if(c == null) counts.put(clazz, -1);
1240 else counts.put(clazz, c-1);
1243 b.print("// Simantics DB client query report file\n");
1244 b.print("// This file contains the following information\n");
1245 b.print("// -The amount of cached query instances per query class\n");
1246 b.print("// -The sizes of retained child sets\n");
1247 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1248 b.print("// -Followed by status, where\n");
1249 b.print("// -0=bound\n");
1250 b.print("// -1=free\n");
1251 b.print("// -2=unknown\n");
1252 b.print("// -L=has listener\n");
1253 b.print("// -List of children for each query (search for 'C <query name>')\n");
1255 b.print("----------------------------------------\n");
1257 b.print("// Queries by class\n");
1258 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1259 b.print(-p.second + " " + p.first.getName() + "\n");
1262 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1263 for(CacheEntry e : workarea.keySet())
1266 boolean changed = true;
1268 while(changed && iter++<50) {
1272 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1273 for(CacheEntry e : workarea.keySet())
1276 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1277 Integer c = hist.get(e.getKey());
1278 for(CacheEntry p : e.getValue()) {
1279 Integer i = newHist.get(p);
1280 newHist.put(p, i+c);
1283 for(CacheEntry e : workarea.keySet()) {
1284 Integer value = newHist.get(e);
1285 Integer old = hist.get(e);
1286 if(!value.equals(old)) {
1288 // System.err.println("hist " + e + ": " + old + " => " + value);
1293 System.err.println("Retained set iteration " + iter);
1297 b.print("// Queries by retained set\n");
1298 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1299 b.print("" + -p.second + " " + p.first + "\n");
1302 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1304 b.print("// Entry parent listing\n");
1305 for(CacheEntry entry : workarea.keySet()) {
1306 int status = flagMap.get(entry);
1307 boolean hasListener = hasListenerAfterDisposing(entry);
1308 b.print("Q " + entry.toString());
1310 b.print(" (L" + status + ")");
1313 b.print(" (" + status + ")");
1316 for(CacheEntry parent : workarea.get(entry)) {
1317 Collection<CacheEntry> inv = inverse.get(parent);
1319 inv = new ArrayList<CacheEntry>();
1320 inverse.put(parent, inv);
1323 b.print(" " + parent.toString());
1328 b.print("// Entry child listing\n");
1329 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1330 b.print("C " + entry.getKey().toString());
1332 for(CacheEntry child : entry.getValue()) {
1333 Integer h = hist.get(child);
1337 b.print(" <no children>");
1339 b.print(" " + child.toString());
1344 b.print("#queries: " + workarea.keySet().size() + "\n");
1345 b.print("#listeners: " + listeners + "\n");
1349 return "Dumped " + workarea.keySet().size() + " queries.";
1355 public CacheEntry caller;
1357 public CacheEntry entry;
1361 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1362 this.caller = caller;
1364 this.indent = indent;
1369 boolean removeQuery(CacheEntry entry) {
1371 // This entry has been removed before. No need to do anything here.
1372 if(entry.isDiscarded()) return false;
1374 assert (!entry.isDiscarded());
1376 Query query = entry.getQuery();
1378 query.removeEntry(this);
1383 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1394 * @return true if this entry is being listened
1396 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1400 CacheEntry entry = e.entry;
1402 //System.err.println("updateQuery " + entry);
1405 * If the dependency graph forms a DAG, some entries are inserted in the
1406 * todo list many times. They only need to be processed once though.
1408 if (entry.isDiscarded()) {
1409 if (Development.DEVELOPMENT) {
1410 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1411 System.out.print("D");
1412 for (int i = 0; i < e.indent; i++)
1413 System.out.print(" ");
1414 System.out.println(entry.getQuery());
1417 // System.err.println(" => DISCARDED");
1421 if (entry.isRefuted()) {
1422 if (Development.DEVELOPMENT) {
1423 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1424 System.out.print("R");
1425 for (int i = 0; i < e.indent; i++)
1426 System.out.print(" ");
1427 System.out.println(entry.getQuery());
1433 if (entry.isExcepted()) {
1434 if (Development.DEVELOPMENT) {
1435 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1436 System.out.print("E");
1441 if (entry.isPending()) {
1442 if (Development.DEVELOPMENT) {
1443 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1444 System.out.print("P");
1451 if (Development.DEVELOPMENT) {
1452 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1453 System.out.print("U ");
1454 for (int i = 0; i < e.indent; i++)
1455 System.out.print(" ");
1456 System.out.print(entry.getQuery());
1460 Query query = entry.getQuery();
1461 int type = query.type();
1463 boolean hasListener = hasListener(entry);
1465 if (Development.DEVELOPMENT) {
1466 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1467 if(hasListener(entry)) {
1468 System.out.println(" (L)");
1470 System.out.println("");
1475 if(entry.isPending() || entry.isExcepted()) {
1478 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1480 immediates.put(entry, entry);
1495 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1497 immediates.put(entry, entry);
1511 // System.err.println(" => FOO " + type);
1514 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1515 if(entries != null) {
1516 for (ListenerEntry le : entries) {
1517 scheduleListener(le);
1522 // If invalid, update parents
1523 if (type == RequestFlags.INVALIDATE) {
1524 updateParents(e.indent, entry, todo);
1531 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1533 Iterable<CacheEntry> oldParents = entry.getParents(this);
1534 for (CacheEntry parent : oldParents) {
1535 // System.err.println("updateParents " + entry + " => " + parent);
1536 if(!parent.isDiscarded())
1537 todo.push(new UpdateEntry(entry, parent, indent + 2));
1542 private boolean pruneListener(ListenerEntry entry) {
1543 if (entry.base.isDisposed()) {
1544 removeListener(entry);
1552 * @param av1 an array (guaranteed)
1553 * @param av2 any object
1554 * @return <code>true</code> if the two arrays are equal
1556 private final boolean arrayEquals(Object av1, Object av2) {
1559 Class<?> c1 = av1.getClass().getComponentType();
1560 Class<?> c2 = av2.getClass().getComponentType();
1561 if (c2 == null || !c1.equals(c2))
1563 boolean p1 = c1.isPrimitive();
1564 boolean p2 = c2.isPrimitive();
1568 return Arrays.equals((Object[]) av1, (Object[]) av2);
1569 if (boolean.class.equals(c1))
1570 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1571 else if (byte.class.equals(c1))
1572 return Arrays.equals((byte[]) av1, (byte[]) av2);
1573 else if (int.class.equals(c1))
1574 return Arrays.equals((int[]) av1, (int[]) av2);
1575 else if (long.class.equals(c1))
1576 return Arrays.equals((long[]) av1, (long[]) av2);
1577 else if (float.class.equals(c1))
1578 return Arrays.equals((float[]) av1, (float[]) av2);
1579 else if (double.class.equals(c1))
1580 return Arrays.equals((double[]) av1, (double[]) av2);
1581 throw new RuntimeException("??? Contact application querySupport.");
1586 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1590 Query query = entry.getQuery();
1592 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1594 entry.prepareRecompute(querySupport);
1596 ReadGraphImpl parentGraph = graph.withParent(entry);
1598 query.recompute(parentGraph);
1600 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1602 Object newValue = entry.getResult();
1604 if (ListenerEntry.NO_VALUE == oldValue) {
1605 if(DebugPolicy.CHANGES) {
1606 System.out.println("C " + query);
1607 System.out.println("- " + oldValue);
1608 System.out.println("- " + newValue);
1613 boolean changed = false;
1615 if (newValue != null) {
1616 if (newValue.getClass().isArray()) {
1617 changed = !arrayEquals(newValue, oldValue);
1619 changed = !newValue.equals(oldValue);
1622 changed = (oldValue != null);
1624 if(DebugPolicy.CHANGES && changed) {
1625 System.out.println("C " + query);
1626 System.out.println("- " + oldValue);
1627 System.out.println("- " + newValue);
1630 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1632 } catch (Throwable t) {
1634 Logger.defaultLogError(t);
1636 return ListenerEntry.NO_VALUE;
1642 public boolean hasScheduledUpdates() {
1643 return !scheduledListeners.isEmpty();
1646 public void performScheduledUpdates(WriteGraphImpl graph) {
1649 assert (!cache.collecting);
1650 assert (!firingListeners);
1652 firingListeners = true;
1656 // Performing may cause further events to be scheduled.
1657 while (!scheduledListeners.isEmpty()) {
1660 // graph.state.barrier.inc();
1662 // Clone current events to make new entries possible during
1664 THashSet<ListenerEntry> entries = scheduledListeners;
1665 scheduledListeners = new THashSet<ListenerEntry>();
1667 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1669 for (ListenerEntry listenerEntry : entries) {
1671 if (pruneListener(listenerEntry)) {
1672 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1676 final CacheEntry entry = listenerEntry.entry;
1677 assert (entry != null);
1679 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1681 if (newValue != ListenerEntry.NOT_CHANGED) {
1682 if(DebugPolicy.LISTENER)
1683 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1684 schedule.add(listenerEntry);
1685 listenerEntry.setLastKnown(entry.getResult());
1690 for(ListenerEntry listenerEntry : schedule) {
1691 final CacheEntry entry = listenerEntry.entry;
1692 if(DebugPolicy.LISTENER)
1693 System.out.println("Firing " + listenerEntry.procedure);
1695 if(DebugPolicy.LISTENER)
1696 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1697 entry.performFromCache(graph, listenerEntry.procedure);
1698 } catch (Throwable t) {
1699 t.printStackTrace();
1703 // graph.state.barrier.dec();
1704 // graph.waitAsync(null);
1705 // graph.state.barrier.assertReady();
1710 firingListeners = false;
1717 * @return true if this entry still has listeners
1719 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1721 assert (!cache.collecting);
1725 boolean hadListeners = false;
1726 boolean listenersUnknown = false;
1730 assert(entry != null);
1731 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1732 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1733 todo.add(new UpdateEntry(null, entry, 0));
1737 // Walk the tree and collect immediate updates
1738 while (!todo.isEmpty()) {
1739 UpdateEntry e = todo.pop();
1740 hadListeners |= updateQuery(e, todo, immediates);
1743 if(immediates.isEmpty()) break;
1745 // Evaluate all immediate updates and collect parents to update
1746 for(CacheEntry immediate : immediates.values()) {
1748 if(immediate.isDiscarded()) {
1752 if(immediate.isExcepted()) {
1754 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1755 if (newValue != ListenerEntry.NOT_CHANGED)
1756 updateParents(0, immediate, todo);
1760 Object oldValue = immediate.getResult();
1761 Object newValue = compareTo(graph, immediate, oldValue);
1763 if (newValue != ListenerEntry.NOT_CHANGED) {
1764 updateParents(0, immediate, todo);
1766 // If not changed, keep the old value
1767 immediate.setResult(oldValue);
1768 listenersUnknown = true;
1778 } catch (Throwable t) {
1779 Logger.defaultLogError(t);
1785 return hadListeners | listenersUnknown;
1789 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1790 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1791 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1792 // Maybe use a mutex from util.concurrent?
1793 private Object primitiveUpdateLock = new Object();
1794 private THashSet scheduledPrimitiveUpdates = new THashSet();
1796 public void performDirtyUpdates(final ReadGraphImpl graph) {
1798 cache.dirty = false;
1801 if (Development.DEVELOPMENT) {
1802 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1803 System.err.println("== Query update ==");
1807 // Special case - one statement
1808 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1810 long arg0 = scheduledObjectUpdates.getFirst();
1812 final int subject = (int)(arg0 >>> 32);
1813 final int predicate = (int)(arg0 & 0xffffffff);
1815 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1816 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1817 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1819 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1820 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1821 if(principalTypes != null) update(graph, principalTypes);
1822 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1823 if(types != null) update(graph, types);
1826 if(predicate == subrelationOf) {
1827 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1828 if(superRelations != null) update(graph, superRelations);
1831 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1832 if(dp != null) update(graph, dp);
1833 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1834 if(os != null) update(graph, os);
1836 scheduledObjectUpdates.clear();
1841 // Special case - one value
1842 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1844 int arg0 = scheduledValueUpdates.getFirst();
1846 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1847 if(valueQuery != null) update(graph, valueQuery);
1849 scheduledValueUpdates.clear();
1854 final TIntHashSet predicates = new TIntHashSet();
1855 final TIntHashSet orderedSets = new TIntHashSet();
1857 THashSet primitiveUpdates;
1858 synchronized (primitiveUpdateLock) {
1859 primitiveUpdates = scheduledPrimitiveUpdates;
1860 scheduledPrimitiveUpdates = new THashSet();
1863 primitiveUpdates.forEach(new TObjectProcedure() {
1866 public boolean execute(Object arg0) {
1868 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1869 if (query != null) {
1870 boolean listening = update(graph, query);
1871 if (!listening && !query.hasParents()) {
1872 cache.externalReadEntryMap.remove(arg0);
1881 scheduledValueUpdates.forEach(new TIntProcedure() {
1884 public boolean execute(int arg0) {
1885 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1886 if(valueQuery != null) update(graph, valueQuery);
1892 scheduledInvalidates.forEach(new TIntProcedure() {
1895 public boolean execute(int resource) {
1897 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1898 if(valueQuery != null) update(graph, valueQuery);
1900 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1901 if(principalTypes != null) update(graph, principalTypes);
1902 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1903 if(types != null) update(graph, types);
1905 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1906 if(superRelations != null) update(graph, superRelations);
1908 predicates.add(resource);
1915 scheduledObjectUpdates.forEach(new TLongProcedure() {
1918 public boolean execute(long arg0) {
1920 final int subject = (int)(arg0 >>> 32);
1921 final int predicate = (int)(arg0 & 0xffffffff);
1923 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1924 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1925 if(principalTypes != null) update(graph, principalTypes);
1926 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1927 if(types != null) update(graph, types);
1930 if(predicate == subrelationOf) {
1931 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1932 if(superRelations != null) update(graph, superRelations);
1935 predicates.add(subject);
1936 orderedSets.add(predicate);
1944 predicates.forEach(new TIntProcedure() {
1947 public boolean execute(final int subject) {
1949 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1950 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1951 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1953 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1954 if(entry != null) update(graph, entry);
1962 orderedSets.forEach(new TIntProcedure() {
1965 public boolean execute(int orderedSet) {
1967 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1968 if(entry != null) update(graph, entry);
1976 // for (Integer subject : predicates) {
1977 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1978 // if(entry != null) update(graph, entry);
1982 if (Development.DEVELOPMENT) {
1983 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1984 System.err.println("== Query update ends ==");
1988 scheduledValueUpdates.clear();
1989 scheduledObjectUpdates.clear();
1990 scheduledInvalidates.clear();
1994 public void updateValue(final int resource) {
1995 scheduledValueUpdates.add(resource);
1999 public void updateStatements(final int resource, final int predicate) {
2000 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
2004 private int lastInvalidate = 0;
2006 public void invalidateResource(final int resource) {
2007 if(lastInvalidate == resource) return;
2008 scheduledValueUpdates.add(resource);
2009 lastInvalidate = resource;
2013 public void updatePrimitive(final ExternalRead primitive) {
2015 // External reads may be updated from arbitrary threads.
2016 // Synchronize to prevent race-conditions.
2017 synchronized (primitiveUpdateLock) {
2018 scheduledPrimitiveUpdates.add(primitive);
2020 querySupport.dirtyPrimitives();
2025 public synchronized String toString() {
2026 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
2030 protected void doDispose() {
2032 for(int index = 0; index < THREADS; index++) {
2033 executors[index].dispose();
2037 for(int i=0;i<100;i++) {
2039 boolean alive = false;
2040 for(int index = 0; index < THREADS; index++) {
2041 alive |= executors[index].isAlive();
2046 } catch (InterruptedException e) {
2047 Logger.defaultLogError(e);
2052 // Then start interrupting
2053 for(int i=0;i<100;i++) {
2055 boolean alive = false;
2056 for(int index = 0; index < THREADS; index++) {
2057 alive |= executors[index].isAlive();
2060 for(int index = 0; index < THREADS; index++) {
2061 executors[index].interrupt();
2065 // // Then just destroy
2066 // for(int index = 0; index < THREADS; index++) {
2067 // executors[index].destroy();
2070 for(int index = 0; index < THREADS; index++) {
2072 executors[index].join(5000);
2073 } catch (InterruptedException e) {
2074 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2076 executors[index] = null;
2081 public int getHits() {
2085 public int getMisses() {
2086 return cache.misses;
2089 public int getSize() {
2093 public Set<Long> getReferencedClusters() {
2094 HashSet<Long> result = new HashSet<Long>();
2095 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2096 Objects query = (Objects) entry.getQuery();
2097 result.add(querySupport.getClusterId(query.r1()));
2099 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2100 DirectPredicates query = (DirectPredicates) entry.getQuery();
2101 result.add(querySupport.getClusterId(query.id));
2103 for (CacheEntry entry : cache.valueQueryMap.values()) {
2104 ValueQuery query = (ValueQuery) entry.getQuery();
2105 result.add(querySupport.getClusterId(query.id));
2110 public void assertDone() {
2113 CacheCollectionResult allCaches(CacheCollectionResult result) {
2115 return cache.allCaches(result);
2119 public void printDiagnostics() {
2122 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2123 querySupport.requestCluster(graph, clusterId, runnable);
2126 public int clean() {
2127 collector.collect(0, Integer.MAX_VALUE);
2131 public void clean(final Collection<ExternalRead<?>> requests) {
2132 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2133 Iterator<ExternalRead<?>> iterator = requests.iterator();
2135 public CacheCollectionResult allCaches() {
2136 throw new UnsupportedOperationException();
2139 public CacheEntryBase iterate(int level) {
2140 if(iterator.hasNext()) {
2141 ExternalRead<?> request = iterator.next();
2142 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2143 if (entry != null) return entry;
2144 else return iterate(level);
2146 iterator = requests.iterator();
2151 public void remove() {
2152 throw new UnsupportedOperationException();
2155 public void setLevel(CacheEntryBase entry, int level) {
2156 throw new UnsupportedOperationException();
2159 public Collection<CacheEntry> getRootList() {
2160 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2161 for (ExternalRead<?> request : requests) {
2162 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2169 public int getCurrentSize() {
2173 public int calculateCurrentSize() {
2174 // This tells the collector to attempt collecting everything.
2175 return Integer.MAX_VALUE;
2178 public boolean start(boolean flush) {
2182 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
2185 public void scanPending() {
2187 cache.scanPending();
2191 public ReadGraphImpl graphForVirtualRequest() {
2192 return ReadGraphImpl.createAsync(this);
2196 private HashMap<Resource, Class<?>> builtinValues;
2198 public Class<?> getBuiltinValue(Resource r) {
2199 if(builtinValues == null) initBuiltinValues();
2200 return builtinValues.get(r);
2203 Exception callerException = null;
2205 public interface AsyncBarrier {
2208 // public void inc(String debug);
2209 // public void dec(String debug);
2212 // final public QueryProcessor processor;
2213 // final public QuerySupport support;
2215 // boolean disposed = false;
2217 private void initBuiltinValues() {
2219 Layer0 b = getSession().peekService(Layer0.class);
2220 if(b == null) return;
2222 builtinValues = new HashMap<Resource, Class<?>>();
2224 builtinValues.put(b.String, String.class);
2225 builtinValues.put(b.Double, Double.class);
2226 builtinValues.put(b.Float, Float.class);
2227 builtinValues.put(b.Long, Long.class);
2228 builtinValues.put(b.Integer, Integer.class);
2229 builtinValues.put(b.Byte, Byte.class);
2230 builtinValues.put(b.Boolean, Boolean.class);
2232 builtinValues.put(b.StringArray, String[].class);
2233 builtinValues.put(b.DoubleArray, double[].class);
2234 builtinValues.put(b.FloatArray, float[].class);
2235 builtinValues.put(b.LongArray, long[].class);
2236 builtinValues.put(b.IntegerArray, int[].class);
2237 builtinValues.put(b.ByteArray, byte[].class);
2238 builtinValues.put(b.BooleanArray, boolean[].class);
2242 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2244 // if (null == provider2) {
2245 // this.processor = null;
2249 // this.processor = provider2;
2250 // support = provider2.getCore();
2251 // initBuiltinValues();
2255 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2256 // return new ReadGraphSupportImpl(impl.processor);
2260 final public Session getSession() {
2264 final public ResourceSupport getResourceSupport() {
2265 return resourceSupport;
2269 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2271 throw new UnsupportedOperationException();
2273 // assert(subject != null);
2274 // assert(procedure != null);
2276 // final ListenerBase listener = getListenerBase(procedure);
2278 // IntProcedure ip = new IntProcedure() {
2280 // AtomicBoolean first = new AtomicBoolean(true);
2283 // public void execute(ReadGraphImpl graph, int i) {
2285 // if(first.get()) {
2286 // procedure.execute(graph, querySupport.getResource(i));
2288 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2290 // } catch (Throwable t2) {
2291 // Logger.defaultLogError(t2);
2296 // public void finished(ReadGraphImpl graph) {
2298 // if(first.compareAndSet(true, false)) {
2299 // procedure.finished(graph);
2300 //// impl.state.barrier.dec(this);
2302 // procedure.finished(impl.newRestart(graph));
2305 // } catch (Throwable t2) {
2306 // Logger.defaultLogError(t2);
2311 // public void exception(ReadGraphImpl graph, Throwable t) {
2313 // if(first.compareAndSet(true, false)) {
2314 // procedure.exception(graph, t);
2316 // procedure.exception(impl.newRestart(graph), t);
2318 // } catch (Throwable t2) {
2319 // Logger.defaultLogError(t2);
2325 // int sId = querySupport.getId(subject);
2328 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2329 // } catch (DatabaseException e) {
2330 // Logger.defaultLogError(e);
2336 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2338 throw new UnsupportedOperationException();
2340 // assert(subject != null);
2341 // assert(procedure != null);
2343 // final ListenerBase listener = getListenerBase(procedure);
2346 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2349 // public void execute(ReadGraphImpl graph, int i) {
2351 // procedure.execute(querySupport.getResource(i));
2352 // } catch (Throwable t2) {
2353 // Logger.defaultLogError(t2);
2358 // public void finished(ReadGraphImpl graph) {
2360 // procedure.finished();
2361 // } catch (Throwable t2) {
2362 // Logger.defaultLogError(t2);
2364 //// impl.state.barrier.dec();
2368 // public void exception(ReadGraphImpl graph, Throwable t) {
2370 // procedure.exception(t);
2371 // } catch (Throwable t2) {
2372 // Logger.defaultLogError(t2);
2374 //// impl.state.barrier.dec();
2378 // } catch (DatabaseException e) {
2379 // Logger.defaultLogError(e);
2385 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2386 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2390 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2391 final Resource predicate, final MultiProcedure<Statement> procedure) {
2393 assert(subject != null);
2394 assert(predicate != null);
2395 assert(procedure != null);
2397 final ListenerBase listener = getListenerBase(procedure);
2399 // impl.state.barrier.inc();
2402 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2405 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2407 procedure.execute(querySupport.getStatement(s, p, o));
2408 } catch (Throwable t2) {
2409 Logger.defaultLogError(t2);
2414 public void finished(ReadGraphImpl graph) {
2416 procedure.finished();
2417 } catch (Throwable t2) {
2418 Logger.defaultLogError(t2);
2420 // impl.state.barrier.dec();
2424 public void exception(ReadGraphImpl graph, Throwable t) {
2426 procedure.exception(t);
2427 } catch (Throwable t2) {
2428 Logger.defaultLogError(t2);
2430 // impl.state.barrier.dec();
2434 } catch (DatabaseException e) {
2435 Logger.defaultLogError(e);
2441 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2442 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2444 assert(subject != null);
2445 assert(predicate != null);
2446 assert(procedure != null);
2448 final ListenerBase listener = getListenerBase(procedure);
2450 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2452 boolean first = true;
2455 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2458 procedure.execute(graph, querySupport.getStatement(s, p, o));
2460 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2462 } catch (Throwable t2) {
2463 Logger.defaultLogError(t2);
2468 public void finished(ReadGraphImpl graph) {
2473 procedure.finished(graph);
2474 // impl.state.barrier.dec(this);
2476 procedure.finished(impl.newRestart(graph));
2478 } catch (Throwable t2) {
2479 Logger.defaultLogError(t2);
2485 public void exception(ReadGraphImpl graph, Throwable t) {
2490 procedure.exception(graph, t);
2491 // impl.state.barrier.dec(this);
2493 procedure.exception(impl.newRestart(graph), t);
2495 } catch (Throwable t2) {
2496 Logger.defaultLogError(t2);
2503 int sId = querySupport.getId(subject);
2504 int pId = querySupport.getId(predicate);
2506 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2507 // else impl.state.barrier.inc(null, null);
2510 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2511 } catch (DatabaseException e) {
2512 Logger.defaultLogError(e);
2518 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2519 final Resource predicate, final StatementProcedure procedure) {
2521 assert(subject != null);
2522 assert(predicate != null);
2523 assert(procedure != null);
2525 final ListenerBase listener = getListenerBase(procedure);
2527 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2529 boolean first = true;
2532 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2535 procedure.execute(graph, s, p, o);
2537 procedure.execute(impl.newRestart(graph), s, p, o);
2539 } catch (Throwable t2) {
2540 Logger.defaultLogError(t2);
2545 public void finished(ReadGraphImpl graph) {
2550 procedure.finished(graph);
2551 // impl.state.barrier.dec(this);
2553 procedure.finished(impl.newRestart(graph));
2555 } catch (Throwable t2) {
2556 Logger.defaultLogError(t2);
2562 public void exception(ReadGraphImpl graph, Throwable t) {
2567 procedure.exception(graph, t);
2568 // impl.state.barrier.dec(this);
2570 procedure.exception(impl.newRestart(graph), t);
2572 } catch (Throwable t2) {
2573 Logger.defaultLogError(t2);
2580 int sId = querySupport.getId(subject);
2581 int pId = querySupport.getId(predicate);
2583 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2584 // else impl.state.barrier.inc(null, null);
2587 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2588 } catch (DatabaseException e) {
2589 Logger.defaultLogError(e);
2595 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2597 assert(subject != null);
2598 assert(predicate != null);
2599 assert(procedure != null);
2601 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2603 private Set<Statement> current = null;
2604 private Set<Statement> run = new HashSet<Statement>();
2607 public void execute(AsyncReadGraph graph, Statement result) {
2609 boolean found = false;
2611 if(current != null) {
2613 found = current.remove(result);
2617 if(!found) procedure.add(graph, result);
2624 public void finished(AsyncReadGraph graph) {
2626 if(current != null) {
2627 for(Statement r : current) procedure.remove(graph, r);
2632 run = new HashSet<Statement>();
2637 public void exception(AsyncReadGraph graph, Throwable t) {
2638 procedure.exception(graph, t);
2642 public boolean isDisposed() {
2643 return procedure.isDisposed();
2651 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2652 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2654 assert(subject != null);
2655 assert(predicate != null);
2656 assert(procedure != null);
2658 final ListenerBase listener = getListenerBase(procedure);
2660 // impl.state.barrier.inc();
2663 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2666 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2668 procedure.execute(graph, querySupport.getStatement(s, p, o));
2669 } catch (Throwable t2) {
2670 Logger.defaultLogError(t2);
2675 public void finished(ReadGraphImpl graph) {
2677 procedure.finished(graph);
2678 } catch (Throwable t2) {
2679 Logger.defaultLogError(t2);
2681 // impl.state.barrier.dec();
2685 public void exception(ReadGraphImpl graph, Throwable t) {
2687 procedure.exception(graph, t);
2688 } catch (Throwable t2) {
2689 Logger.defaultLogError(t2);
2691 // impl.state.barrier.dec();
2695 } catch (DatabaseException e) {
2696 Logger.defaultLogError(e);
2701 private static ListenerBase getListenerBase(Object procedure) {
2702 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2707 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2709 assert(subject != null);
2710 assert(predicate != null);
2711 assert(procedure != null);
2713 final ListenerBase listener = getListenerBase(procedure);
2715 // impl.state.barrier.inc();
2718 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2721 public void execute(ReadGraphImpl graph, int i) {
2723 procedure.execute(querySupport.getResource(i));
2724 } catch (Throwable t2) {
2725 Logger.defaultLogError(t2);
2730 public void finished(ReadGraphImpl graph) {
2732 procedure.finished();
2733 } catch (Throwable t2) {
2734 Logger.defaultLogError(t2);
2736 // impl.state.barrier.dec();
2740 public void exception(ReadGraphImpl graph, Throwable t) {
2741 System.out.println("forEachObject exception " + t);
2743 procedure.exception(t);
2744 } catch (Throwable t2) {
2745 Logger.defaultLogError(t2);
2747 // impl.state.barrier.dec();
2751 } catch (DatabaseException e) {
2752 Logger.defaultLogError(e);
2758 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
2760 assert(subject != null);
2761 assert(procedure != null);
2763 final ListenerBase listener = getListenerBase(procedure);
2765 int sId = querySupport.getId(subject);
2768 QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
2771 public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
2772 procedure.execute(graph, result);
2776 public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
2777 procedure.exception(graph, throwable);
2781 } catch (DatabaseException e) {
2782 Logger.defaultLogError(e);
2787 final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
2789 // assert(subject != null);
2790 // assert(procedure != null);
2792 // final ListenerBase listener = getListenerBase(procedure);
2794 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2796 return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
2801 // final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2803 // assert(subject != null);
2804 // assert(procedure != null);
2806 // final ListenerBase listener = getListenerBase(procedure);
2808 // org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2812 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2815 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2817 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2819 private Resource single = null;
2822 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2823 if(single == null) {
2826 single = INVALID_RESOURCE;
2831 public synchronized void finished(AsyncReadGraph graph) {
2832 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2833 else procedure.execute(graph, single);
2837 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2838 procedure.exception(graph, throwable);
2845 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2847 final int sId = querySupport.getId(subject);
2848 final int pId = querySupport.getId(predicate);
2851 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2852 } catch (DatabaseException e) {
2853 Logger.defaultLogError(e);
2858 static class Runner2Procedure implements IntProcedure {
2860 public int single = 0;
2861 public Throwable t = null;
2863 public void clear() {
2869 public void execute(ReadGraphImpl graph, int i) {
2870 if(single == 0) single = i;
2875 public void finished(ReadGraphImpl graph) {
2876 if(single == -1) single = 0;
2880 public void exception(ReadGraphImpl graph, Throwable throwable) {
2885 public int get() throws DatabaseException {
2887 if(t instanceof DatabaseException) throw (DatabaseException)t;
2888 else throw new DatabaseException(t);
2895 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2897 final int sId = querySupport.getId(subject);
2898 final int pId = querySupport.getId(predicate);
2900 Runner2Procedure proc = new Runner2Procedure();
2901 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2906 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2908 assert(subject != null);
2909 assert(predicate != null);
2911 final ListenerBase listener = getListenerBase(procedure);
2913 if(impl.parent != null || listener != null) {
2915 IntProcedure ip = new IntProcedure() {
2917 AtomicBoolean first = new AtomicBoolean(true);
2920 public void execute(ReadGraphImpl graph, int i) {
2923 procedure.execute(impl, querySupport.getResource(i));
2925 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2927 } catch (Throwable t2) {
2928 Logger.defaultLogError(t2);
2934 public void finished(ReadGraphImpl graph) {
2936 if(first.compareAndSet(true, false)) {
2937 procedure.finished(impl);
2938 // impl.state.barrier.dec(this);
2940 procedure.finished(impl.newRestart(graph));
2942 } catch (Throwable t2) {
2943 Logger.defaultLogError(t2);
2948 public void exception(ReadGraphImpl graph, Throwable t) {
2950 procedure.exception(graph, t);
2951 } catch (Throwable t2) {
2952 Logger.defaultLogError(t2);
2954 // impl.state.barrier.dec(this);
2958 public String toString() {
2959 return "forEachObject with " + procedure;
2964 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2965 // else impl.state.barrier.inc(null, null);
2967 forEachObject(impl, subject, predicate, listener, ip);
2971 IntProcedure ip = new IntProcedure() {
2974 public void execute(ReadGraphImpl graph, int i) {
2975 procedure.execute(graph, querySupport.getResource(i));
2979 public void finished(ReadGraphImpl graph) {
2980 procedure.finished(graph);
2984 public void exception(ReadGraphImpl graph, Throwable t) {
2985 procedure.exception(graph, t);
2989 public String toString() {
2990 return "forEachObject with " + procedure;
2995 forEachObject(impl, subject, predicate, listener, ip);
3002 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3004 assert(subject != null);
3005 assert(predicate != null);
3006 assert(procedure != null);
3008 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3010 private Set<Resource> current = null;
3011 private Set<Resource> run = new HashSet<Resource>();
3014 public void execute(AsyncReadGraph graph, Resource result) {
3016 boolean found = false;
3018 if(current != null) {
3020 found = current.remove(result);
3024 if(!found) procedure.add(graph, result);
3031 public void finished(AsyncReadGraph graph) {
3033 if(current != null) {
3034 for(Resource r : current) procedure.remove(graph, r);
3039 run = new HashSet<Resource>();
3044 public boolean isDisposed() {
3045 return procedure.isDisposed();
3049 public void exception(AsyncReadGraph graph, Throwable t) {
3050 procedure.exception(graph, t);
3054 public String toString() {
3055 return "forObjectSet " + procedure;
3063 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3065 assert(subject != null);
3066 assert(procedure != null);
3068 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3070 private Set<Resource> current = null;
3071 private Set<Resource> run = new HashSet<Resource>();
3074 public void execute(AsyncReadGraph graph, Resource result) {
3076 boolean found = false;
3078 if(current != null) {
3080 found = current.remove(result);
3084 if(!found) procedure.add(graph, result);
3091 public void finished(AsyncReadGraph graph) {
3093 if(current != null) {
3094 for(Resource r : current) procedure.remove(graph, r);
3099 run = new HashSet<Resource>();
3104 public boolean isDisposed() {
3105 return procedure.isDisposed();
3109 public void exception(AsyncReadGraph graph, Throwable t) {
3110 procedure.exception(graph, t);
3114 public String toString() {
3115 return "forPredicateSet " + procedure;
3123 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3125 assert(subject != null);
3126 assert(procedure != null);
3128 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3130 private Set<Resource> current = null;
3131 private Set<Resource> run = new HashSet<Resource>();
3134 public void execute(AsyncReadGraph graph, Resource result) {
3136 boolean found = false;
3138 if(current != null) {
3140 found = current.remove(result);
3144 if(!found) procedure.add(graph, result);
3151 public void finished(AsyncReadGraph graph) {
3153 if(current != null) {
3154 for(Resource r : current) procedure.remove(graph, r);
3159 run = new HashSet<Resource>();
3164 public boolean isDisposed() {
3165 return procedure.isDisposed();
3169 public void exception(AsyncReadGraph graph, Throwable t) {
3170 procedure.exception(graph, t);
3174 public String toString() {
3175 return "forPrincipalTypeSet " + procedure;
3183 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3185 assert(subject != null);
3186 assert(predicate != null);
3187 assert(procedure != null);
3189 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3191 private Set<Resource> current = null;
3192 private Set<Resource> run = new HashSet<Resource>();
3195 public void execute(AsyncReadGraph graph, Resource result) {
3197 boolean found = false;
3199 if(current != null) {
3201 found = current.remove(result);
3205 if(!found) procedure.add(graph, result);
3212 public void finished(AsyncReadGraph graph) {
3214 if(current != null) {
3215 for(Resource r : current) procedure.remove(graph, r);
3220 run = new HashSet<Resource>();
3225 public boolean isDisposed() {
3226 return procedure.isDisposed();
3230 public void exception(AsyncReadGraph graph, Throwable t) {
3231 procedure.exception(graph, t);
3235 public String toString() {
3236 return "forObjectSet " + procedure;
3244 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3246 assert(subject != null);
3247 assert(predicate != null);
3248 assert(procedure != null);
3250 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3252 private Set<Statement> current = null;
3253 private Set<Statement> run = new HashSet<Statement>();
3256 public void execute(AsyncReadGraph graph, Statement result) {
3258 boolean found = false;
3260 if(current != null) {
3262 found = current.remove(result);
3266 if(!found) procedure.add(graph, result);
3273 public void finished(AsyncReadGraph graph) {
3275 if(current != null) {
3276 for(Statement s : current) procedure.remove(graph, s);
3281 run = new HashSet<Statement>();
3286 public boolean isDisposed() {
3287 return procedure.isDisposed();
3291 public void exception(AsyncReadGraph graph, Throwable t) {
3292 procedure.exception(graph, t);
3296 public String toString() {
3297 return "forStatementSet " + procedure;
3305 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3306 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3308 assert(subject != null);
3309 assert(predicate != null);
3310 assert(procedure != null);
3312 final ListenerBase listener = getListenerBase(procedure);
3315 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3318 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3320 procedure.execute(graph, querySupport.getResource(o));
3321 } catch (Throwable t2) {
3322 Logger.defaultLogError(t2);
3327 public void finished(ReadGraphImpl graph) {
3329 procedure.finished(graph);
3330 } catch (Throwable t2) {
3331 Logger.defaultLogError(t2);
3333 // impl.state.barrier.dec();
3337 public void exception(ReadGraphImpl graph, Throwable t) {
3339 procedure.exception(graph, t);
3340 } catch (Throwable t2) {
3341 Logger.defaultLogError(t2);
3343 // impl.state.barrier.dec();
3347 } catch (DatabaseException e) {
3348 Logger.defaultLogError(e);
3354 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3356 assert(subject != null);
3357 assert(procedure != null);
3359 final ListenerBase listener = getListenerBase(procedure);
3361 IntProcedure ip = new IntProcedure() {
3364 public void execute(ReadGraphImpl graph, int i) {
3366 procedure.execute(graph, querySupport.getResource(i));
3367 } catch (Throwable t2) {
3368 Logger.defaultLogError(t2);
3373 public void finished(ReadGraphImpl graph) {
3375 procedure.finished(graph);
3376 } catch (Throwable t2) {
3377 Logger.defaultLogError(t2);
3379 // impl.state.barrier.dec(this);
3383 public void exception(ReadGraphImpl graph, Throwable t) {
3385 procedure.exception(graph, t);
3386 } catch (Throwable t2) {
3387 Logger.defaultLogError(t2);
3389 // impl.state.barrier.dec(this);
3394 int sId = querySupport.getId(subject);
3396 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3397 // else impl.state.barrier.inc(null, null);
3400 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3401 } catch (DatabaseException e) {
3402 Logger.defaultLogError(e);
3408 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3410 assert(subject != null);
3411 assert(procedure != null);
3413 final ListenerBase listener = getListenerBase(procedure);
3415 // impl.state.barrier.inc();
3418 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3421 public void execute(ReadGraphImpl graph, int i) {
3423 procedure.execute(querySupport.getResource(i));
3424 } catch (Throwable t2) {
3425 Logger.defaultLogError(t2);
3430 public void finished(ReadGraphImpl graph) {
3432 procedure.finished();
3433 } catch (Throwable t2) {
3434 Logger.defaultLogError(t2);
3436 // impl.state.barrier.dec();
3440 public void exception(ReadGraphImpl graph, Throwable t) {
3442 procedure.exception(t);
3443 } catch (Throwable t2) {
3444 Logger.defaultLogError(t2);
3446 // impl.state.barrier.dec();
3450 } catch (DatabaseException e) {
3451 Logger.defaultLogError(e);
3455 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3457 assert(subject != null);
3458 assert(procedure != null);
3460 final ListenerBase listener = getListenerBase(procedure);
3461 assert(listener == null);
3463 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3466 public void execute(final ReadGraphImpl graph, IntSet set) {
3467 procedure.execute(graph, set);
3471 public void exception(ReadGraphImpl graph, Throwable t) {
3472 procedure.exception(graph, t);
3477 int sId = querySupport.getId(subject);
3480 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3481 } catch (DatabaseException e) {
3482 Logger.defaultLogError(e);
3488 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3490 assert(subject != null);
3492 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3497 final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3499 assert(subject != null);
3501 return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
3506 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3508 assert(subject != null);
3509 assert(procedure != null);
3511 final ListenerBase listener = getListenerBase(procedure);
3514 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3516 AtomicBoolean first = new AtomicBoolean(true);
3519 public void execute(final ReadGraphImpl graph, IntSet set) {
3520 // final HashSet<Resource> result = new HashSet<Resource>();
3521 // set.forEach(new TIntProcedure() {
3524 // public boolean execute(int type) {
3525 // result.add(querySupport.getResource(type));
3531 if(first.compareAndSet(true, false)) {
3532 procedure.execute(graph, set);
3533 // impl.state.barrier.dec();
3535 procedure.execute(impl.newRestart(graph), set);
3537 } catch (Throwable t2) {
3538 Logger.defaultLogError(t2);
3543 public void exception(ReadGraphImpl graph, Throwable t) {
3545 if(first.compareAndSet(true, false)) {
3546 procedure.exception(graph, t);
3547 // impl.state.barrier.dec();
3549 procedure.exception(impl.newRestart(graph), t);
3551 } catch (Throwable t2) {
3552 Logger.defaultLogError(t2);
3557 } catch (DatabaseException e) {
3558 Logger.defaultLogError(e);
3564 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3566 assert(subject != null);
3567 assert(procedure != null);
3569 final ListenerBase listener = getListenerBase(procedure);
3571 IntProcedure ip = new IntProcedureAdapter() {
3574 public void execute(final ReadGraphImpl graph, int superRelation) {
3576 procedure.execute(graph, querySupport.getResource(superRelation));
3577 } catch (Throwable t2) {
3578 Logger.defaultLogError(t2);
3583 public void finished(final ReadGraphImpl graph) {
3585 procedure.finished(graph);
3586 } catch (Throwable t2) {
3587 Logger.defaultLogError(t2);
3589 // impl.state.barrier.dec(this);
3594 public void exception(ReadGraphImpl graph, Throwable t) {
3596 procedure.exception(graph, t);
3597 } catch (Throwable t2) {
3598 Logger.defaultLogError(t2);
3600 // impl.state.barrier.dec(this);
3605 int sId = querySupport.getId(subject);
3607 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3608 // else impl.state.barrier.inc(null, null);
3611 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3612 } catch (DatabaseException e) {
3613 Logger.defaultLogError(e);
3616 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3621 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3623 assert(subject != null);
3624 assert(procedure != null);
3626 final ListenerBase listener = getListenerBase(procedure);
3628 // impl.state.barrier.inc();
3630 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3635 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3637 assert(subject != null);
3638 assert(procedure != null);
3640 final ListenerBase listener = getListenerBase(procedure);
3642 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3645 public void execute(final ReadGraphImpl graph, IntSet set) {
3646 // final HashSet<Resource> result = new HashSet<Resource>();
3647 // set.forEach(new TIntProcedure() {
3650 // public boolean execute(int type) {
3651 // result.add(querySupport.getResource(type));
3657 procedure.execute(graph, set);
3658 } catch (Throwable t2) {
3659 Logger.defaultLogError(t2);
3661 // impl.state.barrier.dec(this);
3665 public void exception(ReadGraphImpl graph, Throwable t) {
3667 procedure.exception(graph, t);
3668 } catch (Throwable t2) {
3669 Logger.defaultLogError(t2);
3671 // impl.state.barrier.dec(this);
3676 int sId = querySupport.getId(subject);
3678 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3679 // else impl.state.barrier.inc(null, null);
3682 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3683 } catch (DatabaseException e) {
3684 Logger.defaultLogError(e);
3689 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3690 return getValue(impl, querySupport.getId(subject));
3693 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3694 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3698 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3700 assert(subject != null);
3701 assert(procedure != null);
3703 int sId = querySupport.getId(subject);
3705 // if(procedure != null) {
3707 final ListenerBase listener = getListenerBase(procedure);
3709 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3711 AtomicBoolean first = new AtomicBoolean(true);
3714 public void execute(ReadGraphImpl graph, byte[] result) {
3716 if(first.compareAndSet(true, false)) {
3717 procedure.execute(graph, result);
3718 // impl.state.barrier.dec(this);
3720 procedure.execute(impl.newRestart(graph), result);
3722 } catch (Throwable t2) {
3723 Logger.defaultLogError(t2);
3728 public void exception(ReadGraphImpl graph, Throwable t) {
3730 if(first.compareAndSet(true, false)) {
3731 procedure.exception(graph, t);
3732 // impl.state.barrier.dec(this);
3734 procedure.exception(impl.newRestart(graph), t);
3736 } catch (Throwable t2) {
3737 Logger.defaultLogError(t2);
3743 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3744 // else impl.state.barrier.inc(null, null);
3747 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3748 } catch (DatabaseException e) {
3749 throw new IllegalStateException("Internal error");
3754 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3758 // throw new IllegalStateException("Internal error");
3763 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3765 assert(subject != null);
3766 assert(procedure != null);
3768 final ListenerBase listener = getListenerBase(procedure);
3770 if(impl.parent != null || listener != null) {
3772 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3774 AtomicBoolean first = new AtomicBoolean(true);
3777 public void execute(ReadGraphImpl graph, byte[] result) {
3779 if(first.compareAndSet(true, false)) {
3780 procedure.execute(graph, result);
3781 // impl.state.barrier.dec(this);
3783 procedure.execute(impl.newRestart(graph), result);
3785 } catch (Throwable t2) {
3786 Logger.defaultLogError(t2);
3791 public void exception(ReadGraphImpl graph, Throwable t) {
3793 if(first.compareAndSet(true, false)) {
3794 procedure.exception(graph, t);
3795 // impl.state.barrier.dec(this);
3797 procedure.exception(impl.newRestart(graph), t);
3799 } catch (Throwable t2) {
3800 Logger.defaultLogError(t2);
3806 int sId = querySupport.getId(subject);
3808 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3809 // else impl.state.barrier.inc(null, null);
3812 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3813 } catch (DatabaseException e) {
3814 Logger.defaultLogError(e);
3819 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3822 public void execute(ReadGraphImpl graph, byte[] result) {
3824 procedure.execute(graph, result);
3829 public void exception(ReadGraphImpl graph, Throwable t) {
3831 procedure.exception(graph, t);
3837 int sId = querySupport.getId(subject);
3840 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3841 } catch (DatabaseException e) {
3842 Logger.defaultLogError(e);
3850 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3852 assert(relation != null);
3853 assert(procedure != null);
3855 final ListenerBase listener = getListenerBase(procedure);
3857 IntProcedure ip = new IntProcedure() {
3859 private int result = 0;
3861 final AtomicBoolean found = new AtomicBoolean(false);
3862 final AtomicBoolean done = new AtomicBoolean(false);
3865 public void finished(ReadGraphImpl graph) {
3867 // Shall fire exactly once!
3868 if(done.compareAndSet(false, true)) {
3871 procedure.exception(graph, new NoInverseException(""));
3872 // impl.state.barrier.dec(this);
3874 procedure.execute(graph, querySupport.getResource(result));
3875 // impl.state.barrier.dec(this);
3877 } catch (Throwable t) {
3878 Logger.defaultLogError(t);
3885 public void execute(ReadGraphImpl graph, int i) {
3887 if(found.compareAndSet(false, true)) {
3890 // Shall fire exactly once!
3891 if(done.compareAndSet(false, true)) {
3893 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3894 // impl.state.barrier.dec(this);
3895 } catch (Throwable t) {
3896 Logger.defaultLogError(t);
3904 public void exception(ReadGraphImpl graph, Throwable t) {
3905 // Shall fire exactly once!
3906 if(done.compareAndSet(false, true)) {
3908 procedure.exception(graph, t);
3909 // impl.state.barrier.dec(this);
3910 } catch (Throwable t2) {
3911 Logger.defaultLogError(t2);
3918 int sId = querySupport.getId(relation);
3920 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3921 // else impl.state.barrier.inc(null, null);
3924 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3925 } catch (DatabaseException e) {
3926 Logger.defaultLogError(e);
3932 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3935 assert(procedure != null);
3937 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3940 public void execute(ReadGraphImpl graph, Integer result) {
3942 procedure.execute(graph, querySupport.getResource(result));
3943 } catch (Throwable t2) {
3944 Logger.defaultLogError(t2);
3946 // impl.state.barrier.dec(this);
3950 public void exception(ReadGraphImpl graph, Throwable t) {
3953 procedure.exception(graph, t);
3954 } catch (Throwable t2) {
3955 Logger.defaultLogError(t2);
3957 // impl.state.barrier.dec(this);
3962 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3963 // else impl.state.barrier.inc(null, null);
3965 forResource(impl, id, impl.parent, ip);
3970 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3973 assert(procedure != null);
3975 // impl.state.barrier.inc();
3978 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3981 public void execute(ReadGraphImpl graph, Integer result) {
3983 procedure.execute(graph, querySupport.getResource(result));
3984 } catch (Throwable t2) {
3985 Logger.defaultLogError(t2);
3987 // impl.state.barrier.dec();
3991 public void exception(ReadGraphImpl graph, Throwable t) {
3993 procedure.exception(graph, t);
3994 } catch (Throwable t2) {
3995 Logger.defaultLogError(t2);
3997 // impl.state.barrier.dec();
4001 } catch (DatabaseException e) {
4002 Logger.defaultLogError(e);
4008 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4010 assert(subject != null);
4011 assert(procedure != null);
4013 final ListenerBase listener = getListenerBase(procedure);
4016 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
4017 procedure.execute(impl, !result.isEmpty());
4018 } catch (DatabaseException e) {
4019 procedure.exception(impl, e);
4025 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4027 assert(subject != null);
4028 assert(predicate != null);
4029 assert(procedure != null);
4031 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4033 boolean found = false;
4036 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4041 synchronized public void finished(AsyncReadGraph graph) {
4043 procedure.execute(graph, found);
4044 } catch (Throwable t2) {
4045 Logger.defaultLogError(t2);
4047 // impl.state.barrier.dec(this);
4051 public void exception(AsyncReadGraph graph, Throwable t) {
4053 procedure.exception(graph, t);
4054 } catch (Throwable t2) {
4055 Logger.defaultLogError(t2);
4057 // impl.state.barrier.dec(this);
4062 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4063 // else impl.state.barrier.inc(null, null);
4065 forEachObject(impl, subject, predicate, ip);
4070 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4072 assert(subject != null);
4073 assert(predicate != null);
4074 assert(procedure != null);
4076 // impl.state.barrier.inc();
4078 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4080 boolean found = false;
4083 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4084 if(resource.equals(object)) found = true;
4088 synchronized public void finished(AsyncReadGraph graph) {
4090 procedure.execute(graph, found);
4091 } catch (Throwable t2) {
4092 Logger.defaultLogError(t2);
4094 // impl.state.barrier.dec();
4098 public void exception(AsyncReadGraph graph, Throwable t) {
4100 procedure.exception(graph, t);
4101 } catch (Throwable t2) {
4102 Logger.defaultLogError(t2);
4104 // impl.state.barrier.dec();
4112 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4114 assert(subject != null);
4115 assert(procedure != null);
4117 final ListenerBase listener = getListenerBase(procedure);
4119 // impl.state.barrier.inc();
4122 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4125 public void execute(ReadGraphImpl graph, byte[] object) {
4126 boolean result = object != null;
4128 procedure.execute(graph, result);
4129 } catch (Throwable t2) {
4130 Logger.defaultLogError(t2);
4132 // impl.state.barrier.dec();
4136 public void exception(ReadGraphImpl graph, Throwable t) {
4138 procedure.exception(graph, t);
4139 } catch (Throwable t2) {
4140 Logger.defaultLogError(t2);
4142 // impl.state.barrier.dec();
4146 } catch (DatabaseException e) {
4147 Logger.defaultLogError(e);
4153 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4155 assert(subject != null);
4156 assert(procedure != null);
4158 final ListenerBase listener = getListenerBase(procedure);
4162 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4165 public void exception(ReadGraphImpl graph, Throwable t) {
4167 procedure.exception(graph, t);
4168 } catch (Throwable t2) {
4169 Logger.defaultLogError(t2);
4171 // impl.state.barrier.dec();
4175 public void execute(ReadGraphImpl graph, int i) {
4177 procedure.execute(graph, querySupport.getResource(i));
4178 } catch (Throwable t2) {
4179 Logger.defaultLogError(t2);
4184 public void finished(ReadGraphImpl graph) {
4186 procedure.finished(graph);
4187 } catch (Throwable t2) {
4188 Logger.defaultLogError(t2);
4190 // impl.state.barrier.dec();
4194 } catch (DatabaseException e) {
4195 Logger.defaultLogError(e);
4201 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4203 // assert(request != null);
4204 // assert(procedure != null);
4206 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4211 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4213 // assert(graph != null);
4214 // assert(request != null);
4216 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4217 // if(entry != null && entry.isReady()) {
4218 // return (T)entry.get(graph, this, null);
4220 // return request.perform(graph);
4225 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4227 // assert(graph != null);
4228 // assert(request != null);
4230 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4231 // if(entry != null && entry.isReady()) {
4232 // if(entry.isExcepted()) {
4233 // Throwable t = (Throwable)entry.getResult();
4234 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4235 // else throw new DatabaseException(t);
4237 // return (T)entry.getResult();
4241 // final DataContainer<T> result = new DataContainer<T>();
4242 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4244 // request.register(graph, new Listener<T>() {
4247 // public void exception(Throwable t) {
4248 // exception.set(t);
4252 // public void execute(T t) {
4257 // public boolean isDisposed() {
4263 // Throwable t = exception.get();
4265 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4266 // else throw new DatabaseException(t);
4269 // return result.get();
4276 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4278 // assert(graph != null);
4279 // assert(request != null);
4281 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4282 // if(entry != null && entry.isReady()) {
4283 // if(entry.isExcepted()) {
4284 // procedure.exception(graph, (Throwable)entry.getResult());
4286 // procedure.execute(graph, (T)entry.getResult());
4289 // request.perform(graph, procedure);
4295 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
4297 assert(request != null);
4298 assert(procedure != null);
4302 queryMultiRead(impl, request, parent, listener, procedure);
4304 } catch (DatabaseException e) {
4306 throw new IllegalStateException(e);
4313 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4315 assert(request != null);
4316 assert(procedure != null);
4318 // impl.state.barrier.inc();
4320 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4322 public void execute(AsyncReadGraph graph, T result) {
4325 procedure.execute(graph, result);
4326 } catch (Throwable t2) {
4327 Logger.defaultLogError(t2);
4332 public void finished(AsyncReadGraph graph) {
4335 procedure.finished(graph);
4336 } catch (Throwable t2) {
4337 Logger.defaultLogError(t2);
4340 // impl.state.barrier.dec();
4345 public String toString() {
4346 return procedure.toString();
4350 public void exception(AsyncReadGraph graph, Throwable t) {
4353 procedure.exception(graph, t);
4354 } catch (Throwable t2) {
4355 Logger.defaultLogError(t2);
4358 // impl.state.barrier.dec();
4367 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4369 // assert(request != null);
4370 // assert(procedure != null);
4374 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4377 // public String toString() {
4378 // return procedure.toString();
4382 // public void execute(AsyncReadGraph graph, T result) {
4384 // procedure.execute(result);
4385 // } catch (Throwable t2) {
4386 // Logger.defaultLogError(t2);
4391 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4393 // procedure.exception(throwable);
4394 // } catch (Throwable t2) {
4395 // Logger.defaultLogError(t2);
4401 // } catch (DatabaseException e) {
4403 // throw new IllegalStateException(e);
4410 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4412 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4417 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4419 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4424 public VirtualGraph getValueProvider(Resource subject) {
4426 return querySupport.getValueProvider(querySupport.getId(subject));
4430 public boolean resumeTasks(ReadGraphImpl graph) {
4432 return querySupport.resume(graph);
4436 public boolean isImmutable(int resourceId) {
4437 return querySupport.isImmutable(resourceId);
4440 public boolean isImmutable(Resource resource) {
4441 ResourceImpl impl = (ResourceImpl)resource;
4442 return isImmutable(impl.id);
4447 public Layer0 getL0(ReadGraph graph) {
4449 L0 = Layer0.getInstance(graph);
4454 public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
4455 protected Integer initialValue() {