1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.IdentityHashMap;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
38 import org.simantics.databoard.Bindings;
39 import org.simantics.db.AsyncReadGraph;
40 import org.simantics.db.DevelopmentKeys;
41 import org.simantics.db.DirectStatements;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.RelationInfo;
44 import org.simantics.db.Resource;
45 import org.simantics.db.Session;
46 import org.simantics.db.Statement;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.debug.ListenerReport;
51 import org.simantics.db.exception.DatabaseException;
52 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
53 import org.simantics.db.exception.NoInverseException;
54 import org.simantics.db.exception.ResourceNotFoundException;
55 import org.simantics.db.impl.DebugPolicy;
56 import org.simantics.db.impl.ResourceImpl;
57 import org.simantics.db.impl.graph.MultiIntProcedure;
58 import org.simantics.db.impl.graph.ReadGraphImpl;
59 import org.simantics.db.impl.graph.ReadGraphSupport;
60 import org.simantics.db.impl.graph.WriteGraphImpl;
61 import org.simantics.db.impl.procedure.IntProcedureAdapter;
62 import org.simantics.db.impl.procedure.InternalProcedure;
63 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
64 import org.simantics.db.impl.support.ResourceSupport;
65 import org.simantics.db.procedure.AsyncMultiListener;
66 import org.simantics.db.procedure.AsyncMultiProcedure;
67 import org.simantics.db.procedure.AsyncProcedure;
68 import org.simantics.db.procedure.AsyncSetListener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.procedure.TIntProcedure;
88 import gnu.trove.procedure.TLongProcedure;
89 import gnu.trove.procedure.TObjectProcedure;
90 import gnu.trove.set.hash.THashSet;
91 import gnu.trove.set.hash.TIntHashSet;
93 @SuppressWarnings({"rawtypes", "unchecked"})
94 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
96 public static int indent = 0;
101 public int boundQueries = 0;
104 final private int functionalRelation;
106 final private int superrelationOf;
108 final private int instanceOf;
110 final private int inverseOf;
112 final private int asserts;
114 final private int hasPredicate;
116 final private int hasPredicateInverse;
118 final private int hasObject;
120 final private int inherits;
122 final private int subrelationOf;
124 final private int rootLibrary;
127 * A cache for the root library resource. Initialized in
128 * {@link #getRootLibraryResource()}.
130 private volatile ResourceImpl rootLibraryResource;
132 final private int library;
134 final private int consistsOf;
136 final private int hasName;
138 AtomicInteger sleepers = new AtomicInteger(0);
140 private boolean updating = false;
143 private boolean firingListeners = false;
145 final public QueryCache cache;
146 final public QuerySupport querySupport;
147 final public Session session;
148 final public ResourceSupport resourceSupport;
150 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
152 QueryThread[] executors;
154 public ArrayList<SessionTask>[] queues;
158 INIT, RUN, SLEEP, DISPOSED
162 public ThreadState[] threadStates;
163 public ReentrantLock[] threadLocks;
164 public Condition[] threadConditions;
166 public ArrayList<SessionTask>[] ownTasks;
168 public ArrayList<SessionTask>[] ownSyncTasks;
170 ArrayList<SessionTask>[] delayQueues;
172 final Object querySupportLock;
174 public Long modificationCounter = 0L;
176 public void close() {
179 final public void scheduleOwn(int caller, SessionTask request) {
180 ownTasks[caller].add(request);
183 final public void scheduleAlways(int caller, SessionTask request) {
185 int performer = request.thread;
186 if(caller == performer) {
187 ownTasks[caller].add(request);
189 schedule(caller, request);
194 final public void schedule(int caller, SessionTask request) {
196 int performer = request.thread;
198 if(DebugPolicy.SCHEDULE)
199 System.out.println("schedule " + request + " " + caller + " -> " + performer);
201 assert(performer >= 0);
203 assert(request != null);
205 if(caller == performer) {
208 ReentrantLock queueLock = threadLocks[performer];
210 queues[performer].add(request);
211 // This thread could have been sleeping
212 if(queues[performer].size() == 1) {
213 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
214 threadConditions[performer].signalAll();
223 final public int THREAD_MASK;
225 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
227 public static abstract class SessionTask {
229 final public int thread;
230 final public int syncCaller;
231 final public Object object;
233 public SessionTask(WriteTraits object, int thread) {
234 this.thread = thread;
235 this.syncCaller = -1;
236 this.object = object;
239 public SessionTask(Object object, int thread, int syncCaller) {
240 this.thread = thread;
241 this.syncCaller = syncCaller;
242 this.object = object;
245 public abstract void run(int thread);
248 public String toString() {
249 return "SessionTask[" + object + "]";
254 public static abstract class SessionRead extends SessionTask {
256 final public Semaphore notify;
257 final public DataContainer<Throwable> throwable;
259 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
260 super(object, thread, thread);
261 this.throwable = throwable;
262 this.notify = notify;
265 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
266 super(object, thread, syncThread);
267 this.throwable = throwable;
268 this.notify = notify;
273 long waitingTime = 0;
276 static int koss2 = 0;
278 public boolean resume(ReadGraphImpl graph) {
279 return executors[0].runSynchronized();
282 //private WeakReference<GarbageTracker> garbageTracker;
284 private class GarbageTracker {
287 protected void finalize() throws Throwable {
289 // System.err.println("GarbageTracker");
291 // garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
299 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
300 throws DatabaseException {
302 //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
305 THREAD_MASK = threads - 1;
308 cache = new QueryCache(core, threads);
309 session = querySupport.getSession();
310 resourceSupport = querySupport.getSupport();
311 querySupportLock = core.getLock();
313 executors = new QueryThread[THREADS];
314 queues = new ArrayList[THREADS];
315 threadLocks = new ReentrantLock[THREADS];
316 threadConditions = new Condition[THREADS];
317 threadStates = new ThreadState[THREADS];
318 ownTasks = new ArrayList[THREADS];
319 ownSyncTasks = new ArrayList[THREADS];
320 delayQueues = new ArrayList[THREADS * THREADS];
322 // freeSchedule = new AtomicInteger(0);
324 for (int i = 0; i < THREADS * THREADS; i++) {
325 delayQueues[i] = new ArrayList<SessionTask>();
328 for (int i = 0; i < THREADS; i++) {
330 // tasks[i] = new ArrayList<Runnable>();
331 ownTasks[i] = new ArrayList<SessionTask>();
332 ownSyncTasks[i] = new ArrayList<SessionTask>();
333 queues[i] = new ArrayList<SessionTask>();
334 threadLocks[i] = new ReentrantLock();
335 threadConditions[i] = threadLocks[i].newCondition();
336 // limits[i] = false;
337 threadStates[i] = ThreadState.INIT;
341 for (int i = 0; i < THREADS; i++) {
345 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
347 threadSet.add(executors[i]);
352 for (int i = 0; i < THREADS; i++) {
353 executors[i].start();
356 // Make sure that query threads are up and running
357 while(sleepers.get() != THREADS) {
360 } catch (InterruptedException e) {
365 rootLibrary = core.getBuiltin("http:/");
366 boolean builtinsInstalled = rootLibrary != 0;
368 if (builtinsInstalled) {
369 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
370 assert (functionalRelation != 0);
372 functionalRelation = 0;
374 if (builtinsInstalled) {
375 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
376 assert (instanceOf != 0);
380 if (builtinsInstalled) {
381 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
382 assert (inverseOf != 0);
387 if (builtinsInstalled) {
388 inherits = core.getBuiltin(Layer0.URIs.Inherits);
389 assert (inherits != 0);
393 if (builtinsInstalled) {
394 asserts = core.getBuiltin(Layer0.URIs.Asserts);
395 assert (asserts != 0);
399 if (builtinsInstalled) {
400 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
401 assert (hasPredicate != 0);
405 if (builtinsInstalled) {
406 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
407 assert (hasPredicateInverse != 0);
409 hasPredicateInverse = 0;
411 if (builtinsInstalled) {
412 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
413 assert (hasObject != 0);
417 if (builtinsInstalled) {
418 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
419 assert (subrelationOf != 0);
423 if (builtinsInstalled) {
424 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
425 assert (superrelationOf != 0);
429 if (builtinsInstalled) {
430 library = core.getBuiltin(Layer0.URIs.Library);
431 assert (library != 0);
435 if (builtinsInstalled) {
436 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
437 assert (consistsOf != 0);
441 if (builtinsInstalled) {
442 hasName = core.getBuiltin(Layer0.URIs.HasName);
443 assert (hasName != 0);
449 final public void releaseWrite(ReadGraphImpl graph) {
450 performDirtyUpdates(graph);
451 modificationCounter++;
454 final public int getId(final Resource r) {
455 return querySupport.getId(r);
458 public QuerySupport getCore() {
462 public int getFunctionalRelation() {
463 return functionalRelation;
466 public int getInherits() {
470 public int getInstanceOf() {
474 public int getInverseOf() {
478 public int getSubrelationOf() {
479 return subrelationOf;
482 public int getSuperrelationOf() {
483 return superrelationOf;
486 public int getAsserts() {
490 public int getHasPredicate() {
494 public int getHasPredicateInverse() {
495 return hasPredicateInverse;
498 public int getHasObject() {
502 public int getRootLibrary() {
506 public Resource getRootLibraryResource() {
507 if (rootLibraryResource == null) {
508 // Synchronization is not needed here, it doesn't matter if multiple
509 // threads simultaneously set rootLibraryResource once.
510 int root = getRootLibrary();
512 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
513 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
515 return rootLibraryResource;
518 public int getLibrary() {
522 public int getConsistsOf() {
526 public int getHasName() {
530 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
534 QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
537 public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
539 if (result != null && result != 0) {
540 procedure.execute(graph, result);
544 // Fall back to using the fixed builtins.
545 // result = querySupport.getBuiltin(id);
546 // if (result != 0) {
547 // procedure.execute(graph, result);
552 // result = querySupport.getRandomAccessReference(id);
553 // } catch (ResourceNotFoundException e) {
554 // procedure.exception(graph, e);
559 procedure.execute(graph, result);
561 procedure.exception(graph, new ResourceNotFoundException(id));
567 public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
568 procedure.exception(graph, t);
572 } catch (DatabaseException e) {
576 procedure.exception(graph, e);
578 } catch (DatabaseException e1) {
580 Logger.defaultLogError(e1);
588 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
590 Integer result = querySupport.getBuiltin(id);
592 procedure.execute(graph, result);
594 procedure.exception(graph, new ResourceNotFoundException(id));
599 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
602 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
603 } catch (DatabaseException e) {
604 throw new IllegalStateException(e);
609 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
613 QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
614 } catch (DatabaseException e) {
615 throw new IllegalStateException(e);
620 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
621 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
625 // public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
627 // return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
631 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
633 QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
637 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
639 QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
643 boolean isBound(ExternalReadEntry<?> entry) {
644 if(entry.hasParents()) return true;
645 else if(hasListener(entry)) return true;
649 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
651 if (parent != null && !inferred) {
653 if(!child.isImmutable(graph))
654 child.addParent(parent);
655 } catch (DatabaseException e) {
656 Logger.defaultLogError(e);
658 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
661 if (listener != null) {
662 return registerListener(child, listener, procedure);
670 static class Dummy implements InternalProcedure<Object>, IntProcedure {
673 public void execute(ReadGraphImpl graph, int i) {
677 public void finished(ReadGraphImpl graph) {
681 public void execute(ReadGraphImpl graph, Object result) {
685 public void exception(ReadGraphImpl graph, Throwable throwable) {
690 private static final Dummy dummy = new Dummy();
693 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
695 if (DebugPolicy.PERFORM)
696 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
699 assert (!collecting);
701 assert(query.assertNotDiscarded());
703 registerDependencies(graph, query, parent, listener, procedure, false);
705 // FRESH, REFUTED, EXCEPTED go here
706 if (!query.isReady()) {
711 query.computeForEach(graph, this, (Procedure)dummy, true);
712 return query.get(graph, this, null);
718 return query.get(graph, this, procedure);
726 interface QueryCollectorSupport {
727 public CacheCollectionResult allCaches();
728 public Collection<CacheEntry> getRootList();
729 public int getCurrentSize();
730 public int calculateCurrentSize();
731 public CacheEntryBase iterate(int level);
732 public void remove();
733 public void setLevel(CacheEntryBase entry, int level);
734 public boolean start(boolean flush);
737 interface QueryCollector {
739 public void collect(int youngTarget, int allowedTimeInMs);
743 class QueryCollectorSupportImpl implements QueryCollectorSupport {
745 private static final boolean DEBUG = false;
746 private static final double ITERATION_RATIO = 0.2;
748 private CacheCollectionResult iteration = new CacheCollectionResult();
749 private boolean fresh = true;
750 private boolean needDataInStart = true;
752 QueryCollectorSupportImpl() {
756 public CacheCollectionResult allCaches() {
757 CacheCollectionResult result = new CacheCollectionResult();
758 QueryProcessor.this.allCaches(result);
763 public boolean start(boolean flush) {
764 // We need new data from query maps
766 if(needDataInStart || flush) {
767 // Last run ended after processing all queries => refresh data
768 restart(flush ? 0.0 : ITERATION_RATIO);
770 // continue with previous big data
772 // Notify caller about iteration situation
773 return iteration.isAtStart();
776 private void restart(double targetRatio) {
778 needDataInStart = true;
780 long start = System.nanoTime();
783 // We need new data from query maps
785 int iterationSize = iteration.size()+1;
786 int diff = calculateCurrentSize()-iterationSize;
788 double ratio = (double)diff / (double)iterationSize;
789 boolean dirty = Math.abs(ratio) >= targetRatio;
792 iteration = allCaches();
794 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
795 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
796 System.err.print(" " + iteration.levels[i].size());
797 System.err.println("");
804 needDataInStart = false;
806 // We are returning here within the same GC round - reuse the cache table
815 public CacheEntryBase iterate(int level) {
817 CacheEntryBase entry = iteration.next(level);
819 restart(ITERATION_RATIO);
823 while(entry != null && entry.isDiscarded()) {
824 entry = iteration.next(level);
832 public void remove() {
837 public void setLevel(CacheEntryBase entry, int level) {
838 iteration.setLevel(entry, level);
841 public Collection<CacheEntry> getRootList() {
842 return cache.getRootList();
846 public int calculateCurrentSize() {
847 return cache.calculateCurrentSize();
851 public int getCurrentSize() {
856 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
858 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
859 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
861 public int querySize() {
865 public void gc(int youngTarget, int allowedTimeInMs) {
867 collector.collect(youngTarget, allowedTimeInMs);
871 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
873 assert (entry != null);
875 if (base.isDisposed())
878 return addListener(entry, base, procedure);
882 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
883 entry.setLastKnown(result);
886 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
888 assert (entry != null);
889 assert (procedure != null);
891 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
893 list = new ArrayList<ListenerEntry>(1);
894 cache.listeners.put(entry, list);
897 ListenerEntry result = new ListenerEntry(entry, base, procedure);
898 int currentIndex = list.indexOf(result);
899 // There was already a listener
900 if(currentIndex > -1) {
901 ListenerEntry current = list.get(currentIndex);
902 if(!current.base.isDisposed()) return null;
903 list.set(currentIndex, result);
908 if(DebugPolicy.LISTENER) {
909 new Exception().printStackTrace();
910 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
917 private void scheduleListener(ListenerEntry entry) {
918 assert (entry != null);
919 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
920 scheduledListeners.add(entry);
923 private void removeListener(ListenerEntry entry) {
924 assert (entry != null);
925 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
926 if(list == null) return;
927 boolean success = list.remove(entry);
930 cache.listeners.remove(entry.entry);
933 private boolean hasListener(CacheEntry entry) {
934 if(cache.listeners.get(entry) != null) return true;
938 boolean hasListenerAfterDisposing(CacheEntry entry) {
939 if(cache.listeners.get(entry) != null) {
940 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
941 ArrayList<ListenerEntry> list = null;
942 for (ListenerEntry e : entries) {
943 if (e.base.isDisposed()) {
944 if(list == null) list = new ArrayList<ListenerEntry>();
949 for (ListenerEntry e : list) {
953 if (entries.isEmpty()) {
954 cache.listeners.remove(entry);
962 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
963 hasListenerAfterDisposing(entry);
964 if(cache.listeners.get(entry) != null)
965 return cache.listeners.get(entry);
967 return Collections.emptyList();
970 void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
972 if(!workarea.containsKey(entry)) {
974 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
975 for(ListenerEntry e : getListenerEntries(entry))
978 workarea.put(entry, ls);
980 for(CacheEntry parent : entry.getParents(this)) {
981 processListenerReport(parent, workarea);
982 ls.addAll(workarea.get(parent));
989 public synchronized ListenerReport getListenerReport() throws IOException {
991 class ListenerReportImpl implements ListenerReport {
993 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
996 public void print(PrintStream b) {
997 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
998 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
999 for(ListenerBase l : e.getValue()) {
1000 Integer i = hist.get(l);
1001 hist.put(l, i != null ? i-1 : -1);
1005 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1006 b.print("" + -p.second + " " + p.first + "\n");
1014 ListenerReportImpl result = new ListenerReportImpl();
1016 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1017 for(CacheEntryBase entry : all) {
1018 hasListenerAfterDisposing(entry);
1020 for(CacheEntryBase entry : all) {
1021 processListenerReport(entry, result.workarea);
1028 public synchronized String reportListeners(File file) throws IOException {
1033 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1034 ListenerReport report = getListenerReport();
1037 return "Done reporting listeners.";
1041 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1043 if(entry.isDiscarded()) return;
1044 if(workarea.containsKey(entry)) return;
1046 Iterable<CacheEntry> parents = entry.getParents(this);
1047 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1048 for(CacheEntry e : parents) {
1049 if(e.isDiscarded()) continue;
1051 processParentReport(e, workarea);
1053 workarea.put(entry, ps);
1057 public synchronized String reportQueryActivity(File file) throws IOException {
1059 System.err.println("reportQueries " + file.getAbsolutePath());
1064 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1066 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1067 Collections.reverse(entries);
1069 for(Pair<String,Integer> entry : entries) {
1070 b.println(entry.first + ": " + entry.second);
1075 Development.histogram.clear();
1081 public synchronized String reportQueries(File file) throws IOException {
1083 System.err.println("reportQueries " + file.getAbsolutePath());
1088 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1090 long start = System.nanoTime();
1092 // ArrayList<CacheEntry> all = ;
1094 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1095 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1096 for(CacheEntryBase entry : caches) {
1097 processParentReport(entry, workarea);
1100 // for(CacheEntry e : all) System.err.println("entry: " + e);
1102 long duration = System.nanoTime() - start;
1103 System.err.println("Query root set in " + 1e-9*duration + "s.");
1105 start = System.nanoTime();
1107 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1111 for(CacheEntry entry : workarea.keySet()) {
1112 boolean listener = hasListenerAfterDisposing(entry);
1113 boolean hasParents = entry.getParents(this).iterator().hasNext();
1116 flagMap.put(entry, 0);
1117 } else if (!hasParents) {
1119 flagMap.put(entry, 1);
1122 flagMap.put(entry, 2);
1124 // // Write leaf bit
1125 // entry.flags |= 4;
1128 boolean done = true;
1135 long start2 = System.nanoTime();
1137 int boundCounter = 0;
1138 int unboundCounter = 0;
1139 int unknownCounter = 0;
1141 for(CacheEntry<?> entry : workarea.keySet()) {
1143 //System.err.println("process " + entry);
1145 int flags = flagMap.get(entry);
1146 int bindStatus = flags & 3;
1148 if(bindStatus == 0) boundCounter++;
1149 else if(bindStatus == 1) unboundCounter++;
1150 else if(bindStatus == 2) unknownCounter++;
1152 if(bindStatus < 2) continue;
1155 for(CacheEntry parent : entry.getParents(this)) {
1157 if(parent.isDiscarded()) flagMap.put(parent, 1);
1159 int flags2 = flagMap.get(parent);
1160 int bindStatus2 = flags2 & 3;
1161 // Parent is bound => child is bound
1162 if(bindStatus2 == 0) {
1166 // Parent is unknown => child is unknown
1167 else if (bindStatus2 == 2) {
1174 flagMap.put(entry, newStatus);
1178 duration = System.nanoTime() - start2;
1179 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1180 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
1182 } while(!done && loops++ < 20);
1186 for(CacheEntry entry : workarea.keySet()) {
1188 int bindStatus = flagMap.get(entry);
1189 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
1195 duration = System.nanoTime() - start;
1196 System.err.println("Query analysis in " + 1e-9*duration + "s.");
1198 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
1200 for(CacheEntry entry : workarea.keySet()) {
1201 Class<?> clazz = entry.getClass();
1202 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
1203 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
1204 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
1205 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
1206 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
1207 Integer c = counts.get(clazz);
1208 if(c == null) counts.put(clazz, -1);
1209 else counts.put(clazz, c-1);
1212 b.print("// Simantics DB client query report file\n");
1213 b.print("// This file contains the following information\n");
1214 b.print("// -The amount of cached query instances per query class\n");
1215 b.print("// -The sizes of retained child sets\n");
1216 b.print("// -List of parents for each query (search for 'P <query name>')\n");
1217 b.print("// -Followed by status, where\n");
1218 b.print("// -0=bound\n");
1219 b.print("// -1=free\n");
1220 b.print("// -2=unknown\n");
1221 b.print("// -L=has listener\n");
1222 b.print("// -List of children for each query (search for 'C <query name>')\n");
1224 b.print("----------------------------------------\n");
1226 b.print("// Queries by class\n");
1227 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
1228 b.print(-p.second + " " + p.first.getName() + "\n");
1231 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
1232 for(CacheEntry e : workarea.keySet())
1235 boolean changed = true;
1237 while(changed && iter++<50) {
1241 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
1242 for(CacheEntry e : workarea.keySet())
1245 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
1246 Integer c = hist.get(e.getKey());
1247 for(CacheEntry p : e.getValue()) {
1248 Integer i = newHist.get(p);
1249 newHist.put(p, i+c);
1252 for(CacheEntry e : workarea.keySet()) {
1253 Integer value = newHist.get(e);
1254 Integer old = hist.get(e);
1255 if(!value.equals(old)) {
1257 // System.err.println("hist " + e + ": " + old + " => " + value);
1262 System.err.println("Retained set iteration " + iter);
1266 b.print("// Queries by retained set\n");
1267 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1268 b.print("" + -p.second + " " + p.first + "\n");
1271 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
1273 b.print("// Entry parent listing\n");
1274 for(CacheEntry entry : workarea.keySet()) {
1275 int status = flagMap.get(entry);
1276 boolean hasListener = hasListenerAfterDisposing(entry);
1277 b.print("Q " + entry.toString());
1279 b.print(" (L" + status + ")");
1282 b.print(" (" + status + ")");
1285 for(CacheEntry parent : workarea.get(entry)) {
1286 Collection<CacheEntry> inv = inverse.get(parent);
1288 inv = new ArrayList<CacheEntry>();
1289 inverse.put(parent, inv);
1292 b.print(" " + parent.toString());
1297 b.print("// Entry child listing\n");
1298 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
1299 b.print("C " + entry.getKey().toString());
1301 for(CacheEntry child : entry.getValue()) {
1302 Integer h = hist.get(child);
1306 b.print(" <no children>");
1308 b.print(" " + child.toString());
1313 b.print("#queries: " + workarea.keySet().size() + "\n");
1314 b.print("#listeners: " + listeners + "\n");
1318 return "Dumped " + workarea.keySet().size() + " queries.";
1324 public CacheEntry caller;
1326 public CacheEntry entry;
1330 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
1331 this.caller = caller;
1333 this.indent = indent;
1338 boolean removeQuery(CacheEntry entry) {
1340 // This entry has been removed before. No need to do anything here.
1341 if(entry.isDiscarded()) return false;
1343 assert (!entry.isDiscarded());
1345 Query query = entry.getQuery();
1347 query.removeEntry(this);
1352 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
1363 * @return true if this entry is being listened
1365 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
1369 CacheEntry entry = e.entry;
1371 //System.err.println("updateQuery " + entry);
1374 * If the dependency graph forms a DAG, some entries are inserted in the
1375 * todo list many times. They only need to be processed once though.
1377 if (entry.isDiscarded()) {
1378 if (Development.DEVELOPMENT) {
1379 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1380 System.out.print("D");
1381 for (int i = 0; i < e.indent; i++)
1382 System.out.print(" ");
1383 System.out.println(entry.getQuery());
1386 // System.err.println(" => DISCARDED");
1390 if (entry.isRefuted()) {
1391 if (Development.DEVELOPMENT) {
1392 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1393 System.out.print("R");
1394 for (int i = 0; i < e.indent; i++)
1395 System.out.print(" ");
1396 System.out.println(entry.getQuery());
1402 if (entry.isExcepted()) {
1403 if (Development.DEVELOPMENT) {
1404 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1405 System.out.print("E");
1410 if (entry.isPending()) {
1411 if (Development.DEVELOPMENT) {
1412 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1413 System.out.print("P");
1420 if (Development.DEVELOPMENT) {
1421 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1422 System.out.print("U ");
1423 for (int i = 0; i < e.indent; i++)
1424 System.out.print(" ");
1425 System.out.print(entry.getQuery());
1429 Query query = entry.getQuery();
1430 int type = query.type();
1432 boolean hasListener = hasListener(entry);
1434 if (Development.DEVELOPMENT) {
1435 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1436 if(hasListener(entry)) {
1437 System.out.println(" (L)");
1439 System.out.println("");
1444 if(entry.isPending() || entry.isExcepted()) {
1447 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1449 immediates.put(entry, entry);
1464 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
1466 immediates.put(entry, entry);
1480 // System.err.println(" => FOO " + type);
1483 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
1484 if(entries != null) {
1485 for (ListenerEntry le : entries) {
1486 scheduleListener(le);
1491 // If invalid, update parents
1492 if (type == RequestFlags.INVALIDATE) {
1493 updateParents(e.indent, entry, todo);
1500 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
1502 Iterable<CacheEntry> oldParents = entry.getParents(this);
1503 for (CacheEntry parent : oldParents) {
1504 // System.err.println("updateParents " + entry + " => " + parent);
1505 if(!parent.isDiscarded())
1506 todo.push(new UpdateEntry(entry, parent, indent + 2));
1511 private boolean pruneListener(ListenerEntry entry) {
1512 if (entry.base.isDisposed()) {
1513 removeListener(entry);
1521 * @param av1 an array (guaranteed)
1522 * @param av2 any object
1523 * @return <code>true</code> if the two arrays are equal
1525 private final boolean arrayEquals(Object av1, Object av2) {
1528 Class<?> c1 = av1.getClass().getComponentType();
1529 Class<?> c2 = av2.getClass().getComponentType();
1530 if (c2 == null || !c1.equals(c2))
1532 boolean p1 = c1.isPrimitive();
1533 boolean p2 = c2.isPrimitive();
1537 return Arrays.equals((Object[]) av1, (Object[]) av2);
1538 if (boolean.class.equals(c1))
1539 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
1540 else if (byte.class.equals(c1))
1541 return Arrays.equals((byte[]) av1, (byte[]) av2);
1542 else if (int.class.equals(c1))
1543 return Arrays.equals((int[]) av1, (int[]) av2);
1544 else if (long.class.equals(c1))
1545 return Arrays.equals((long[]) av1, (long[]) av2);
1546 else if (float.class.equals(c1))
1547 return Arrays.equals((float[]) av1, (float[]) av2);
1548 else if (double.class.equals(c1))
1549 return Arrays.equals((double[]) av1, (double[]) av2);
1550 throw new RuntimeException("??? Contact application querySupport.");
1555 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
1559 Query query = entry.getQuery();
1561 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
1563 entry.prepareRecompute(querySupport);
1565 ReadGraphImpl parentGraph = graph.withParent(entry);
1567 query.recompute(parentGraph);
1569 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
1571 Object newValue = entry.getResult();
1573 if (ListenerEntry.NO_VALUE == oldValue) {
1574 if(DebugPolicy.CHANGES) {
1575 System.out.println("C " + query);
1576 System.out.println("- " + oldValue);
1577 System.out.println("- " + newValue);
1582 boolean changed = false;
1584 if (newValue != null) {
1585 if (newValue.getClass().isArray()) {
1586 changed = !arrayEquals(newValue, oldValue);
1588 changed = !newValue.equals(oldValue);
1591 changed = (oldValue != null);
1593 if(DebugPolicy.CHANGES && changed) {
1594 System.out.println("C " + query);
1595 System.out.println("- " + oldValue);
1596 System.out.println("- " + newValue);
1599 return changed ? newValue : ListenerEntry.NOT_CHANGED;
1601 } catch (Throwable t) {
1603 Logger.defaultLogError(t);
1605 return ListenerEntry.NO_VALUE;
1611 public boolean hasScheduledUpdates() {
1612 return !scheduledListeners.isEmpty();
1615 public void performScheduledUpdates(WriteGraphImpl graph) {
1618 assert (!cache.collecting);
1619 assert (!firingListeners);
1621 firingListeners = true;
1625 // Performing may cause further events to be scheduled.
1626 while (!scheduledListeners.isEmpty()) {
1629 // graph.state.barrier.inc();
1631 // Clone current events to make new entries possible during
1633 THashSet<ListenerEntry> entries = scheduledListeners;
1634 scheduledListeners = new THashSet<ListenerEntry>();
1636 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
1638 for (ListenerEntry listenerEntry : entries) {
1640 if (pruneListener(listenerEntry)) {
1641 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
1645 final CacheEntry entry = listenerEntry.entry;
1646 assert (entry != null);
1648 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
1650 if (newValue != ListenerEntry.NOT_CHANGED) {
1651 if(DebugPolicy.LISTENER)
1652 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
1653 schedule.add(listenerEntry);
1654 listenerEntry.setLastKnown(entry.getResult());
1659 for(ListenerEntry listenerEntry : schedule) {
1660 final CacheEntry entry = listenerEntry.entry;
1661 if(DebugPolicy.LISTENER)
1662 System.out.println("Firing " + listenerEntry.procedure);
1664 if(DebugPolicy.LISTENER)
1665 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
1666 entry.performFromCache(graph, listenerEntry.procedure);
1667 } catch (Throwable t) {
1668 t.printStackTrace();
1672 // graph.state.barrier.dec();
1673 // graph.waitAsync(null);
1674 // graph.state.barrier.assertReady();
1679 firingListeners = false;
1686 * @return true if this entry still has listeners
1688 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
1690 assert (!cache.collecting);
1694 boolean hadListeners = false;
1695 boolean listenersUnknown = false;
1699 assert(entry != null);
1700 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
1701 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
1702 todo.add(new UpdateEntry(null, entry, 0));
1706 // Walk the tree and collect immediate updates
1707 while (!todo.isEmpty()) {
1708 UpdateEntry e = todo.pop();
1709 hadListeners |= updateQuery(e, todo, immediates);
1712 if(immediates.isEmpty()) break;
1714 // Evaluate all immediate updates and collect parents to update
1715 for(CacheEntry immediate : immediates.values()) {
1717 if(immediate.isDiscarded()) {
1721 if(immediate.isExcepted()) {
1723 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
1724 if (newValue != ListenerEntry.NOT_CHANGED)
1725 updateParents(0, immediate, todo);
1729 Object oldValue = immediate.getResult();
1730 Object newValue = compareTo(graph, immediate, oldValue);
1732 if (newValue != ListenerEntry.NOT_CHANGED) {
1733 updateParents(0, immediate, todo);
1735 // If not changed, keep the old value
1736 immediate.setResult(oldValue);
1737 listenersUnknown = true;
1747 } catch (Throwable t) {
1748 Logger.defaultLogError(t);
1754 return hadListeners | listenersUnknown;
1758 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
1759 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
1760 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
1761 // Maybe use a mutex from util.concurrent?
1762 private Object primitiveUpdateLock = new Object();
1763 private THashSet scheduledPrimitiveUpdates = new THashSet();
1765 public void performDirtyUpdates(final ReadGraphImpl graph) {
1767 cache.dirty = false;
1770 if (Development.DEVELOPMENT) {
1771 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1772 System.err.println("== Query update ==");
1776 // Special case - one statement
1777 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1779 long arg0 = scheduledObjectUpdates.getFirst();
1781 final int subject = (int)(arg0 >>> 32);
1782 final int predicate = (int)(arg0 & 0xffffffff);
1784 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1785 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1786 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1788 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1789 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1790 if(principalTypes != null) update(graph, principalTypes);
1791 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1792 if(types != null) update(graph, types);
1795 if(predicate == subrelationOf) {
1796 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1797 if(superRelations != null) update(graph, superRelations);
1800 DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1801 if(dp != null) update(graph, dp);
1802 OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
1803 if(os != null) update(graph, os);
1805 scheduledObjectUpdates.clear();
1810 // Special case - one value
1811 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
1813 int arg0 = scheduledValueUpdates.getFirst();
1815 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1816 if(valueQuery != null) update(graph, valueQuery);
1818 scheduledValueUpdates.clear();
1823 final TIntHashSet predicates = new TIntHashSet();
1824 final TIntHashSet orderedSets = new TIntHashSet();
1826 THashSet primitiveUpdates;
1827 synchronized (primitiveUpdateLock) {
1828 primitiveUpdates = scheduledPrimitiveUpdates;
1829 scheduledPrimitiveUpdates = new THashSet();
1832 primitiveUpdates.forEach(new TObjectProcedure() {
1835 public boolean execute(Object arg0) {
1837 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
1838 if (query != null) {
1839 boolean listening = update(graph, query);
1840 if (!listening && !query.hasParents()) {
1841 cache.externalReadEntryMap.remove(arg0);
1850 scheduledValueUpdates.forEach(new TIntProcedure() {
1853 public boolean execute(int arg0) {
1854 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
1855 if(valueQuery != null) update(graph, valueQuery);
1861 scheduledInvalidates.forEach(new TIntProcedure() {
1864 public boolean execute(int resource) {
1866 ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
1867 if(valueQuery != null) update(graph, valueQuery);
1869 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
1870 if(principalTypes != null) update(graph, principalTypes);
1871 Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
1872 if(types != null) update(graph, types);
1874 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
1875 if(superRelations != null) update(graph, superRelations);
1877 predicates.add(resource);
1884 scheduledObjectUpdates.forEach(new TLongProcedure() {
1887 public boolean execute(long arg0) {
1889 final int subject = (int)(arg0 >>> 32);
1890 final int predicate = (int)(arg0 & 0xffffffff);
1892 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
1893 PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
1894 if(principalTypes != null) update(graph, principalTypes);
1895 Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
1896 if(types != null) update(graph, types);
1899 if(predicate == subrelationOf) {
1900 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
1901 if(superRelations != null) update(graph, superRelations);
1904 predicates.add(subject);
1905 orderedSets.add(predicate);
1913 predicates.forEach(new TIntProcedure() {
1916 public boolean execute(final int subject) {
1918 for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
1919 for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
1920 for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
1922 DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
1923 if(entry != null) update(graph, entry);
1931 orderedSets.forEach(new TIntProcedure() {
1934 public boolean execute(int orderedSet) {
1936 OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
1937 if(entry != null) update(graph, entry);
1945 // for (Integer subject : predicates) {
1946 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
1947 // if(entry != null) update(graph, entry);
1951 if (Development.DEVELOPMENT) {
1952 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
1953 System.err.println("== Query update ends ==");
1957 scheduledValueUpdates.clear();
1958 scheduledObjectUpdates.clear();
1959 scheduledInvalidates.clear();
1963 public void updateValue(final int resource) {
1964 scheduledValueUpdates.add(resource);
1968 public void updateStatements(final int resource, final int predicate) {
1969 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
1973 private int lastInvalidate = 0;
1975 public void invalidateResource(final int resource) {
1976 if(lastInvalidate == resource) return;
1977 scheduledValueUpdates.add(resource);
1978 lastInvalidate = resource;
1982 public void updatePrimitive(final ExternalRead primitive) {
1984 // External reads may be updated from arbitrary threads.
1985 // Synchronize to prevent race-conditions.
1986 synchronized (primitiveUpdateLock) {
1987 scheduledPrimitiveUpdates.add(primitive);
1989 querySupport.dirtyPrimitives();
1994 public synchronized String toString() {
1995 return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
1999 protected void doDispose() {
2001 for(int index = 0; index < THREADS; index++) {
2002 executors[index].dispose();
2006 for(int i=0;i<100;i++) {
2008 boolean alive = false;
2009 for(int index = 0; index < THREADS; index++) {
2010 alive |= executors[index].isAlive();
2015 } catch (InterruptedException e) {
2016 Logger.defaultLogError(e);
2021 // Then start interrupting
2022 for(int i=0;i<100;i++) {
2024 boolean alive = false;
2025 for(int index = 0; index < THREADS; index++) {
2026 alive |= executors[index].isAlive();
2029 for(int index = 0; index < THREADS; index++) {
2030 executors[index].interrupt();
2034 // // Then just destroy
2035 // for(int index = 0; index < THREADS; index++) {
2036 // executors[index].destroy();
2039 for(int index = 0; index < THREADS; index++) {
2041 executors[index].join(5000);
2042 } catch (InterruptedException e) {
2043 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2045 executors[index] = null;
2050 public int getHits() {
2054 public int getMisses() {
2055 return cache.misses;
2058 public int getSize() {
2062 public Set<Long> getReferencedClusters() {
2063 HashSet<Long> result = new HashSet<Long>();
2064 for (CacheEntry entry : QueryCache.entriesObjects(this)) {
2065 Objects query = (Objects) entry.getQuery();
2066 result.add(querySupport.getClusterId(query.r1()));
2068 for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
2069 DirectPredicates query = (DirectPredicates) entry.getQuery();
2070 result.add(querySupport.getClusterId(query.id));
2072 for (CacheEntry entry : cache.valueQueryMap.values()) {
2073 ValueQuery query = (ValueQuery) entry.getQuery();
2074 result.add(querySupport.getClusterId(query.id));
2079 public void assertDone() {
2082 CacheCollectionResult allCaches(CacheCollectionResult result) {
2084 return cache.allCaches(result);
2088 public void printDiagnostics() {
2091 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2092 querySupport.requestCluster(graph, clusterId, runnable);
2095 public int clean() {
2096 collector.collect(0, Integer.MAX_VALUE);
2100 public void clean(final Collection<ExternalRead<?>> requests) {
2101 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
2102 Iterator<ExternalRead<?>> iterator = requests.iterator();
2104 public CacheCollectionResult allCaches() {
2105 throw new UnsupportedOperationException();
2108 public CacheEntryBase iterate(int level) {
2109 if(iterator.hasNext()) {
2110 ExternalRead<?> request = iterator.next();
2111 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2112 if (entry != null) return entry;
2113 else return iterate(level);
2115 iterator = requests.iterator();
2120 public void remove() {
2121 throw new UnsupportedOperationException();
2124 public void setLevel(CacheEntryBase entry, int level) {
2125 throw new UnsupportedOperationException();
2128 public Collection<CacheEntry> getRootList() {
2129 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2130 for (ExternalRead<?> request : requests) {
2131 ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
2138 public int getCurrentSize() {
2142 public int calculateCurrentSize() {
2143 // This tells the collector to attempt collecting everything.
2144 return Integer.MAX_VALUE;
2147 public boolean start(boolean flush) {
2151 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
2154 public void scanPending() {
2156 cache.scanPending();
2160 public ReadGraphImpl graphForVirtualRequest() {
2161 return ReadGraphImpl.createAsync(this);
2165 private HashMap<Resource, Class<?>> builtinValues;
2167 public Class<?> getBuiltinValue(Resource r) {
2168 if(builtinValues == null) initBuiltinValues();
2169 return builtinValues.get(r);
2172 Exception callerException = null;
2174 public interface AsyncBarrier {
2177 // public void inc(String debug);
2178 // public void dec(String debug);
2181 // final public QueryProcessor processor;
2182 // final public QuerySupport support;
2184 // boolean disposed = false;
2186 private void initBuiltinValues() {
2188 Layer0 b = getSession().peekService(Layer0.class);
2189 if(b == null) return;
2191 builtinValues = new HashMap<Resource, Class<?>>();
2193 builtinValues.put(b.String, String.class);
2194 builtinValues.put(b.Double, Double.class);
2195 builtinValues.put(b.Float, Float.class);
2196 builtinValues.put(b.Long, Long.class);
2197 builtinValues.put(b.Integer, Integer.class);
2198 builtinValues.put(b.Byte, Byte.class);
2199 builtinValues.put(b.Boolean, Boolean.class);
2201 builtinValues.put(b.StringArray, String[].class);
2202 builtinValues.put(b.DoubleArray, double[].class);
2203 builtinValues.put(b.FloatArray, float[].class);
2204 builtinValues.put(b.LongArray, long[].class);
2205 builtinValues.put(b.IntegerArray, int[].class);
2206 builtinValues.put(b.ByteArray, byte[].class);
2207 builtinValues.put(b.BooleanArray, boolean[].class);
2211 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
2213 // if (null == provider2) {
2214 // this.processor = null;
2218 // this.processor = provider2;
2219 // support = provider2.getCore();
2220 // initBuiltinValues();
2224 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
2225 // return new ReadGraphSupportImpl(impl.processor);
2229 final public Session getSession() {
2233 final public ResourceSupport getResourceSupport() {
2234 return resourceSupport;
2238 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2240 throw new UnsupportedOperationException();
2242 // assert(subject != null);
2243 // assert(procedure != null);
2245 // final ListenerBase listener = getListenerBase(procedure);
2247 // IntProcedure ip = new IntProcedure() {
2249 // AtomicBoolean first = new AtomicBoolean(true);
2252 // public void execute(ReadGraphImpl graph, int i) {
2254 // if(first.get()) {
2255 // procedure.execute(graph, querySupport.getResource(i));
2257 // procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2259 // } catch (Throwable t2) {
2260 // Logger.defaultLogError(t2);
2265 // public void finished(ReadGraphImpl graph) {
2267 // if(first.compareAndSet(true, false)) {
2268 // procedure.finished(graph);
2269 //// impl.state.barrier.dec(this);
2271 // procedure.finished(impl.newRestart(graph));
2274 // } catch (Throwable t2) {
2275 // Logger.defaultLogError(t2);
2280 // public void exception(ReadGraphImpl graph, Throwable t) {
2282 // if(first.compareAndSet(true, false)) {
2283 // procedure.exception(graph, t);
2285 // procedure.exception(impl.newRestart(graph), t);
2287 // } catch (Throwable t2) {
2288 // Logger.defaultLogError(t2);
2294 // int sId = querySupport.getId(subject);
2297 // QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
2298 // } catch (DatabaseException e) {
2299 // Logger.defaultLogError(e);
2305 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
2307 throw new UnsupportedOperationException();
2309 // assert(subject != null);
2310 // assert(procedure != null);
2312 // final ListenerBase listener = getListenerBase(procedure);
2315 // QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
2318 // public void execute(ReadGraphImpl graph, int i) {
2320 // procedure.execute(querySupport.getResource(i));
2321 // } catch (Throwable t2) {
2322 // Logger.defaultLogError(t2);
2327 // public void finished(ReadGraphImpl graph) {
2329 // procedure.finished();
2330 // } catch (Throwable t2) {
2331 // Logger.defaultLogError(t2);
2333 //// impl.state.barrier.dec();
2337 // public void exception(ReadGraphImpl graph, Throwable t) {
2339 // procedure.exception(t);
2340 // } catch (Throwable t2) {
2341 // Logger.defaultLogError(t2);
2343 //// impl.state.barrier.dec();
2347 // } catch (DatabaseException e) {
2348 // Logger.defaultLogError(e);
2354 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
2355 return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
2359 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2360 final Resource predicate, final MultiProcedure<Statement> procedure) {
2362 assert(subject != null);
2363 assert(predicate != null);
2364 assert(procedure != null);
2366 final ListenerBase listener = getListenerBase(procedure);
2368 // impl.state.barrier.inc();
2371 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
2374 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2376 procedure.execute(querySupport.getStatement(s, p, o));
2377 } catch (Throwable t2) {
2378 Logger.defaultLogError(t2);
2383 public void finished(ReadGraphImpl graph) {
2385 procedure.finished();
2386 } catch (Throwable t2) {
2387 Logger.defaultLogError(t2);
2389 // impl.state.barrier.dec();
2393 public void exception(ReadGraphImpl graph, Throwable t) {
2395 procedure.exception(t);
2396 } catch (Throwable t2) {
2397 Logger.defaultLogError(t2);
2399 // impl.state.barrier.dec();
2403 } catch (DatabaseException e) {
2404 Logger.defaultLogError(e);
2410 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2411 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2413 assert(subject != null);
2414 assert(predicate != null);
2415 assert(procedure != null);
2417 final ListenerBase listener = getListenerBase(procedure);
2419 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2421 boolean first = true;
2424 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2427 procedure.execute(graph, querySupport.getStatement(s, p, o));
2429 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
2431 } catch (Throwable t2) {
2432 Logger.defaultLogError(t2);
2437 public void finished(ReadGraphImpl graph) {
2442 procedure.finished(graph);
2443 // impl.state.barrier.dec(this);
2445 procedure.finished(impl.newRestart(graph));
2447 } catch (Throwable t2) {
2448 Logger.defaultLogError(t2);
2454 public void exception(ReadGraphImpl graph, Throwable t) {
2459 procedure.exception(graph, t);
2460 // impl.state.barrier.dec(this);
2462 procedure.exception(impl.newRestart(graph), t);
2464 } catch (Throwable t2) {
2465 Logger.defaultLogError(t2);
2472 int sId = querySupport.getId(subject);
2473 int pId = querySupport.getId(predicate);
2475 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2476 // else impl.state.barrier.inc(null, null);
2479 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2480 } catch (DatabaseException e) {
2481 Logger.defaultLogError(e);
2487 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
2488 final Resource predicate, final StatementProcedure procedure) {
2490 assert(subject != null);
2491 assert(predicate != null);
2492 assert(procedure != null);
2494 final ListenerBase listener = getListenerBase(procedure);
2496 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
2498 boolean first = true;
2501 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2504 procedure.execute(graph, s, p, o);
2506 procedure.execute(impl.newRestart(graph), s, p, o);
2508 } catch (Throwable t2) {
2509 Logger.defaultLogError(t2);
2514 public void finished(ReadGraphImpl graph) {
2519 procedure.finished(graph);
2520 // impl.state.barrier.dec(this);
2522 procedure.finished(impl.newRestart(graph));
2524 } catch (Throwable t2) {
2525 Logger.defaultLogError(t2);
2531 public void exception(ReadGraphImpl graph, Throwable t) {
2536 procedure.exception(graph, t);
2537 // impl.state.barrier.dec(this);
2539 procedure.exception(impl.newRestart(graph), t);
2541 } catch (Throwable t2) {
2542 Logger.defaultLogError(t2);
2549 int sId = querySupport.getId(subject);
2550 int pId = querySupport.getId(predicate);
2552 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
2553 // else impl.state.barrier.inc(null, null);
2556 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
2557 } catch (DatabaseException e) {
2558 Logger.defaultLogError(e);
2564 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
2566 assert(subject != null);
2567 assert(predicate != null);
2568 assert(procedure != null);
2570 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
2572 private Set<Statement> current = null;
2573 private Set<Statement> run = new HashSet<Statement>();
2576 public void execute(AsyncReadGraph graph, Statement result) {
2578 boolean found = false;
2580 if(current != null) {
2582 found = current.remove(result);
2586 if(!found) procedure.add(graph, result);
2593 public void finished(AsyncReadGraph graph) {
2595 if(current != null) {
2596 for(Statement r : current) procedure.remove(graph, r);
2601 run = new HashSet<Statement>();
2606 public void exception(AsyncReadGraph graph, Throwable t) {
2607 procedure.exception(graph, t);
2611 public boolean isDisposed() {
2612 return procedure.isDisposed();
2620 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
2621 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
2623 assert(subject != null);
2624 assert(predicate != null);
2625 assert(procedure != null);
2627 final ListenerBase listener = getListenerBase(procedure);
2629 // impl.state.barrier.inc();
2632 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
2635 public void execute(ReadGraphImpl graph, int s, int p, int o) {
2637 procedure.execute(graph, querySupport.getStatement(s, p, o));
2638 } catch (Throwable t2) {
2639 Logger.defaultLogError(t2);
2644 public void finished(ReadGraphImpl graph) {
2646 procedure.finished(graph);
2647 } catch (Throwable t2) {
2648 Logger.defaultLogError(t2);
2650 // impl.state.barrier.dec();
2654 public void exception(ReadGraphImpl graph, Throwable t) {
2656 procedure.exception(graph, t);
2657 } catch (Throwable t2) {
2658 Logger.defaultLogError(t2);
2660 // impl.state.barrier.dec();
2664 } catch (DatabaseException e) {
2665 Logger.defaultLogError(e);
2670 private static ListenerBase getListenerBase(Object procedure) {
2671 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
2676 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
2678 assert(subject != null);
2679 assert(predicate != null);
2680 assert(procedure != null);
2682 final ListenerBase listener = getListenerBase(procedure);
2684 // impl.state.barrier.inc();
2687 QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
2690 public void execute(ReadGraphImpl graph, int i) {
2692 procedure.execute(querySupport.getResource(i));
2693 } catch (Throwable t2) {
2694 Logger.defaultLogError(t2);
2699 public void finished(ReadGraphImpl graph) {
2701 procedure.finished();
2702 } catch (Throwable t2) {
2703 Logger.defaultLogError(t2);
2705 // impl.state.barrier.dec();
2709 public void exception(ReadGraphImpl graph, Throwable t) {
2710 System.out.println("forEachObject exception " + t);
2712 procedure.exception(t);
2713 } catch (Throwable t2) {
2714 Logger.defaultLogError(t2);
2716 // impl.state.barrier.dec();
2720 } catch (DatabaseException e) {
2721 Logger.defaultLogError(e);
2727 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
2729 throw new UnsupportedOperationException();
2731 // assert(subject != null);
2732 // assert(procedure != null);
2734 // final ListenerBase listener = getListenerBase(procedure);
2736 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
2738 // int sId = querySupport.getId(subject);
2741 // QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, proc);
2742 // } catch (DatabaseException e) {
2743 // Logger.defaultLogError(e);
2749 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
2751 assert(subject != null);
2752 assert(procedure != null);
2754 final ListenerBase listener = getListenerBase(procedure);
2756 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
2761 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
2763 assert(subject != null);
2764 assert(procedure != null);
2766 final ListenerBase listener = getListenerBase(procedure);
2768 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
2772 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
2775 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
2777 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
2779 private Resource single = null;
2782 public synchronized void execute(AsyncReadGraph graph, Resource result) {
2783 if(single == null) {
2786 single = INVALID_RESOURCE;
2791 public synchronized void finished(AsyncReadGraph graph) {
2792 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
2793 else procedure.execute(graph, single);
2797 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
2798 procedure.exception(graph, throwable);
2805 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
2807 final int sId = querySupport.getId(subject);
2808 final int pId = querySupport.getId(predicate);
2811 QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
2812 } catch (DatabaseException e) {
2813 Logger.defaultLogError(e);
2818 static class Runner2Procedure implements IntProcedure {
2820 public int single = 0;
2821 public Throwable t = null;
2823 public void clear() {
2829 public void execute(ReadGraphImpl graph, int i) {
2830 if(single == 0) single = i;
2835 public void finished(ReadGraphImpl graph) {
2836 if(single == -1) single = 0;
2840 public void exception(ReadGraphImpl graph, Throwable throwable) {
2845 public int get() throws DatabaseException {
2847 if(t instanceof DatabaseException) throw (DatabaseException)t;
2848 else throw new DatabaseException(t);
2855 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
2857 final int sId = querySupport.getId(subject);
2858 final int pId = querySupport.getId(predicate);
2860 Runner2Procedure proc = new Runner2Procedure();
2861 QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
2866 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
2868 assert(subject != null);
2869 assert(predicate != null);
2871 final ListenerBase listener = getListenerBase(procedure);
2873 if(impl.parent != null || listener != null) {
2875 IntProcedure ip = new IntProcedure() {
2877 AtomicBoolean first = new AtomicBoolean(true);
2880 public void execute(ReadGraphImpl graph, int i) {
2883 procedure.execute(impl, querySupport.getResource(i));
2885 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
2887 } catch (Throwable t2) {
2888 Logger.defaultLogError(t2);
2894 public void finished(ReadGraphImpl graph) {
2896 if(first.compareAndSet(true, false)) {
2897 procedure.finished(impl);
2898 // impl.state.barrier.dec(this);
2900 procedure.finished(impl.newRestart(graph));
2902 } catch (Throwable t2) {
2903 Logger.defaultLogError(t2);
2908 public void exception(ReadGraphImpl graph, Throwable t) {
2910 procedure.exception(graph, t);
2911 } catch (Throwable t2) {
2912 Logger.defaultLogError(t2);
2914 // impl.state.barrier.dec(this);
2918 public String toString() {
2919 return "forEachObject with " + procedure;
2924 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
2925 // else impl.state.barrier.inc(null, null);
2927 forEachObject(impl, subject, predicate, listener, ip);
2931 IntProcedure ip = new IntProcedure() {
2934 public void execute(ReadGraphImpl graph, int i) {
2935 procedure.execute(graph, querySupport.getResource(i));
2939 public void finished(ReadGraphImpl graph) {
2940 procedure.finished(graph);
2944 public void exception(ReadGraphImpl graph, Throwable t) {
2945 procedure.exception(graph, t);
2949 public String toString() {
2950 return "forEachObject with " + procedure;
2955 forEachObject(impl, subject, predicate, listener, ip);
2962 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
2964 assert(subject != null);
2965 assert(predicate != null);
2966 assert(procedure != null);
2968 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
2970 private Set<Resource> current = null;
2971 private Set<Resource> run = new HashSet<Resource>();
2974 public void execute(AsyncReadGraph graph, Resource result) {
2976 boolean found = false;
2978 if(current != null) {
2980 found = current.remove(result);
2984 if(!found) procedure.add(graph, result);
2991 public void finished(AsyncReadGraph graph) {
2993 if(current != null) {
2994 for(Resource r : current) procedure.remove(graph, r);
2999 run = new HashSet<Resource>();
3004 public boolean isDisposed() {
3005 return procedure.isDisposed();
3009 public void exception(AsyncReadGraph graph, Throwable t) {
3010 procedure.exception(graph, t);
3014 public String toString() {
3015 return "forObjectSet " + procedure;
3023 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3025 assert(subject != null);
3026 assert(procedure != null);
3028 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
3030 private Set<Resource> current = null;
3031 private Set<Resource> run = new HashSet<Resource>();
3034 public void execute(AsyncReadGraph graph, Resource result) {
3036 boolean found = false;
3038 if(current != null) {
3040 found = current.remove(result);
3044 if(!found) procedure.add(graph, result);
3051 public void finished(AsyncReadGraph graph) {
3053 if(current != null) {
3054 for(Resource r : current) procedure.remove(graph, r);
3059 run = new HashSet<Resource>();
3064 public boolean isDisposed() {
3065 return procedure.isDisposed();
3069 public void exception(AsyncReadGraph graph, Throwable t) {
3070 procedure.exception(graph, t);
3074 public String toString() {
3075 return "forPredicateSet " + procedure;
3083 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3085 assert(subject != null);
3086 assert(procedure != null);
3088 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
3090 private Set<Resource> current = null;
3091 private Set<Resource> run = new HashSet<Resource>();
3094 public void execute(AsyncReadGraph graph, Resource result) {
3096 boolean found = false;
3098 if(current != null) {
3100 found = current.remove(result);
3104 if(!found) procedure.add(graph, result);
3111 public void finished(AsyncReadGraph graph) {
3113 if(current != null) {
3114 for(Resource r : current) procedure.remove(graph, r);
3119 run = new HashSet<Resource>();
3124 public boolean isDisposed() {
3125 return procedure.isDisposed();
3129 public void exception(AsyncReadGraph graph, Throwable t) {
3130 procedure.exception(graph, t);
3134 public String toString() {
3135 return "forPrincipalTypeSet " + procedure;
3143 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3145 assert(subject != null);
3146 assert(predicate != null);
3147 assert(procedure != null);
3149 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3151 private Set<Resource> current = null;
3152 private Set<Resource> run = new HashSet<Resource>();
3155 public void execute(AsyncReadGraph graph, Resource result) {
3157 boolean found = false;
3159 if(current != null) {
3161 found = current.remove(result);
3165 if(!found) procedure.add(graph, result);
3172 public void finished(AsyncReadGraph graph) {
3174 if(current != null) {
3175 for(Resource r : current) procedure.remove(graph, r);
3180 run = new HashSet<Resource>();
3185 public boolean isDisposed() {
3186 return procedure.isDisposed();
3190 public void exception(AsyncReadGraph graph, Throwable t) {
3191 procedure.exception(graph, t);
3195 public String toString() {
3196 return "forObjectSet " + procedure;
3204 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3206 assert(subject != null);
3207 assert(predicate != null);
3208 assert(procedure != null);
3210 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3212 private Set<Statement> current = null;
3213 private Set<Statement> run = new HashSet<Statement>();
3216 public void execute(AsyncReadGraph graph, Statement result) {
3218 boolean found = false;
3220 if(current != null) {
3222 found = current.remove(result);
3226 if(!found) procedure.add(graph, result);
3233 public void finished(AsyncReadGraph graph) {
3235 if(current != null) {
3236 for(Statement s : current) procedure.remove(graph, s);
3241 run = new HashSet<Statement>();
3246 public boolean isDisposed() {
3247 return procedure.isDisposed();
3251 public void exception(AsyncReadGraph graph, Throwable t) {
3252 procedure.exception(graph, t);
3256 public String toString() {
3257 return "forStatementSet " + procedure;
3265 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
3266 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3268 assert(subject != null);
3269 assert(predicate != null);
3270 assert(procedure != null);
3272 final ListenerBase listener = getListenerBase(procedure);
3275 QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
3278 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3280 procedure.execute(graph, querySupport.getResource(o));
3281 } catch (Throwable t2) {
3282 Logger.defaultLogError(t2);
3287 public void finished(ReadGraphImpl graph) {
3289 procedure.finished(graph);
3290 } catch (Throwable t2) {
3291 Logger.defaultLogError(t2);
3293 // impl.state.barrier.dec();
3297 public void exception(ReadGraphImpl graph, Throwable t) {
3299 procedure.exception(graph, t);
3300 } catch (Throwable t2) {
3301 Logger.defaultLogError(t2);
3303 // impl.state.barrier.dec();
3307 } catch (DatabaseException e) {
3308 Logger.defaultLogError(e);
3314 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3316 assert(subject != null);
3317 assert(procedure != null);
3319 final ListenerBase listener = getListenerBase(procedure);
3321 IntProcedure ip = new IntProcedure() {
3324 public void execute(ReadGraphImpl graph, int i) {
3326 procedure.execute(graph, querySupport.getResource(i));
3327 } catch (Throwable t2) {
3328 Logger.defaultLogError(t2);
3333 public void finished(ReadGraphImpl graph) {
3335 procedure.finished(graph);
3336 } catch (Throwable t2) {
3337 Logger.defaultLogError(t2);
3339 // impl.state.barrier.dec(this);
3343 public void exception(ReadGraphImpl graph, Throwable t) {
3345 procedure.exception(graph, t);
3346 } catch (Throwable t2) {
3347 Logger.defaultLogError(t2);
3349 // impl.state.barrier.dec(this);
3354 int sId = querySupport.getId(subject);
3356 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
3357 // else impl.state.barrier.inc(null, null);
3360 QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
3361 } catch (DatabaseException e) {
3362 Logger.defaultLogError(e);
3368 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3370 assert(subject != null);
3371 assert(procedure != null);
3373 final ListenerBase listener = getListenerBase(procedure);
3375 // impl.state.barrier.inc();
3378 QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
3381 public void execute(ReadGraphImpl graph, int i) {
3383 procedure.execute(querySupport.getResource(i));
3384 } catch (Throwable t2) {
3385 Logger.defaultLogError(t2);
3390 public void finished(ReadGraphImpl graph) {
3392 procedure.finished();
3393 } catch (Throwable t2) {
3394 Logger.defaultLogError(t2);
3396 // impl.state.barrier.dec();
3400 public void exception(ReadGraphImpl graph, Throwable t) {
3402 procedure.exception(t);
3403 } catch (Throwable t2) {
3404 Logger.defaultLogError(t2);
3406 // impl.state.barrier.dec();
3410 } catch (DatabaseException e) {
3411 Logger.defaultLogError(e);
3415 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3417 assert(subject != null);
3418 assert(procedure != null);
3420 final ListenerBase listener = getListenerBase(procedure);
3421 assert(listener == null);
3423 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3426 public void execute(final ReadGraphImpl graph, IntSet set) {
3427 procedure.execute(graph, set);
3431 public void exception(ReadGraphImpl graph, Throwable t) {
3432 procedure.exception(graph, t);
3437 int sId = querySupport.getId(subject);
3440 QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
3441 } catch (DatabaseException e) {
3442 Logger.defaultLogError(e);
3448 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3450 assert(subject != null);
3452 return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
3457 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
3459 assert(subject != null);
3460 assert(procedure != null);
3462 final ListenerBase listener = getListenerBase(procedure);
3463 assert(listener == null);
3467 QueryCache.runnerRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<RelationInfo>() {
3470 public void execute(final ReadGraphImpl graph, RelationInfo set) {
3471 procedure.execute(graph, set);
3475 public void exception(ReadGraphImpl graph, Throwable t) {
3476 procedure.exception(graph, t);
3480 } catch (DatabaseException e) {
3481 Logger.defaultLogError(e);
3487 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3489 assert(subject != null);
3490 assert(procedure != null);
3492 final ListenerBase listener = getListenerBase(procedure);
3495 QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
3497 AtomicBoolean first = new AtomicBoolean(true);
3500 public void execute(final ReadGraphImpl graph, IntSet set) {
3501 // final HashSet<Resource> result = new HashSet<Resource>();
3502 // set.forEach(new TIntProcedure() {
3505 // public boolean execute(int type) {
3506 // result.add(querySupport.getResource(type));
3512 if(first.compareAndSet(true, false)) {
3513 procedure.execute(graph, set);
3514 // impl.state.barrier.dec();
3516 procedure.execute(impl.newRestart(graph), set);
3518 } catch (Throwable t2) {
3519 Logger.defaultLogError(t2);
3524 public void exception(ReadGraphImpl graph, Throwable t) {
3526 if(first.compareAndSet(true, false)) {
3527 procedure.exception(graph, t);
3528 // impl.state.barrier.dec();
3530 procedure.exception(impl.newRestart(graph), t);
3532 } catch (Throwable t2) {
3533 Logger.defaultLogError(t2);
3538 } catch (DatabaseException e) {
3539 Logger.defaultLogError(e);
3545 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3547 assert(subject != null);
3548 assert(procedure != null);
3550 final ListenerBase listener = getListenerBase(procedure);
3552 IntProcedure ip = new IntProcedureAdapter() {
3555 public void execute(final ReadGraphImpl graph, int superRelation) {
3557 procedure.execute(graph, querySupport.getResource(superRelation));
3558 } catch (Throwable t2) {
3559 Logger.defaultLogError(t2);
3564 public void finished(final ReadGraphImpl graph) {
3566 procedure.finished(graph);
3567 } catch (Throwable t2) {
3568 Logger.defaultLogError(t2);
3570 // impl.state.barrier.dec(this);
3575 public void exception(ReadGraphImpl graph, Throwable t) {
3577 procedure.exception(graph, t);
3578 } catch (Throwable t2) {
3579 Logger.defaultLogError(t2);
3581 // impl.state.barrier.dec(this);
3586 int sId = querySupport.getId(subject);
3588 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
3589 // else impl.state.barrier.inc(null, null);
3592 QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
3593 } catch (DatabaseException e) {
3594 Logger.defaultLogError(e);
3597 // DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
3602 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
3604 assert(subject != null);
3605 assert(procedure != null);
3607 final ListenerBase listener = getListenerBase(procedure);
3609 // impl.state.barrier.inc();
3611 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3616 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
3618 assert(subject != null);
3619 assert(procedure != null);
3621 final ListenerBase listener = getListenerBase(procedure);
3623 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
3626 public void execute(final ReadGraphImpl graph, IntSet set) {
3627 // final HashSet<Resource> result = new HashSet<Resource>();
3628 // set.forEach(new TIntProcedure() {
3631 // public boolean execute(int type) {
3632 // result.add(querySupport.getResource(type));
3638 procedure.execute(graph, set);
3639 } catch (Throwable t2) {
3640 Logger.defaultLogError(t2);
3642 // impl.state.barrier.dec(this);
3646 public void exception(ReadGraphImpl graph, Throwable t) {
3648 procedure.exception(graph, t);
3649 } catch (Throwable t2) {
3650 Logger.defaultLogError(t2);
3652 // impl.state.barrier.dec(this);
3657 int sId = querySupport.getId(subject);
3659 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
3660 // else impl.state.barrier.inc(null, null);
3663 QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
3664 } catch (DatabaseException e) {
3665 Logger.defaultLogError(e);
3670 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
3671 return getValue(impl, querySupport.getId(subject));
3674 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
3675 return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
3679 final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3681 assert(subject != null);
3682 assert(procedure != null);
3684 int sId = querySupport.getId(subject);
3686 // if(procedure != null) {
3688 final ListenerBase listener = getListenerBase(procedure);
3690 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3692 AtomicBoolean first = new AtomicBoolean(true);
3695 public void execute(ReadGraphImpl graph, byte[] result) {
3697 if(first.compareAndSet(true, false)) {
3698 procedure.execute(graph, result);
3699 // impl.state.barrier.dec(this);
3701 procedure.execute(impl.newRestart(graph), result);
3703 } catch (Throwable t2) {
3704 Logger.defaultLogError(t2);
3709 public void exception(ReadGraphImpl graph, Throwable t) {
3711 if(first.compareAndSet(true, false)) {
3712 procedure.exception(graph, t);
3713 // impl.state.barrier.dec(this);
3715 procedure.exception(impl.newRestart(graph), t);
3717 } catch (Throwable t2) {
3718 Logger.defaultLogError(t2);
3724 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3725 // else impl.state.barrier.inc(null, null);
3728 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3729 } catch (DatabaseException e) {
3730 throw new IllegalStateException("Internal error");
3735 // return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
3739 // throw new IllegalStateException("Internal error");
3744 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
3746 assert(subject != null);
3747 assert(procedure != null);
3749 final ListenerBase listener = getListenerBase(procedure);
3751 if(impl.parent != null || listener != null) {
3753 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3755 AtomicBoolean first = new AtomicBoolean(true);
3758 public void execute(ReadGraphImpl graph, byte[] result) {
3760 if(first.compareAndSet(true, false)) {
3761 procedure.execute(graph, result);
3762 // impl.state.barrier.dec(this);
3764 procedure.execute(impl.newRestart(graph), result);
3766 } catch (Throwable t2) {
3767 Logger.defaultLogError(t2);
3772 public void exception(ReadGraphImpl graph, Throwable t) {
3774 if(first.compareAndSet(true, false)) {
3775 procedure.exception(graph, t);
3776 // impl.state.barrier.dec(this);
3778 procedure.exception(impl.newRestart(graph), t);
3780 } catch (Throwable t2) {
3781 Logger.defaultLogError(t2);
3787 int sId = querySupport.getId(subject);
3789 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
3790 // else impl.state.barrier.inc(null, null);
3793 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3794 } catch (DatabaseException e) {
3795 Logger.defaultLogError(e);
3800 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
3803 public void execute(ReadGraphImpl graph, byte[] result) {
3805 procedure.execute(graph, result);
3810 public void exception(ReadGraphImpl graph, Throwable t) {
3812 procedure.exception(graph, t);
3818 int sId = querySupport.getId(subject);
3821 QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
3822 } catch (DatabaseException e) {
3823 Logger.defaultLogError(e);
3831 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
3833 assert(relation != null);
3834 assert(procedure != null);
3836 final ListenerBase listener = getListenerBase(procedure);
3838 IntProcedure ip = new IntProcedure() {
3840 private int result = 0;
3842 final AtomicBoolean found = new AtomicBoolean(false);
3843 final AtomicBoolean done = new AtomicBoolean(false);
3846 public void finished(ReadGraphImpl graph) {
3848 // Shall fire exactly once!
3849 if(done.compareAndSet(false, true)) {
3852 procedure.exception(graph, new NoInverseException(""));
3853 // impl.state.barrier.dec(this);
3855 procedure.execute(graph, querySupport.getResource(result));
3856 // impl.state.barrier.dec(this);
3858 } catch (Throwable t) {
3859 Logger.defaultLogError(t);
3866 public void execute(ReadGraphImpl graph, int i) {
3868 if(found.compareAndSet(false, true)) {
3871 // Shall fire exactly once!
3872 if(done.compareAndSet(false, true)) {
3874 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
3875 // impl.state.barrier.dec(this);
3876 } catch (Throwable t) {
3877 Logger.defaultLogError(t);
3885 public void exception(ReadGraphImpl graph, Throwable t) {
3886 // Shall fire exactly once!
3887 if(done.compareAndSet(false, true)) {
3889 procedure.exception(graph, t);
3890 // impl.state.barrier.dec(this);
3891 } catch (Throwable t2) {
3892 Logger.defaultLogError(t2);
3899 int sId = querySupport.getId(relation);
3901 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
3902 // else impl.state.barrier.inc(null, null);
3905 QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
3906 } catch (DatabaseException e) {
3907 Logger.defaultLogError(e);
3913 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3916 assert(procedure != null);
3918 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
3921 public void execute(ReadGraphImpl graph, Integer result) {
3923 procedure.execute(graph, querySupport.getResource(result));
3924 } catch (Throwable t2) {
3925 Logger.defaultLogError(t2);
3927 // impl.state.barrier.dec(this);
3931 public void exception(ReadGraphImpl graph, Throwable t) {
3934 procedure.exception(graph, t);
3935 } catch (Throwable t2) {
3936 Logger.defaultLogError(t2);
3938 // impl.state.barrier.dec(this);
3943 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
3944 // else impl.state.barrier.inc(null, null);
3946 forResource(impl, id, impl.parent, ip);
3951 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
3954 assert(procedure != null);
3956 // impl.state.barrier.inc();
3959 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
3962 public void execute(ReadGraphImpl graph, Integer result) {
3964 procedure.execute(graph, querySupport.getResource(result));
3965 } catch (Throwable t2) {
3966 Logger.defaultLogError(t2);
3968 // impl.state.barrier.dec();
3972 public void exception(ReadGraphImpl graph, Throwable t) {
3974 procedure.exception(graph, t);
3975 } catch (Throwable t2) {
3976 Logger.defaultLogError(t2);
3978 // impl.state.barrier.dec();
3982 } catch (DatabaseException e) {
3983 Logger.defaultLogError(e);
3989 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
3991 assert(subject != null);
3992 assert(procedure != null);
3994 final ListenerBase listener = getListenerBase(procedure);
3997 IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
3998 procedure.execute(impl, !result.isEmpty());
3999 } catch (DatabaseException e) {
4000 procedure.exception(impl, e);
4006 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4008 assert(subject != null);
4009 assert(predicate != null);
4010 assert(procedure != null);
4012 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4014 boolean found = false;
4017 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4022 synchronized public void finished(AsyncReadGraph graph) {
4024 procedure.execute(graph, found);
4025 } catch (Throwable t2) {
4026 Logger.defaultLogError(t2);
4028 // impl.state.barrier.dec(this);
4032 public void exception(AsyncReadGraph graph, Throwable t) {
4034 procedure.exception(graph, t);
4035 } catch (Throwable t2) {
4036 Logger.defaultLogError(t2);
4038 // impl.state.barrier.dec(this);
4043 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4044 // else impl.state.barrier.inc(null, null);
4046 forEachObject(impl, subject, predicate, ip);
4051 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4053 assert(subject != null);
4054 assert(predicate != null);
4055 assert(procedure != null);
4057 // impl.state.barrier.inc();
4059 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4061 boolean found = false;
4064 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4065 if(resource.equals(object)) found = true;
4069 synchronized public void finished(AsyncReadGraph graph) {
4071 procedure.execute(graph, found);
4072 } catch (Throwable t2) {
4073 Logger.defaultLogError(t2);
4075 // impl.state.barrier.dec();
4079 public void exception(AsyncReadGraph graph, Throwable t) {
4081 procedure.exception(graph, t);
4082 } catch (Throwable t2) {
4083 Logger.defaultLogError(t2);
4085 // impl.state.barrier.dec();
4093 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4095 assert(subject != null);
4096 assert(procedure != null);
4098 final ListenerBase listener = getListenerBase(procedure);
4100 // impl.state.barrier.inc();
4103 QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4106 public void execute(ReadGraphImpl graph, byte[] object) {
4107 boolean result = object != null;
4109 procedure.execute(graph, result);
4110 } catch (Throwable t2) {
4111 Logger.defaultLogError(t2);
4113 // impl.state.barrier.dec();
4117 public void exception(ReadGraphImpl graph, Throwable t) {
4119 procedure.exception(graph, t);
4120 } catch (Throwable t2) {
4121 Logger.defaultLogError(t2);
4123 // impl.state.barrier.dec();
4127 } catch (DatabaseException e) {
4128 Logger.defaultLogError(e);
4134 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4136 assert(subject != null);
4137 assert(procedure != null);
4139 final ListenerBase listener = getListenerBase(procedure);
4143 QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
4146 public void exception(ReadGraphImpl graph, Throwable t) {
4148 procedure.exception(graph, t);
4149 } catch (Throwable t2) {
4150 Logger.defaultLogError(t2);
4152 // impl.state.barrier.dec();
4156 public void execute(ReadGraphImpl graph, int i) {
4158 procedure.execute(graph, querySupport.getResource(i));
4159 } catch (Throwable t2) {
4160 Logger.defaultLogError(t2);
4165 public void finished(ReadGraphImpl graph) {
4167 procedure.finished(graph);
4168 } catch (Throwable t2) {
4169 Logger.defaultLogError(t2);
4171 // impl.state.barrier.dec();
4175 } catch (DatabaseException e) {
4176 Logger.defaultLogError(e);
4182 // final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
4184 // assert(request != null);
4185 // assert(procedure != null);
4187 // QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
4192 // final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
4194 // assert(graph != null);
4195 // assert(request != null);
4197 // final ReadEntry entry = (ReadEntry)cache.getCached(request);
4198 // if(entry != null && entry.isReady()) {
4199 // return (T)entry.get(graph, this, null);
4201 // return request.perform(graph);
4206 // final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
4208 // assert(graph != null);
4209 // assert(request != null);
4211 // final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
4212 // if(entry != null && entry.isReady()) {
4213 // if(entry.isExcepted()) {
4214 // Throwable t = (Throwable)entry.getResult();
4215 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4216 // else throw new DatabaseException(t);
4218 // return (T)entry.getResult();
4222 // final DataContainer<T> result = new DataContainer<T>();
4223 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
4225 // request.register(graph, new Listener<T>() {
4228 // public void exception(Throwable t) {
4229 // exception.set(t);
4233 // public void execute(T t) {
4238 // public boolean isDisposed() {
4244 // Throwable t = exception.get();
4246 // if(t instanceof DatabaseException) throw (DatabaseException)t;
4247 // else throw new DatabaseException(t);
4250 // return result.get();
4257 // final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
4259 // assert(graph != null);
4260 // assert(request != null);
4262 // final AsyncReadEntry entry = cache.asyncReadMap.get(request);
4263 // if(entry != null && entry.isReady()) {
4264 // if(entry.isExcepted()) {
4265 // procedure.exception(graph, (Throwable)entry.getResult());
4267 // procedure.execute(graph, (T)entry.getResult());
4270 // request.perform(graph, procedure);
4276 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4278 assert(request != null);
4279 assert(procedure != null);
4283 queryMultiRead(impl, request, parent, listener, procedure);
4285 } catch (DatabaseException e) {
4287 throw new IllegalStateException(e);
4294 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
4296 assert(request != null);
4297 assert(procedure != null);
4299 // impl.state.barrier.inc();
4301 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
4303 public void execute(AsyncReadGraph graph, T result) {
4306 procedure.execute(graph, result);
4307 } catch (Throwable t2) {
4308 Logger.defaultLogError(t2);
4313 public void finished(AsyncReadGraph graph) {
4316 procedure.finished(graph);
4317 } catch (Throwable t2) {
4318 Logger.defaultLogError(t2);
4321 // impl.state.barrier.dec();
4326 public String toString() {
4327 return procedure.toString();
4331 public void exception(AsyncReadGraph graph, Throwable t) {
4334 procedure.exception(graph, t);
4335 } catch (Throwable t2) {
4336 Logger.defaultLogError(t2);
4339 // impl.state.barrier.dec();
4348 // final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
4350 // assert(request != null);
4351 // assert(procedure != null);
4355 // queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
4358 // public String toString() {
4359 // return procedure.toString();
4363 // public void execute(AsyncReadGraph graph, T result) {
4365 // procedure.execute(result);
4366 // } catch (Throwable t2) {
4367 // Logger.defaultLogError(t2);
4372 // public void exception(AsyncReadGraph graph, Throwable throwable) {
4374 // procedure.exception(throwable);
4375 // } catch (Throwable t2) {
4376 // Logger.defaultLogError(t2);
4382 // } catch (DatabaseException e) {
4384 // throw new IllegalStateException(e);
4391 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
4393 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
4398 public VirtualGraph getProvider(Resource subject, Resource predicate) {
4400 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
4405 public VirtualGraph getValueProvider(Resource subject) {
4407 return querySupport.getValueProvider(querySupport.getId(subject));
4411 public boolean resumeTasks(ReadGraphImpl graph) {
4413 return querySupport.resume(graph);
4417 public boolean isImmutable(int resourceId) {
4418 return querySupport.isImmutable(resourceId);
4421 public boolean isImmutable(Resource resource) {
4422 ResourceImpl impl = (ResourceImpl)resource;
4423 return isImmutable(impl.id);
4428 public Layer0 getL0(ReadGraph graph) {
4430 L0 = Layer0.getInstance(graph);