1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
37 import org.simantics.databoard.Bindings;
38 import org.simantics.db.AsyncReadGraph;
39 import org.simantics.db.DevelopmentKeys;
40 import org.simantics.db.DirectStatements;
41 import org.simantics.db.ReadGraph;
42 import org.simantics.db.RelationInfo;
43 import org.simantics.db.Resource;
44 import org.simantics.db.Session;
45 import org.simantics.db.Statement;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
48 import org.simantics.db.common.utils.Logger;
49 import org.simantics.db.debug.ListenerReport;
50 import org.simantics.db.exception.DatabaseException;
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
52 import org.simantics.db.exception.NoInverseException;
53 import org.simantics.db.exception.ResourceNotFoundException;
54 import org.simantics.db.impl.DebugPolicy;
55 import org.simantics.db.impl.ResourceImpl;
56 import org.simantics.db.impl.graph.MultiIntProcedure;
57 import org.simantics.db.impl.graph.ReadGraphImpl;
58 import org.simantics.db.impl.graph.ReadGraphSupport;
59 import org.simantics.db.impl.graph.WriteGraphImpl;
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;
61 import org.simantics.db.impl.procedure.InternalProcedure;
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
63 import org.simantics.db.impl.support.ResourceSupport;
64 import org.simantics.db.procedure.AsyncMultiListener;
65 import org.simantics.db.procedure.AsyncMultiProcedure;
66 import org.simantics.db.procedure.AsyncProcedure;
67 import org.simantics.db.procedure.AsyncSetListener;
68 import org.simantics.db.procedure.Listener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.map.hash.THashMap;
88 import gnu.trove.procedure.TIntProcedure;
89 import gnu.trove.procedure.TLongProcedure;
90 import gnu.trove.procedure.TObjectProcedure;
91 import gnu.trove.set.hash.THashSet;
92 import gnu.trove.set.hash.TIntHashSet;
94 @SuppressWarnings({"rawtypes", "unchecked"})
95 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
97 final public UnaryQueryHashMap<IntProcedure> directPredicatesMap;
98 final public UnaryQueryHashMap<IntProcedure> principalTypesMap;
99 final public THashMap<String, URIToResource> uriToResourceMap;
100 final public THashMap<String, NamespaceIndex> namespaceIndexMap22;
101 final public UnaryQueryHashMap<IntProcedure> projectsMap;
102 final public UnaryQueryHashMap<InternalProcedure<RelationInfo>> relationInfoMap;
103 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superTypesMap;
104 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typeHierarchyMap;
105 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superRelationsMap;
106 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typesMap;
107 final public UnaryQueryHashMap<InternalProcedure<byte[]>> valueMap;
108 final public DoubleKeyQueryHashMap<IntProcedure> directObjectsMap;
109 final public DoubleKeyQueryHashMap<IntProcedure> objectsMap;
110 final public UnaryQueryHashMap<IntProcedure> orderedSetMap;
111 final public UnaryQueryHashMap<IntProcedure> predicatesMap;
112 final public DoubleKeyQueryHashMap<TripleIntProcedure> statementsMap;
113 final public UnaryQueryHashMap<IntProcedure> assertedPredicatesMap;
114 final public BinaryQueryHashMap<TripleIntProcedure> assertedStatementsMap;
115 final public StableHashMap<ExternalRead, ExternalReadEntry> externalReadMap;
116 final public StableHashMap<AsyncRead, AsyncReadEntry> asyncReadMap;
117 final public StableHashMap<Read, ReadEntry> readMap;
118 final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadMap;
119 final public StableHashMap<MultiRead, MultiReadEntry> multiReadMap;
121 final private THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;
123 public static int indent = 0;
128 // Garbage collection
130 public int boundQueries = 0;
133 private int hits = 0;
135 private int misses = 0;
137 private int updates = 0;
139 final private int functionalRelation;
141 final private int superrelationOf;
143 final private int instanceOf;
145 final private int inverseOf;
147 final private int asserts;
149 final private int hasPredicate;
151 final private int hasPredicateInverse;
153 final private int hasObject;
155 final private int inherits;
157 final private int subrelationOf;
159 final private int rootLibrary;
162 * A cache for the root library resource. Initialized in
163 * {@link #getRootLibraryResource()}.
165 private volatile ResourceImpl rootLibraryResource;
167 final private int library;
169 final private int consistsOf;
171 final private int hasName;
173 AtomicInteger sleepers = new AtomicInteger(0);
175 private boolean updating = false;
177 static public boolean collecting = false;
179 private boolean firingListeners = false;
181 final public QuerySupport querySupport;
182 final public Session session;
183 final public ResourceSupport resourceSupport;
185 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
187 QueryThread[] executors;
189 public ArrayList<SessionTask>[] queues;
193 INIT, RUN, SLEEP, DISPOSED
197 public ThreadState[] threadStates;
198 public ReentrantLock[] threadLocks;
199 public Condition[] threadConditions;
201 public ArrayList<SessionTask>[] ownTasks;
203 public ArrayList<SessionTask>[] ownSyncTasks;
205 ArrayList<SessionTask>[] delayQueues;
207 public boolean synch = true;
209 final Object querySupportLock;
211 public Long modificationCounter = 0L;
213 public void close() {
216 final public void scheduleOwn(int caller, SessionTask request) {
217 ownTasks[caller].add(request);
220 final public void scheduleAlways(int caller, SessionTask request) {
222 int performer = request.thread;
223 if(caller == performer) {
224 ownTasks[caller].add(request);
226 schedule(caller, request);
231 final public void schedule(int caller, SessionTask request) {
233 int performer = request.thread;
235 if(DebugPolicy.SCHEDULE)
236 System.out.println("schedule " + request + " " + caller + " -> " + performer);
238 assert(performer >= 0);
240 assert(request != null);
242 if(caller == performer) {
245 ReentrantLock queueLock = threadLocks[performer];
247 queues[performer].add(request);
248 // This thread could have been sleeping
249 if(queues[performer].size() == 1) {
250 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
251 threadConditions[performer].signalAll();
261 final public int THREAD_MASK;
262 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
264 public static abstract class SessionTask {
266 final public int thread;
267 final public int syncCaller;
268 final public Object object;
270 public SessionTask(WriteTraits object, int thread) {
271 this.thread = thread;
272 this.syncCaller = -1;
273 this.object = object;
276 public SessionTask(Object object, int thread, int syncCaller) {
277 this.thread = thread;
278 this.syncCaller = syncCaller;
279 this.object = object;
282 public abstract void run(int thread);
285 public String toString() {
286 return "SessionTask[" + object + "]";
291 public static abstract class SessionRead extends SessionTask {
293 final public Semaphore notify;
294 final public DataContainer<Throwable> throwable;
296 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
297 super(object, thread, thread);
298 this.throwable = throwable;
299 this.notify = notify;
302 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
303 super(object, thread, syncThread);
304 this.throwable = throwable;
305 this.notify = notify;
310 long waitingTime = 0;
313 static int koss2 = 0;
315 public boolean resume(ReadGraphImpl graph) {
316 return executors[0].runSynchronized();
319 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
320 throws DatabaseException {
323 THREAD_MASK = threads - 1;
326 session = querySupport.getSession();
327 resourceSupport = querySupport.getSupport();
328 querySupportLock = core.getLock();
330 executors = new QueryThread[THREADS];
331 queues = new ArrayList[THREADS];
332 threadLocks = new ReentrantLock[THREADS];
333 threadConditions = new Condition[THREADS];
334 threadStates = new ThreadState[THREADS];
335 ownTasks = new ArrayList[THREADS];
336 ownSyncTasks = new ArrayList[THREADS];
337 delayQueues = new ArrayList[THREADS * THREADS];
339 // freeSchedule = new AtomicInteger(0);
341 for (int i = 0; i < THREADS * THREADS; i++) {
342 delayQueues[i] = new ArrayList<SessionTask>();
345 for (int i = 0; i < THREADS; i++) {
347 // tasks[i] = new ArrayList<Runnable>();
348 ownTasks[i] = new ArrayList<SessionTask>();
349 ownSyncTasks[i] = new ArrayList<SessionTask>();
350 queues[i] = new ArrayList<SessionTask>();
351 threadLocks[i] = new ReentrantLock();
352 threadConditions[i] = threadLocks[i].newCondition();
353 // limits[i] = false;
354 threadStates[i] = ThreadState.INIT;
358 for (int i = 0; i < THREADS; i++) {
362 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
364 threadSet.add(executors[i]);
368 directPredicatesMap = new UnaryQueryHashMap();
369 valueMap = new UnaryQueryHashMap();
370 principalTypesMap = new UnaryQueryHashMap();
371 uriToResourceMap = new THashMap<String, URIToResource>();
372 namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();
373 projectsMap = new UnaryQueryHashMap();
374 relationInfoMap = new UnaryQueryHashMap();
375 typeHierarchyMap = new UnaryQueryHashMap();
376 superTypesMap = new UnaryQueryHashMap();
377 superRelationsMap = new UnaryQueryHashMap();
378 typesMap = new UnaryQueryHashMap();
379 objectsMap = new DoubleKeyQueryHashMap();
380 orderedSetMap = new UnaryQueryHashMap();
381 predicatesMap = new UnaryQueryHashMap();
382 statementsMap = new DoubleKeyQueryHashMap();
383 directObjectsMap = new DoubleKeyQueryHashMap();
384 assertedPredicatesMap = new UnaryQueryHashMap();
385 assertedStatementsMap = new BinaryQueryHashMap();
386 asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>();
387 readMap = new StableHashMap<Read, ReadEntry>();
388 asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>();
389 multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>();
390 externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>();
391 listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
394 for (int i = 0; i < THREADS; i++) {
395 executors[i].start();
398 // Make sure that query threads are up and running
399 while(sleepers.get() != THREADS) {
402 } catch (InterruptedException e) {
407 rootLibrary = core.getBuiltin("http:/");
408 boolean builtinsInstalled = rootLibrary != 0;
410 if (builtinsInstalled) {
411 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
412 assert (functionalRelation != 0);
414 functionalRelation = 0;
416 if (builtinsInstalled) {
417 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
418 assert (instanceOf != 0);
422 if (builtinsInstalled) {
423 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
424 assert (inverseOf != 0);
429 if (builtinsInstalled) {
430 inherits = core.getBuiltin(Layer0.URIs.Inherits);
431 assert (inherits != 0);
435 if (builtinsInstalled) {
436 asserts = core.getBuiltin(Layer0.URIs.Asserts);
437 assert (asserts != 0);
441 if (builtinsInstalled) {
442 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
443 assert (hasPredicate != 0);
447 if (builtinsInstalled) {
448 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
449 assert (hasPredicateInverse != 0);
451 hasPredicateInverse = 0;
453 if (builtinsInstalled) {
454 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
455 assert (hasObject != 0);
459 if (builtinsInstalled) {
460 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
461 assert (subrelationOf != 0);
465 if (builtinsInstalled) {
466 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
467 assert (superrelationOf != 0);
471 if (builtinsInstalled) {
472 library = core.getBuiltin(Layer0.URIs.Library);
473 assert (library != 0);
477 if (builtinsInstalled) {
478 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
479 assert (consistsOf != 0);
483 if (builtinsInstalled) {
484 hasName = core.getBuiltin(Layer0.URIs.HasName);
485 assert (hasName != 0);
491 final public void releaseWrite(ReadGraphImpl graph) {
492 performDirtyUpdates(graph);
493 modificationCounter++;
496 final public int getId(final Resource r) {
497 return querySupport.getId(r);
500 public QuerySupport getCore() {
504 public int getFunctionalRelation() {
505 return functionalRelation;
508 public int getInherits() {
512 public int getInstanceOf() {
516 public int getInverseOf() {
520 public int getSubrelationOf() {
521 return subrelationOf;
524 public int getSuperrelationOf() {
525 return superrelationOf;
528 public int getAsserts() {
532 public int getHasPredicate() {
536 public int getHasPredicateInverse() {
537 return hasPredicateInverse;
540 public int getHasObject() {
544 public int getRootLibrary() {
548 public Resource getRootLibraryResource() {
549 if (rootLibraryResource == null) {
550 // Synchronization is not needed here, it doesn't matter if multiple
551 // threads simultaneously set rootLibraryResource once.
552 int root = getRootLibrary();
554 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
555 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
557 return rootLibraryResource;
560 public int getLibrary() {
564 public int getConsistsOf() {
568 public int getHasName() {
572 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
574 URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {
577 public void execute(ReadGraphImpl graph, Integer result) {
579 if (result != null && result != 0) {
580 procedure.execute(graph, result);
584 // Fall back to using the fixed builtins.
585 result = querySupport.getBuiltin(id);
587 procedure.execute(graph, result);
592 result = querySupport.getRandomAccessReference(id);
593 } catch (ResourceNotFoundException e) {
594 procedure.exception(graph, e);
599 procedure.execute(graph, result);
601 procedure.exception(graph, new ResourceNotFoundException(id));
607 public void exception(ReadGraphImpl graph, Throwable t) {
608 procedure.exception(graph, t);
615 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
617 Integer result = querySupport.getBuiltin(id);
619 procedure.execute(graph, result);
621 procedure.exception(graph, new ResourceNotFoundException(id));
626 public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {
628 int hash = requestHash(query);
630 AsyncReadEntry<T> entry = asyncReadMap.get(query, hash);
632 if(parent == null && listener == null) {
633 if(entry != null && (entry.isReady() || entry.isExcepted())) {
634 System.out.println("ready " + query);
635 entry.performFromCache(graph, this, procedure);
636 // graph.state.barrier.dec(query);
639 query.perform(graph, procedure);
640 // graph.state.barrier.dec(query);
647 entry = new AsyncReadEntry<T>(query);
649 entry.clearResult(querySupport);
650 asyncReadMap.put(query, entry, hash);
652 performForEach(graph, query, entry, parent, listener, procedure, false);
656 if(entry.isPending()) {
657 synchronized(entry) {
658 if(entry.isPending()) {
659 throw new IllegalStateException();
660 // final AsyncBarrierImpl parentBarrier = graph.state.barrier;
661 // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();
662 // entry.procs.add(new AsyncProcedure<T>() {
665 // public void execute(AsyncReadGraph graph, T result) {
666 // procedure.execute(graph, result);
667 // parentBarrier.dec(query);
671 // public void exception(AsyncReadGraph graph, Throwable throwable) {
672 // procedure.exception(graph, throwable);
673 // parentBarrier.dec(query);
677 // if(graph.parent != null || listener != null) {
678 // registerDependencies(graph, entry, parent, listener, procedure, false);
681 // query.perform(graph, procedure);
689 if(entry.isReady()) {
690 entry.performFromCache(graph, this, procedure);
691 registerDependencies(graph, entry, parent, listener, procedure, false);
693 performForEach(graph, query, entry, parent, listener, procedure, false);
701 final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
703 MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query);
706 entry = new MultiReadEntry(query);
708 entry.clearResult(provider.querySupport);
710 provider.multiReadMap.put(query, entry);
712 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
716 if(entry.isPending()) {
718 synchronized(entry) {
720 if(entry.isPending()) {
721 throw new IllegalStateException();
723 // if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();
724 // entry.procs.add(new Pair(procedure, parentBarrier));
725 // if(graph.parent != null || listener != null) {
726 // provider.registerDependencies(graph, entry, parent, listener, procedure, false);
729 // If this was synchronized we must wait here until completion
730 // if(graph.state.synchronizedExecution) {
731 // while(entry.isPending()) {
732 // graph.resumeTasks(graph.callerThread, null, null);
741 entry.performFromCache(graph, provider, procedure);
742 // graph.state.barrier.dec(query);
747 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
755 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
757 int hash = requestHash(query);
759 AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);
761 if(parent == null && listener == null) {
762 if(entry != null && (entry.isReady() || entry.isExcepted())) {
763 System.out.println("ready " + query);
764 entry.performFromCache(graph, this, procedure);
767 query.perform(graph, procedure);
774 entry = new AsyncMultiReadEntry<T>(query);
776 entry.clearResult(querySupport);
778 asyncMultiReadMap.put(query, entry, hash);
780 performForEach(graph, query, entry, parent, listener, procedure, false);
784 if(entry.isPending()) {
786 synchronized(entry) {
787 if(entry.isPending()) {
788 throw new IllegalStateException();
789 // if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();
790 // entry.procs.add(procedure);
791 // if(graph.parent != null || listener != null) {
792 // registerDependencies(graph, entry, parent, listener, procedure, false);
799 performForEach(graph, query, entry, parent, listener, procedure, false);
805 final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {
807 final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query);
809 provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);
811 if(entry.isPending()) {
812 synchronized(entry) {
813 if(entry.isPending()) {
814 throw new IllegalStateException();
815 // if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();
816 // entry.procs.add(procedure);
821 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
826 public int requestHash(Object object) {
828 return object.hashCode();
829 } catch (Throwable t) {
830 Logger.defaultLogError(t);
836 public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {
838 assert(query != null);
840 ReadEntry entry = readMap.get(query);
843 if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {
844 return (T)entry.get(graph, this, procedure);
845 } else if (entry.isPending()) {
846 throw new IllegalStateException();
852 entry = new ReadEntry(query);
854 entry.clearResult(querySupport);
856 readMap.put(query, entry);
858 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
862 if(entry.isPending()) {
863 throw new IllegalStateException();
865 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
872 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
874 assert(query != null);
875 assert(procedure != null);
877 final MultiReadEntry entry = multiReadMap.get(query);
879 if(parent == null && !(listener != null)) {
880 if(entry != null && entry.isReady()) {
881 entry.performFromCache(graph, this, procedure);
886 runMultiRead(graph, entry, query, parent, this, listener, procedure);
890 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {
892 assert(query != null);
893 assert(procedure != null);
895 final ExternalReadEntry entry = externalReadMap.get(query);
897 if(parent == null && !(listener != null)) {
898 if(entry != null && entry.isReady()) {
899 entry.performFromCache(procedure);
904 runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);
908 public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,
909 boolean inferredDependency) {
911 if (DebugPolicy.PERFORM)
912 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
915 assert (!collecting);
917 assert(!entry.isDiscarded());
919 final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);
921 // FRESH, REFUTED, EXCEPTED go here
922 if (!entry.isReady()) {
930 final ReadGraphImpl finalParentGraph = parentGraph;
932 query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {
935 public void execute(AsyncReadGraph returnGraph, T result) {
936 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
937 //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
938 entry.addOrSet(finalParentGraph, result);
939 if(listenerEntry != null) {
940 primeListenerEntry(listenerEntry, result);
943 procedure.execute(finalParentGraph, result);
944 } catch (Throwable t) {
947 // parentBarrier.dec(query);
951 public void exception(AsyncReadGraph returnGraph, Throwable t) {
952 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
953 // AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
954 entry.except(finalParentGraph, t);
956 procedure.exception(finalParentGraph, t);
957 } catch (Throwable t2) {
958 t2.printStackTrace();
960 // parentBarrier.dec(query);
964 public String toString() {
965 return procedure.toString();
970 } catch (Throwable t) {
974 procedure.exception(parentGraph, t);
975 } catch (Throwable t2) {
976 t2.printStackTrace();
978 // parentBarrier.dec(query);
986 entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {
989 public void exception(AsyncReadGraph graph, Throwable throwable) {
990 procedure.exception(graph, throwable);
994 public void execute(AsyncReadGraph graph, T result) {
995 procedure.execute(graph, result);
996 if(listenerEntry != null) {
997 primeListenerEntry(listenerEntry, result);
1003 // parentBarrier.dec(query);
1009 assert (!entry.isDiscarded());
1013 public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,
1014 boolean inferredDependency) throws Throwable {
1016 if (DebugPolicy.PERFORM)
1017 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1020 assert (!collecting);
1022 entry.assertNotDiscarded();
1024 if(entry.isReady()) {
1026 // EXCEPTED goes here
1028 // if(procedure != null) entry.performFromCache(graph, this, procedure);
1029 // parentBarrier.dec(query);
1032 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1034 T result = (T)entry.get(graph, this, procedure);
1036 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1042 // FRESH, REFUTED, PENDING go here
1049 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1051 final ReadGraphImpl performGraph = graph.newSync(entry);
1055 if(Development.DEVELOPMENT)
1056 Development.recordHistogram("run " + query);
1058 T result = query.perform(performGraph);
1059 entry.addOrSet(performGraph, result);
1061 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1063 return (T)entry.get(graph, this, procedure);
1065 } catch (Throwable t) {
1068 return (T)entry.get(graph, this, procedure);
1076 public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1077 boolean inferredDependency) {
1079 if (DebugPolicy.PERFORM)
1080 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1083 assert (!collecting);
1085 assert(!entry.isPending());
1086 assert(!entry.isDiscarded());
1088 // FRESH, REFUTED, EXCEPTED go here
1089 if (!entry.isReady()) {
1092 entry.clearResult(querySupport);
1094 multiReadMap.put(query, entry);
1097 final ReadGraphImpl newGraph = graph.newSync(entry);
1098 // newGraph.state.barrier.inc();
1102 query.perform(newGraph, new AsyncMultiProcedure<T>() {
1105 public void execute(AsyncReadGraph graph, T result) {
1106 entry.addOrSet(result);
1108 procedure.execute(graph, result);
1109 } catch (Throwable t) {
1110 t.printStackTrace();
1115 public void finished(AsyncReadGraph graph) {
1116 entry.finish(graph);
1118 procedure.finished(graph);
1119 } catch (Throwable t) {
1120 t.printStackTrace();
1122 // newGraph.state.barrier.dec();
1123 // parentBarrier.dec();
1127 public void exception(AsyncReadGraph graph, Throwable t) {
1130 procedure.exception(graph, t);
1131 } catch (Throwable t2) {
1132 t2.printStackTrace();
1134 // newGraph.state.barrier.dec();
1135 // parentBarrier.dec();
1140 } catch (DatabaseException e) {
1144 procedure.exception(graph, e);
1145 } catch (Throwable t2) {
1146 t2.printStackTrace();
1148 // newGraph.state.barrier.dec();
1149 // parentBarrier.dec();
1151 } catch (Throwable t) {
1153 DatabaseException e = new DatabaseException(t);
1157 procedure.exception(graph, e);
1158 } catch (Throwable t2) {
1159 t2.printStackTrace();
1161 // newGraph.state.barrier.dec();
1162 // parentBarrier.dec();
1170 entry.performFromCache(graph, this, procedure);
1176 assert (!entry.isDiscarded());
1178 registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1183 public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1184 boolean inferredDependency) {
1186 if (DebugPolicy.PERFORM)
1187 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1190 assert (!collecting);
1194 assert(!entry.isDiscarded());
1196 // FRESH, REFUTED, EXCEPTED go here
1197 if (!entry.isReady()) {
1203 ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);
1205 query.perform(performGraph, new AsyncMultiProcedure<T>() {
1208 public void execute(AsyncReadGraph graph, T result) {
1209 ReadGraphImpl impl = (ReadGraphImpl)graph;
1210 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1211 entry.addOrSet(result);
1213 procedure.execute(callerGraph, result);
1214 } catch (Throwable t) {
1215 t.printStackTrace();
1220 public void finished(AsyncReadGraph graph) {
1221 ReadGraphImpl impl = (ReadGraphImpl)graph;
1222 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1223 entry.finish(callerGraph);
1225 procedure.finished(callerGraph);
1226 } catch (Throwable t) {
1227 t.printStackTrace();
1232 public void exception(AsyncReadGraph graph, Throwable t) {
1233 ReadGraphImpl impl = (ReadGraphImpl)graph;
1234 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1235 entry.except(callerGraph, t);
1237 procedure.exception(callerGraph, t);
1238 } catch (Throwable t2) {
1239 t2.printStackTrace();
1245 } catch (Throwable t) {
1249 procedure.exception(callerGraph, t);
1250 } catch (Throwable t2) {
1251 t2.printStackTrace();
1261 entry.performFromCache(callerGraph, this, procedure);
1267 assert (!entry.isDiscarded());
1269 registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);
1271 } catch (Throwable t) {
1273 Logger.defaultLogError(t);
/**
 * Performs an external read through its cache entry.
 * <p>
 * Dependencies towards {@code parent} and {@code base} are registered first. If the entry is not
 * ready, its result is cleared, the entry is stored in {@code externalReadMap} under the request
 * and a {@link Listener} is registered on the request. Otherwise the cached result is replayed
 * to {@code procedure}.
 *
 * @param graph graph used for dependency registration and for registering the listener
 * @param query the external read request
 * @param entry cache entry of the request; asserted to be neither pending nor discarded
 * @param parent entry that depends on this one, handed to {@code registerDependencies}
 * @param base listener base handed to {@code registerDependencies}
 * @param procedure receives the first result or exception from the request, or the cached result
 * @param inferredDependency handed to {@code registerDependencies} as its {@code inferred} flag
 */
1281 public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,
1282 boolean inferredDependency) {
1284 if (DebugPolicy.PERFORM)
1285 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1288 assert (!collecting);
1290 assert(!entry.isPending());
1291 assert(!entry.isDiscarded());
1293 registerDependencies(graph, entry, parent, base, procedure, inferredDependency);
1295 // FRESH, REFUTED, EXCEPTED go here
1296 if (!entry.isReady()) {
1299 entry.clearResult(querySupport);
1301 externalReadMap.put(query, entry);
1306 query.register(graph, new Listener<T>() {
// Flipped by the first callback; only that callback is forwarded to the procedure directly.
1308 AtomicBoolean used = new AtomicBoolean(false);
1311 public void execute(T result) {
// Results for a discarded entry are dropped; an excepted entry is moved back to pending.
1314 if(entry.isDiscarded()) return;
1315 if(entry.isExcepted()) entry.setPending();
1317 if(used.compareAndSet(false, true)) {
1318 entry.addOrSet(QueryProcessor.this, result);
1319 procedure.execute(result);
// NOTE(review): presumably the else branch for later results - the value is queued on the
// entry and a primitive update is requested for the request.
1321 entry.queue(result);
1322 updatePrimitive(query);
1328 public void exception(Throwable t) {
1332 if(used.compareAndSet(false, true)) {
1333 procedure.exception(t);
1335 // entry.queue(result);
1336 updatePrimitive(query);
1342 public String toString() {
1343 return procedure.toString();
// The listener reports itself disposed once the entry is discarded or no longer bound.
1347 public boolean isDisposed() {
1348 return entry.isDiscarded() || !isBound(entry);
1353 } catch (Throwable t) {
// A failure is reported to the procedure.
1356 procedure.exception(t);
// Entry was ready: replay the cached result.
1364 entry.performFromCache(procedure);
1370 assert (!entry.isDiscarded());
1374 private boolean isBound(ExternalReadEntry<?> entry) {
1375 if(entry.hasParents()) return true;
1376 else if(hasListener(entry)) return true;
/**
 * Records that {@code parent} depends on {@code child} and optionally attaches a listener to
 * {@code child}. The method is synchronized on this processor.
 *
 * @param graph graph used for the immutability check of {@code child}
 * @param child the entry that was requested
 * @param parent the requesting entry, may be <code>null</code>
 * @param listener listener to attach to {@code child}, may be <code>null</code>
 * @param procedure procedure stored together with the listener
 * @param inferred when <code>true</code> no parent link is added
 * @return the result of {@code registerListener} when a listener was given
 *         (NOTE(review): the return for the no-listener case is not visible, presumably null)
 */
1380 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
1382 if (parent != null && !inferred) {
// Immutable children get no parent link.
1384 if(!child.isImmutable(graph))
1385 child.addParent(parent);
1386 } catch (DatabaseException e) {
1387 Logger.defaultLogError(e);
1389 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
1392 if (listener != null) {
1393 return registerListener(child, listener, procedure);
/**
 * Performs a binary query: registers the dependency and listener, then either computes the
 * query (when it is not ready) or replays its cached result to the procedure.
 * Any {@link Throwable} raised on the way is logged through {@code Logger.defaultLogError}.
 *
 * @param graph graph to evaluate the query in
 * @param query the query, which is also its own cache entry here
 * @param parent the requesting entry, handed to {@code registerDependencies}
 * @param listener listener to attach, handed to {@code registerDependencies}
 * @param procedure receives the result
 */
1400 public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1402 if (DebugPolicy.PERFORM)
1403 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1406 assert (!collecting);
1410 registerDependencies(graph, query, parent, listener, procedure, false);
1412 // FRESH, REFUTED, EXCEPTED go here
1413 if (!query.isReady()) {
1415 boolean fresh = query.isFresh();
1421 query.computeForEach(graph, this, procedure, true);
1427 query.performFromCache(graph, this, procedure);
1433 } catch (Throwable t) {
1435 Logger.defaultLogError(t);
/**
 * Performs a unary query: registers the dependency and listener, then either computes the
 * query (when it is not ready) or replays its cached result to the procedure.
 * Any {@link Throwable} raised on the way is logged through {@code Logger.defaultLogError}.
 *
 * @param graph graph to evaluate the query in
 * @param query the query, which is also its own cache entry here
 * @param parent the requesting entry, handed to {@code registerDependencies}
 * @param listener listener to attach, handed to {@code registerDependencies}
 * @param procedure receives the result
 * @return whatever {@code computeForEach} or {@code performFromCache} returns
 *         (NOTE(review): the value returned after a logged failure is not visible here)
 */
1440 public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1442 if (DebugPolicy.PERFORM)
1443 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1446 assert (!collecting);
1450 assert(query.assertNotDiscarded());
1452 registerDependencies(graph, query, parent, listener, procedure, false);
1454 // FRESH, REFUTED, EXCEPTED go here
1455 if (!query.isReady()) {
1460 return query.computeForEach(graph, this, procedure, true);
1467 return query.performFromCache(graph, this, procedure);
1471 } catch (Throwable t) {
1473 Logger.defaultLogError(t);
/**
 * Procedure implementation covering both {@code InternalProcedure<Object>} and
 * {@code IntProcedure}. A shared instance is handed to {@code computeForEach} by
 * {@code performForEach2} in place of the caller's procedure.
 * NOTE(review): the method bodies are presumably empty (results are discarded) - confirm.
 */
1480 static class Dummy implements InternalProcedure<Object>, IntProcedure {
1483 public void execute(ReadGraphImpl graph, int i) {
1487 public void finished(ReadGraphImpl graph) {
1491 public void execute(ReadGraphImpl graph, Object result) {
1495 public void exception(ReadGraphImpl graph, Throwable throwable) {
1500 private static final Dummy dummy = new Dummy();
/**
 * Variant of the unary {@code performForEach} that returns the query result through
 * {@code query.get} and lets failures propagate to the caller.
 * <p>
 * When the query is not ready it is computed with the shared {@code dummy} procedure and the
 * result is then fetched with a <code>null</code> procedure; when it is ready the result is
 * fetched with the caller's procedure.
 *
 * @param graph graph to evaluate the query in
 * @param query the query, which is also its own cache entry here
 * @param parent the requesting entry, handed to {@code registerDependencies}
 * @param listener listener to attach, handed to {@code registerDependencies}
 * @param procedure handed to {@code query.get} only on the cached path
 * @return the value returned by {@code query.get}
 * @throws Throwable anything thrown by the computation or by {@code query.get}
 */
1502 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
1504 if (DebugPolicy.PERFORM)
1505 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1508 assert (!collecting);
1510 assert(query.assertNotDiscarded());
1512 registerDependencies(graph, query, parent, listener, procedure, false);
1514 // FRESH, REFUTED, EXCEPTED go here
1515 if (!query.isReady()) {
// The caller's procedure is not invoked on this path; the dummy receives the callbacks.
1520 query.computeForEach(graph, this, (Procedure)dummy, true);
1521 return query.get(graph, this, null);
1527 return query.get(graph, this, procedure);
/**
 * Performs a string query: computes it when it is not ready, otherwise replays the cached
 * result, and registers the dependency and listener afterwards.
 * Any {@link Throwable} is printed to the console and logged.
 *
 * @param graph graph to evaluate the query in
 * @param query the query, which is also its own cache entry here
 * @param parent the requesting entry, handed to {@code registerDependencies}
 * @param listener listener to attach, handed to {@code registerDependencies}
 * @param procedure receives the result
 */
1533 public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {
1535 if (DebugPolicy.PERFORM)
1536 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1539 assert (!collecting);
// Debug trace for a discarded query; the assert below fails in that case when assertions are on.
1543 if(query.isDiscarded()) {
1544 System.err.println("aff");
1546 assert(!query.isDiscarded());
1548 // FRESH, REFUTED, EXCEPTED go here
1549 if (!query.isReady()) {
// The computation runs in a graph that has this query as its async parent.
1551 query.computeForEach(graph.withAsyncParent(query), this, procedure);
1558 query.performFromCache(graph, this, procedure);
1564 assert (!query.isDiscarded());
// Unlike the binary and unary variants, dependencies are registered after the query ran.
1566 registerDependencies(graph, query, parent, listener, procedure, false);
1568 } catch (Throwable t) {
1570 t.printStackTrace();
1571 Logger.defaultLogError(t);
1577 interface QueryCollectorSupport {
1578 public CacheCollectionResult allCaches();
1579 public Collection<CacheEntry> getRootList();
1580 public int getCurrentSize();
1581 public int calculateCurrentSize();
1582 public CacheEntryBase iterate(int level);
1583 public void remove();
1584 public void setLevel(CacheEntryBase entry, int level);
1585 public boolean start(boolean flush);
/**
 * Garbage collector of the query cache; invoked from {@code gc(int, int)}.
 */
1588 interface QueryCollector {
// NOTE(review): parameter meaning is taken from the names only (target size, time budget in ms).
1590 public void collect(int youngTarget, int allowedTimeInMs);
/**
 * {@link QueryCollectorSupport} implementation backed by the query maps of the enclosing
 * {@code QueryProcessor}. It keeps a {@code CacheCollectionResult} snapshot that the collector
 * iterates over.
 */
1594 class QueryCollectorSupportImpl implements QueryCollectorSupport {
1596 private static final boolean DEBUG = false;
// Relative size change handed to restart() as the threshold for the snapshot being "dirty".
1597 private static final double ITERATION_RATIO = 0.2;
// Snapshot of cache entries that iterate() walks through.
1599 private CacheCollectionResult iteration = new CacheCollectionResult();
1600 private boolean fresh = true;
// When set, start() calls restart() before reporting the iteration state.
1601 private boolean needDataInStart = true;
1603 QueryCollectorSupportImpl() {
1604 iteration.restart();
/**
 * Collects the entries of the enclosing processor into a new {@code CacheCollectionResult}
 * by delegating to {@code QueryProcessor.allCaches(CacheCollectionResult)}.
 */
1607 public CacheCollectionResult allCaches() {
1608 CacheCollectionResult result = new CacheCollectionResult();
1609 QueryProcessor.this.allCaches(result);
/**
 * Prepares the iteration for a collection round.
 *
 * @param flush when <code>true</code> the restart is requested with a target ratio of 0.0
 *        instead of {@code ITERATION_RATIO}
 * @return {@code iteration.isAtStart()}
 */
1614 public boolean start(boolean flush) {
1615 // We need new data from query maps
1617 if(needDataInStart || flush) {
1618 // Last run ended after processing all queries => refresh data
1619 restart(flush ? 0.0 : ITERATION_RATIO);
1621 // continue with previous big data
1623 // Notify caller about iteration situation
1624 return iteration.isAtStart();
/**
 * Restarts the iteration, deciding from the relative size change of the cache whether the
 * snapshot has to be rebuilt.
 * NOTE(review): the branch structure is only partly visible; presumably a dirty snapshot is
 * replaced with {@code allCaches()} and a clean one is reused - confirm.
 *
 * @param targetRatio relative size change at or above which the snapshot counts as dirty
 */
1627 private void restart(double targetRatio) {
1629 needDataInStart = true;
1631 long start = System.nanoTime();
1634 // We need new data from query maps
// Size of the snapshot plus one, so the divisor below is never zero.
1636 int iterationSize = iteration.size()+1;
1637 int diff = calculateCurrentSize()-iterationSize;
1639 double ratio = (double)diff / (double)iterationSize;
1640 boolean dirty = Math.abs(ratio) >= targetRatio;
1643 iteration = allCaches();
// Diagnostic output: time taken, total size and the size of every level.
1645 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
1646 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
1647 System.err.print(" " + iteration.levels[i].size());
1648 System.err.println("");
1651 iteration.restart();
1655 needDataInStart = false;
1657 // We are returning here within the same GC round - reuse the cache table
1658 iteration.restart();
/**
 * Fetches the next entry of the given level from the snapshot, skipping discarded entries.
 * NOTE(review): the condition guarding the restart call is not visible; presumably it runs
 * when the snapshot is exhausted - confirm.
 *
 * @param level the level to iterate
 * @return the next entry that is not discarded (the value returned at the end is not visible)
 */
1666 public CacheEntryBase iterate(int level) {
1668 CacheEntryBase entry = iteration.next(level);
1670 restart(ITERATION_RATIO);
1674 while(entry != null && entry.isDiscarded()) {
1675 entry = iteration.next(level);
1683 public void remove() {
/** Delegates to the snapshot to assign {@code level} to {@code entry}. */
1688 public void setLevel(CacheEntryBase entry, int level) {
1689 iteration.setLevel(entry, level);
1692 public Collection<CacheEntry> getRootList() {
1694 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
1696 for (Object e : valueMap.values()) {
1697 result.add((CacheEntry) e);
1699 for (Object e : directPredicatesMap.values()) {
1700 result.add((CacheEntry) e);
1702 for (Object e : objectsMap.values()) {
1703 result.add((CacheEntry) e);
1705 for (Object e : directObjectsMap.values()) {
1706 result.add((CacheEntry) e);
1708 for (Object e : principalTypesMap.values()) {
1709 result.add((CacheEntry) e);
1711 for (Object e : superRelationsMap.values()) {
1712 result.add((CacheEntry) e);
1714 for (Object e : superTypesMap.values()) {
1715 result.add((CacheEntry) e);
1717 for (Object e : typesMap.values()) {
1718 result.add((CacheEntry) e);
1720 for (Object e : objectsMap.values()) {
1721 result.add((CacheEntry) e);
1723 for (Object e : assertedStatementsMap.values()) {
1724 result.add((CacheEntry) e);
1726 for (Object e : readMap.values()) {
1727 if(e instanceof CacheEntry) {
1728 result.add((CacheEntry) e);
1730 System.err.println("e=" + e);
1733 for (Object e : asyncReadMap.values()) {
1734 if(e instanceof CacheEntry) {
1735 result.add((CacheEntry) e);
1737 System.err.println("e=" + e);
1740 for (Object e : externalReadMap.values()) {
1741 result.add((CacheEntry) e);
1743 for (Object e : orderedSetMap.values()) {
1744 result.add((CacheEntry) e);
/**
 * Computes the cache size by adding up the sizes of the individual query maps of the
 * enclosing processor.
 */
1752 public int calculateCurrentSize() {
1756 realSize += directPredicatesMap.size();
1757 realSize += principalTypesMap.size();
1758 realSize += uriToResourceMap.size();
1759 realSize += namespaceIndexMap22.size();
1760 realSize += projectsMap.size();
1762 realSize += relationInfoMap.size();
1763 realSize += superTypesMap.size();
1764 realSize += typeHierarchyMap.size();
1765 realSize += superRelationsMap.size();
1766 realSize += typesMap.size();
1768 realSize += valueMap.size();
1769 realSize += directObjectsMap.size();
1770 realSize += objectsMap.size();
1771 realSize += orderedSetMap.size();
1772 realSize += predicatesMap.size();
1774 realSize += statementsMap.size();
1775 realSize += assertedPredicatesMap.size();
1776 realSize += assertedStatementsMap.size();
1777 realSize += externalReadMap.size();
1778 realSize += asyncReadMap.size();
1780 realSize += readMap.size();
1781 realSize += asyncMultiReadMap.size();
1782 realSize += multiReadMap.size();
1791 public int getCurrentSize() {
1796 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
1798 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
1799 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
1801 public int querySize() {
/**
 * Runs a collection round of the query cache by delegating to the {@code QueryCollector}.
 *
 * @param youngTarget handed to the collector unchanged
 * @param allowedTimeInMs handed to the collector unchanged
 */
1805 public void gc(int youngTarget, int allowedTimeInMs) {
1807 collector.collect(youngTarget, allowedTimeInMs);
1811 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
1813 assert (entry != null);
1815 if (base.isDisposed())
1818 return addListener(entry, base, procedure);
/** Stores {@code result} as the last known value of the listener entry. */
1822 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
1823 entry.setLastKnown(result);
/**
 * Adds a listener entry to the listener list of a cache entry, creating the list on demand.
 * <p>
 * If an equal listener entry is already in the list, it is kept and <code>null</code> is
 * returned as long as its base is not disposed; a disposed one is replaced by the new entry.
 *
 * @param entry the cache entry to listen to, must not be <code>null</code>
 * @param base the listener
 * @param procedure the procedure stored with the listener, must not be <code>null</code>
 * @return <code>null</code> when a live equal listener already exists
 *         (NOTE(review): the regular return is not visible, presumably the new entry)
 */
1826 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
1828 assert (entry != null);
1829 assert (procedure != null);
1831 ArrayList<ListenerEntry> list = listeners.get(entry);
1833 list = new ArrayList<ListenerEntry>(1);
1834 listeners.put(entry, list);
1837 ListenerEntry result = new ListenerEntry(entry, base, procedure);
1838 int currentIndex = list.indexOf(result);
1839 // There was already a listener
1840 if(currentIndex > -1) {
1841 ListenerEntry current = list.get(currentIndex);
1842 if(!current.base.isDisposed()) return null;
1843 list.set(currentIndex, result);
1848 if(DebugPolicy.LISTENER) {
1849 new Exception().printStackTrace();
1850 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
1857 private void scheduleListener(ListenerEntry entry) {
1858 assert (entry != null);
1859 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
1860 scheduledListeners.add(entry);
/**
 * Removes a listener entry from the listener list of its cache entry. Nothing happens when
 * the cache entry has no listener list.
 * NOTE(review): the condition guarding the final map removal is not visible; presumably the
 * list is dropped from the map once it is empty - confirm.
 */
1863 private void removeListener(ListenerEntry entry) {
1864 assert (entry != null);
1865 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
1866 if(list == null) return;
1867 boolean success = list.remove(entry);
1870 listeners.remove(entry.entry);
1873 private boolean hasListener(CacheEntry entry) {
1874 if(listeners.get(entry) != null) return true;
/**
 * Cleans the listener list of a cache entry from disposed listeners and reports whether
 * listeners remain.
 * <p>
 * Entries whose base is disposed are gathered into a separate list first, and the second loop
 * then walks that list. The listener list is removed from the map when it ends up empty.
 * NOTE(review): the loop bodies and the return statements are not visible; presumably the
 * gathered entries are removed from the list and the result tells whether any are left.
 */
1878 boolean hasListenerAfterDisposing(CacheEntry entry) {
1879 if(listeners.get(entry) != null) {
1880 ArrayList<ListenerEntry> entries = listeners.get(entry);
// Created lazily, only when a disposed listener is found.
1881 ArrayList<ListenerEntry> list = null;
1882 for (ListenerEntry e : entries) {
1883 if (e.base.isDisposed()) {
1884 if(list == null) list = new ArrayList<ListenerEntry>();
1889 for (ListenerEntry e : list) {
1893 if (entries.isEmpty()) {
1894 listeners.remove(entry);
1902 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1903 hasListenerAfterDisposing(entry);
1904 if(listeners.get(entry) != null)
1905 return listeners.get(entry);
1907 return Collections.emptyList();
/**
 * Fills {@code workarea} with the set of listeners gathered for {@code entry} and, through
 * recursion, for all of its parents. Entries already present in {@code workarea} are not
 * processed again.
 * NOTE(review): the body of the first loop is not visible; presumably it adds the base of
 * each listener entry to the set.
 *
 * @param entry the entry to process
 * @param workarea result map from cache entry to the listeners gathered for it
 */
1910 void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1912 if(!workarea.containsKey(entry)) {
1914 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1915 for(ListenerEntry e : getListenerEntries(entry))
// The set is stored before the parents are visited.
1918 workarea.put(entry, ls);
1920 for(CacheEntry parent : entry.getParents(this)) {
1921 processListenerReport(parent, workarea);
// The listeners gathered for a parent are merged into the set of this entry.
1922 ls.addAll(workarea.get(parent));
/**
 * Builds a listener report over all cache entries.
 * <p>
 * Disposed listeners are cleaned from every entry first, then the listener sets are gathered
 * with {@code processListenerReport}. Printing the report writes one line per listener with
 * the number of cache entries whose set contains it.
 */
1929 public synchronized ListenerReport getListenerReport() throws IOException {
1931 class ListenerReportImpl implements ListenerReport {
1933 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1936 public void print(PrintStream b) {
// Counts are accumulated as negative numbers and negated again when printed.
// NOTE(review): presumably so that the value sort lists the largest counts first.
1937 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1938 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1939 for(ListenerBase l : e.getValue()) {
1940 Integer i = hist.get(l);
1941 hist.put(l, i != null ? i-1 : -1);
1945 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1946 b.print("" + -p.second + " " + p.first + "\n");
1954 ListenerReportImpl result = new ListenerReportImpl();
1956 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
// First pass: drop disposed listeners from every entry.
1957 for(CacheEntryBase entry : all) {
1958 hasListenerAfterDisposing(entry);
// Second pass: gather the listener sets.
1960 for(CacheEntryBase entry : all) {
1961 processListenerReport(entry, result.workarea);
/**
 * Creates a listener report for output into the given file.
 * NOTE(review): the statements that print the report and close the stream are not visible
 * here - confirm that the stream is closed on every path.
 *
 * @param file the file to write
 * @return a fixed completion message
 * @throws IOException declared by the method; raised when the file cannot be opened
 */
1968 public synchronized String reportListeners(File file) throws IOException {
1973 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1974 ListenerReport report = getListenerReport();
1977 return "Done reporting listeners.";
/**
 * Fills {@code workarea} with the parent set of {@code entry} and, through recursion, of its
 * ancestors. Discarded entries and entries already present in {@code workarea} are skipped.
 * NOTE(review): the statement that adds a parent to the set is not visible; presumably every
 * parent that is not discarded is added.
 *
 * @param entry the entry to process
 * @param workarea result map from cache entry to its parents
 */
1981 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1983 if(entry.isDiscarded()) return;
1984 if(workarea.containsKey(entry)) return;
1986 Iterable<CacheEntry> parents = entry.getParents(this);
1987 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1988 for(CacheEntry e : parents) {
1989 if(e.isDiscarded()) continue;
1991 processParentReport(e, workarea);
1993 workarea.put(entry, ps);
/**
 * Writes the entries of {@code Development.histogram} to the given file, one
 * "name: count" line per entry in the reverse of the value-sorted order, and clears the
 * histogram afterwards.
 * NOTE(review): closing of the stream and the returned message are not visible here.
 *
 * @param file the file to write
 * @throws IOException declared by the method; raised when the file cannot be opened
 */
1997 public synchronized String reportQueryActivity(File file) throws IOException {
1999 System.err.println("reportQueries " + file.getAbsolutePath());
2004 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
2006 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
2007 Collections.reverse(entries);
2009 for(Pair<String,Integer> entry : entries) {
2010 b.println(entry.first + ": " + entry.second);
2015 Development.histogram.clear();
/**
 * Dumps a report of the query cache into the given file.
 * <p>
 * The steps visible in this method are:
 * <ol>
 * <li>the parent sets of all cache entries are gathered with {@code processParentReport},</li>
 * <li>every entry gets a bind status (0, 1 or 2; the report header names them bound, free and
 * unknown), refined in at most about 20 passes over the parents,</li>
 * <li>the number of cached instances per query class is counted,</li>
 * <li>a per-entry histogram, printed as "retained set", is iterated at most 50 times,</li>
 * <li>parent and child listings are written.</li>
 * </ol>
 * NOTE(review): several assignments and branch lines are not visible here, so the exact rules
 * of the status propagation and of the histogram are not documented.
 *
 * @param file the file to write
 * @return a message with the number of dumped queries
 * @throws IOException declared by the method; raised when the file cannot be opened
 */
2021 public synchronized String reportQueries(File file) throws IOException {
2023 System.err.println("reportQueries " + file.getAbsolutePath());
2028 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
2030 long start = System.nanoTime();
2032 // ArrayList<CacheEntry> all = ;
// Step 1: parent sets of all entries.
2034 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
2035 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
2036 for(CacheEntryBase entry : caches) {
2037 processParentReport(entry, workarea);
2040 // for(CacheEntry e : all) System.err.println("entry: " + e);
2042 long duration = System.nanoTime() - start;
2043 System.err.println("Query root set in " + 1e-9*duration + "s.");
2045 start = System.nanoTime();
// Step 2: initial bind status per entry, derived from listeners and parents.
2047 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
2051 for(CacheEntry entry : workarea.keySet()) {
2052 boolean listener = hasListenerAfterDisposing(entry);
2053 boolean hasParents = entry.getParents(this).iterator().hasNext();
2056 flagMap.put(entry, 0);
2057 } else if (!hasParents) {
2059 flagMap.put(entry, 1);
2062 flagMap.put(entry, 2);
2064 // // Write leaf bit
2065 // entry.flags |= 4;
2068 boolean done = true;
2075 long start2 = System.nanoTime();
2077 int boundCounter = 0;
2078 int unboundCounter = 0;
2079 int unknownCounter = 0;
2081 for(CacheEntry entry : workarea.keySet()) {
2083 //System.err.println("process " + entry);
2085 int flags = flagMap.get(entry);
2086 int bindStatus = flags & 3;
2088 if(bindStatus == 0) boundCounter++;
2089 else if(bindStatus == 1) unboundCounter++;
2090 else if(bindStatus == 2) unknownCounter++;
// Only entries with status 2 are examined further.
2092 if(bindStatus < 2) continue;
2095 for(CacheEntry parent : entry.getParents(this)) {
2097 if(parent.isDiscarded()) flagMap.put(parent, 1);
2099 int flags2 = flagMap.get(parent);
2100 int bindStatus2 = flags2 & 3;
2101 // Parent is bound => child is bound
2102 if(bindStatus2 == 0) {
2106 // Parent is unknown => child is unknown
2107 else if (bindStatus2 == 2) {
2114 flagMap.put(entry, newStatus);
2118 duration = System.nanoTime() - start2;
2119 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
2120 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
2122 } while(!done && loops++ < 20);
2126 for(CacheEntry entry : workarea.keySet()) {
2128 int bindStatus = flagMap.get(entry);
2129 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
2135 duration = System.nanoTime() - start;
2136 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Step 3: instances per class. For request entries the class of the request is used.
// Counts are kept negative and negated when printed.
2138 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
2140 for(CacheEntry entry : workarea.keySet()) {
2141 Class<?> clazz = entry.getClass();
2142 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
2143 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
2144 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
2145 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
2146 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
2147 Integer c = counts.get(clazz);
2148 if(c == null) counts.put(clazz, -1);
2149 else counts.put(clazz, c-1);
2152 b.print("// Simantics DB client query report file\n");
2153 b.print("// This file contains the following information\n");
2154 b.print("// -The amount of cached query instances per query class\n");
2155 b.print("// -The sizes of retained child sets\n");
2156 b.print("// -List of parents for each query (search for 'P <query name>')\n");
2157 b.print("// -Followed by status, where\n");
2158 b.print("// -0=bound\n");
2159 b.print("// -1=free\n");
2160 b.print("// -2=unknown\n");
2161 b.print("// -L=has listener\n");
2162 b.print("// -List of children for each query (search for 'C <query name>')\n");
2164 b.print("----------------------------------------\n");
2166 b.print("// Queries by class\n");
2167 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
2168 b.print(-p.second + " " + p.first.getName() + "\n");
// Step 4: histogram per entry, recomputed until it stops changing or 50 rounds have passed.
2171 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
2172 for(CacheEntry e : workarea.keySet())
2175 boolean changed = true;
2177 while(changed && iter++<50) {
2181 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
2182 for(CacheEntry e : workarea.keySet())
2185 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
2186 Integer c = hist.get(e.getKey());
// The value of an entry is added to each of its parents.
2187 for(CacheEntry p : e.getValue()) {
2188 Integer i = newHist.get(p);
2189 newHist.put(p, i+c);
2192 for(CacheEntry e : workarea.keySet()) {
2193 Integer value = newHist.get(e);
2194 Integer old = hist.get(e);
2195 if(!value.equals(old)) {
2197 // System.err.println("hist " + e + ": " + old + " => " + value);
2202 System.err.println("Retained set iteration " + iter);
2206 b.print("// Queries by retained set\n");
2207 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
2208 b.print("" + -p.second + " " + p.first + "\n");
// Step 5: parent listing; the inverse (child) relation is built on the way.
2211 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
2213 b.print("// Entry parent listing\n");
2214 for(CacheEntry entry : workarea.keySet()) {
2215 int status = flagMap.get(entry);
2216 boolean hasListener = hasListenerAfterDisposing(entry);
2217 b.print("Q " + entry.toString());
2219 b.print(" (L" + status + ")");
2222 b.print(" (" + status + ")");
2225 for(CacheEntry parent : workarea.get(entry)) {
2226 Collection<CacheEntry> inv = inverse.get(parent);
2228 inv = new ArrayList<CacheEntry>();
2229 inverse.put(parent, inv);
2232 b.print(" " + parent.toString());
2237 b.print("// Entry child listing\n");
2238 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
2239 b.print("C " + entry.getKey().toString());
2241 for(CacheEntry child : entry.getValue()) {
2242 Integer h = hist.get(child);
2246 b.print(" <no children>");
2248 b.print(" " + child.toString());
2253 b.print("#queries: " + workarea.keySet().size() + "\n");
2254 b.print("#listeners: " + listeners + "\n");
2258 return "Dumped " + workarea.keySet().size() + " queries.";
2264 public CacheEntry caller;
2266 public CacheEntry entry;
2270 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
2271 this.caller = caller;
2273 this.indent = indent;
/**
 * Removes the query of a cache entry from this processor.
 * NOTE(review): the statements following the visible ones, including the regular return
 * value and what the {@code HAS_BEEN_BOUND} check selects, are not visible here.
 *
 * @param entry the entry to remove
 * @return <code>false</code> when the entry was already discarded
 */
2278 boolean removeQuery(CacheEntry entry) {
2280 // This entry has been removed before. No need to do anything here.
2281 if(entry.isDiscarded()) return false;
2283 assert (!entry.isDiscarded());
2285 Query query = entry.getQuery();
2287 query.removeEntry(this);
2292 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
2303 * @return true if this entry is being listened
// Processes one entry of the update work list.
//
// With Development.DEVELOPMENT and the QUERYPROCESSOR_UPDATE property enabled, a one letter
// trace is printed: D for discarded, R for refuted, E for excepted, P for pending and U for an
// entry that is actually updated, indented by e.indent.
//
// For an updated entry the visible steps are: entries whose request type has the
// IMMEDIATE_UPDATE flag are put into 'immediates', all listeners of the entry are scheduled,
// and for the request type INVALIDATE the parents are pushed to 'todo' through updateParents.
// NOTE(review): the early returns, the state changes of the entry and the returned value are
// not visible here.
2305 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
2309 CacheEntry entry = e.entry;
2311 // System.err.println("updateQuery " + entry);
2314 * If the dependency graph forms a DAG, some entries are inserted in the
2315 * todo list many times. They only need to be processed once though.
2317 if (entry.isDiscarded()) {
2318 if (Development.DEVELOPMENT) {
2319 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2320 System.out.print("D");
2321 for (int i = 0; i < e.indent; i++)
2322 System.out.print(" ");
2323 System.out.println(entry.getQuery());
2326 // System.err.println(" => DISCARDED");
2330 if (entry.isRefuted()) {
2331 if (Development.DEVELOPMENT) {
2332 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2333 System.out.print("R");
2334 for (int i = 0; i < e.indent; i++)
2335 System.out.print(" ");
2336 System.out.println(entry.getQuery());
2342 if (entry.isExcepted()) {
2343 if (Development.DEVELOPMENT) {
2344 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2345 System.out.print("E");
2350 if (entry.isPending()) {
2351 if (Development.DEVELOPMENT) {
2352 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2353 System.out.print("P");
2360 if (Development.DEVELOPMENT) {
2361 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2362 System.out.print("U ");
2363 for (int i = 0; i < e.indent; i++)
2364 System.out.print(" ");
2365 System.out.print(entry.getQuery());
2369 Query query = entry.getQuery();
2370 int type = query.type();
2372 boolean hasListener = hasListener(entry);
2374 if (Development.DEVELOPMENT) {
2375 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2376 if(hasListener(entry)) {
2377 System.out.println(" (L)");
2379 System.out.println("");
// Pending and excepted entries are handled apart from the others; both paths collect
// IMMEDIATE_UPDATE entries.
2384 if(entry.isPending() || entry.isExcepted()) {
2387 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2389 immediates.put(entry, entry);
2404 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2406 immediates.put(entry, entry);
2420 // System.err.println(" => FOO " + type);
// All listeners of the entry are scheduled for firing.
2423 ArrayList<ListenerEntry> entries = listeners.get(entry);
2424 if(entries != null) {
2425 for (ListenerEntry le : entries) {
2426 scheduleListener(le);
2431 // If invalid, update parents
2432 if (type == RequestFlags.INVALIDATE) {
2433 updateParents(e.indent, entry, todo);
2440 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
2442 Iterable<CacheEntry> oldParents = entry.getParents(this);
2443 for (CacheEntry parent : oldParents) {
2444 // System.err.println("updateParents " + entry + " => " + parent);
2445 if(!parent.isDiscarded())
2446 todo.push(new UpdateEntry(entry, parent, indent + 2));
2451 private boolean pruneListener(ListenerEntry entry) {
2452 if (entry.base.isDisposed()) {
2453 removeListener(entry);
2461 * @param av1 an array (guaranteed)
2462 * @param av2 any object
2463 * @return <code>true</code> if the two arrays are equal
2465 private final boolean arrayEquals(Object av1, Object av2) {
2468 Class<?> c1 = av1.getClass().getComponentType();
2469 Class<?> c2 = av2.getClass().getComponentType();
2470 if (c2 == null || !c1.equals(c2))
2472 boolean p1 = c1.isPrimitive();
2473 boolean p2 = c2.isPrimitive();
2477 return Arrays.equals((Object[]) av1, (Object[]) av2);
2478 if (boolean.class.equals(c1))
2479 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
2480 else if (byte.class.equals(c1))
2481 return Arrays.equals((byte[]) av1, (byte[]) av2);
2482 else if (int.class.equals(c1))
2483 return Arrays.equals((int[]) av1, (int[]) av2);
2484 else if (long.class.equals(c1))
2485 return Arrays.equals((long[]) av1, (long[]) av2);
2486 else if (float.class.equals(c1))
2487 return Arrays.equals((float[]) av1, (float[]) av2);
2488 else if (double.class.equals(c1))
2489 return Arrays.equals((double[]) av1, (double[]) av2);
2490 throw new RuntimeException("??? Contact application querySupport.");
/**
 * Recomputes a cache entry and compares the new result with a previously known value.
 * <p>
 * The outcomes visible in this method: {@code ListenerEntry.NO_VALUE} when the entry is
 * excepted after the recomputation or when a {@link Throwable} was caught and logged, the new
 * value when it differs from {@code oldValue}, and {@code ListenerEntry.NOT_CHANGED} when it
 * does not. Arrays are compared with {@code arrayEquals}, other values with {@code equals}.
 * NOTE(review): the value returned when {@code oldValue} is {@code NO_VALUE} is not visible
 * here, presumably the new value.
 *
 * @param graph graph to recompute in; the entry is set as its parent
 * @param entry the entry to recompute
 * @param oldValue the value to compare against, or {@code ListenerEntry.NO_VALUE}
 */
2495 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
2499 Query query = entry.getQuery();
2501 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
2503 entry.prepareRecompute(querySupport);
2505 ReadGraphImpl parentGraph = graph.withParent(entry);
2507 query.recompute(parentGraph, this, entry);
2509 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
2511 Object newValue = entry.getResult();
// No previous value to compare with.
2513 if (ListenerEntry.NO_VALUE == oldValue) {
2514 if(DebugPolicy.CHANGES) {
2515 System.out.println("C " + query);
2516 System.out.println("- " + oldValue);
2517 System.out.println("- " + newValue);
2522 boolean changed = false;
2524 if (newValue != null) {
2525 if (newValue.getClass().isArray()) {
2526 changed = !arrayEquals(newValue, oldValue);
2528 changed = !newValue.equals(oldValue);
// A null result counts as changed only when the old value was not null.
2531 changed = (oldValue != null);
2533 if(DebugPolicy.CHANGES && changed) {
2534 System.out.println("C " + query);
2535 System.out.println("- " + oldValue);
2536 System.out.println("- " + newValue);
2539 return changed ? newValue : ListenerEntry.NOT_CHANGED;
2541 } catch (Throwable t) {
2543 Logger.defaultLogError(t);
2545 return ListenerEntry.NO_VALUE;
/** @return <code>true</code> if at least one listener is waiting in {@code scheduledListeners} */
2551 public boolean hasScheduledUpdates() {
2552 return !scheduledListeners.isEmpty();
/**
 * Fires the scheduled listeners until none are left.
 * <p>
 * Each round swaps {@code scheduledListeners} for a fresh set, prunes listeners that have been
 * disposed, recomputes the remaining entries through {@code compareTo} and fires, from the
 * cache, the listeners whose value is reported as changed.
 * NOTE(review): whether {@code firingListeners} is reset in a finally block is not visible.
 *
 * @param graph graph used for recomputing and firing
 */
2555 public void performScheduledUpdates(WriteGraphImpl graph) {
2558 assert (!collecting);
2559 assert (!firingListeners);
2561 firingListeners = true;
2565 // Performing may cause further events to be scheduled.
2566 while (!scheduledListeners.isEmpty()) {
2569 // graph.state.barrier.inc();
2571 // Clone current events to make new entries possible during
2573 THashSet<ListenerEntry> entries = scheduledListeners;
2574 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners that have to be fired in this round.
2576 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
2578 for (ListenerEntry listenerEntry : entries) {
2580 if (pruneListener(listenerEntry)) {
2581 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
2585 final CacheEntry entry = listenerEntry.entry;
2586 assert (entry != null);
2588 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
// Anything but NOT_CHANGED (including NO_VALUE) puts the listener on the schedule.
2590 if (newValue != ListenerEntry.NOT_CHANGED) {
2591 if(DebugPolicy.LISTENER)
2592 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
2593 schedule.add(listenerEntry);
2594 listenerEntry.setLastKnown(entry.getResult());
2599 for(ListenerEntry listenerEntry : schedule) {
2600 final CacheEntry entry = listenerEntry.entry;
2601 if(DebugPolicy.LISTENER)
2602 System.out.println("Firing " + listenerEntry.procedure);
2604 if(DebugPolicy.LISTENER)
2605 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
2606 entry.performFromCache(graph, this, listenerEntry.procedure);
2607 } catch (Throwable t) {
// A failure while firing is only printed.
2608 t.printStackTrace();
2612 // graph.state.barrier.dec();
2613 // graph.waitAsync(null);
2614 // graph.state.barrier.assertReady();
2619 firingListeners = false;
2626 * @return true if this entry still has listeners
// Propagates a change of 'entry' through the dependency graph.
//
// The work list starts with the entry itself. updateQuery is run for every item of the list
// and collects the entries that need an immediate update. Those are then recomputed with
// compareTo, and the parents of every entry whose value is not reported as NOT_CHANGED are
// pushed to the work list again. The loop ends when no immediate updates are left.
// A Throwable raised on the way is logged.
// NOTE(review): the outer loop header and the handling of discarded immediates are not
// visible here.
2628 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
2630 assert (!collecting);
2634 boolean hadListeners = false;
2635 boolean listenersUnknown = false;
2639 assert(entry != null);
2640 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
2641 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
2642 todo.add(new UpdateEntry(null, entry, 0));
2646 // Walk the tree and collect immediate updates
2647 while (!todo.isEmpty()) {
2648 UpdateEntry e = todo.pop();
2649 hadListeners |= updateQuery(e, todo, immediates);
2652 if(immediates.isEmpty()) break;
2654 // Evaluate all immediate updates and collect parents to update
2655 for(CacheEntry immediate : immediates.values()) {
2657 if(immediate.isDiscarded()) {
// An excepted entry has no old value to compare with.
2661 if(immediate.isExcepted()) {
2663 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
2664 if (newValue != ListenerEntry.NOT_CHANGED)
2665 updateParents(0, immediate, todo);
2669 Object oldValue = immediate.getResult();
2670 Object newValue = compareTo(graph, immediate, oldValue);
2672 if (newValue != ListenerEntry.NOT_CHANGED) {
2673 updateParents(0, immediate, todo);
2675 // If not changed, keep the old value
2676 immediate.setResult(oldValue);
2677 listenersUnknown = true;
2687 } catch (Throwable t) {
2688 Logger.defaultLogError(t);
2694 return hadListeners | listenersUnknown;
2698 volatile public boolean dirty = false;
2700 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
2701 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
2702 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
2703 // Maybe use a mutex from util.concurrent?
2704 private Object primitiveUpdateLock = new Object();
2705 private THashSet scheduledPrimitiveUpdates = new THashSet();
/**
 * Applies the scheduled changes to the query cache by calling update(graph, entry)
 * on the cache entries affected by the queued statement, value, invalidate and
 * primitive (external read) changes, and finally clears the scheduled sets.
 *
 * Two fast paths handle exactly one queued statement change and exactly one queued
 * value change. NOTE(review): this listing elides lines after each fast path;
 * presumably they return early - confirm against the full file.
 *
 * @param graph the graph handed to every update(graph, entry) call
 */
2707 public void performDirtyUpdates(final ReadGraphImpl graph) {
2712 if (Development.DEVELOPMENT) {
2713 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2714 System.err.println("== Query update ==");
2718 // Special case - one statement
2719 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2721 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack the key: subject id from the high 32 bits, predicate id from the low 32 bits.
2723 final int subject = (int)(arg0 >>> 32);
2724 final int predicate = (int)(arg0 & 0xffffffff);
// Refresh every cached Objects, DirectObjects and Statements entry of the subject.
2726 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2727 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2728 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
// Changes to instanceOf / inherits / subrelationOf also refresh the subject's type queries.
2730 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2731 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2732 if(principalTypes != null) update(graph, principalTypes);
2733 Types types = Types.entry(QueryProcessor.this, subject);
2734 if(types != null) update(graph, types);
2737 if(predicate == subrelationOf) {
2738 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2739 if(superRelations != null) update(graph, superRelations);
// DirectPredicates is looked up by subject; OrderedSet is looked up by the predicate id.
2742 DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);
2743 if(dp != null) update(graph, dp);
2744 OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);
2745 if(os != null) update(graph, os);
2747 scheduledObjectUpdates.clear();
2752 // Special case - one value
2753 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2755 int arg0 = scheduledValueUpdates.getFirst();
2757 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2758 if(valueQuery != null) update(graph, valueQuery);
2760 scheduledValueUpdates.clear();
// General path. Despite its name, "predicates" collects subject/resource ids whose
// per-subject queries are refreshed below; "orderedSets" collects predicate ids.
2765 final TIntHashSet predicates = new TIntHashSet();
2766 final TIntHashSet orderedSets = new TIntHashSet();
// Take the queued primitive updates and install a fresh set while holding the lock.
2768 THashSet primitiveUpdates;
2769 synchronized (primitiveUpdateLock) {
2770 primitiveUpdates = scheduledPrimitiveUpdates;
2771 scheduledPrimitiveUpdates = new THashSet();
2774 primitiveUpdates.forEach(new TObjectProcedure() {
2777 public boolean execute(Object arg0) {
2779 ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);
2780 if (query != null) {
2781 boolean listening = update(graph, query);
// Drop the external read entry when update reported no listeners and it has no parents.
2782 if (!listening && !query.hasParents()) {
2783 externalReadMap.remove(arg0);
2792 scheduledValueUpdates.forEach(new TIntProcedure() {
2795 public boolean execute(int arg0) {
2796 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2797 if(valueQuery != null) update(graph, valueQuery);
// Invalidated resources: refresh value, type and super-relation queries and queue the
// resource for the per-subject refresh below.
2803 scheduledInvalidates.forEach(new TIntProcedure() {
2806 public boolean execute(int resource) {
2808 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);
2809 if(valueQuery != null) update(graph, valueQuery);
2811 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);
2812 if(principalTypes != null) update(graph, principalTypes);
2813 Types types = Types.entry(QueryProcessor.this, resource);
2814 if(types != null) update(graph, types);
2816 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
2817 if(superRelations != null) update(graph, superRelations);
2819 predicates.add(resource);
2826 scheduledObjectUpdates.forEach(new TLongProcedure() {
2829 public boolean execute(long arg0) {
// Same unpacking as in the single-statement fast path.
2831 final int subject = (int)(arg0 >>> 32);
2832 final int predicate = (int)(arg0 & 0xffffffff);
2834 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2835 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2836 if(principalTypes != null) update(graph, principalTypes);
2837 Types types = Types.entry(QueryProcessor.this, subject);
2838 if(types != null) update(graph, types);
2841 if(predicate == subrelationOf) {
2842 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2843 if(superRelations != null) update(graph, superRelations);
// Defer the per-subject and ordered-set refreshes so each id is processed once.
2846 predicates.add(subject);
2847 orderedSets.add(predicate);
2855 predicates.forEach(new TIntProcedure() {
2858 public boolean execute(final int subject) {
2860 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2861 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2862 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
2864 DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2865 if(entry != null) update(graph, entry);
2873 orderedSets.forEach(new TIntProcedure() {
2876 public boolean execute(int orderedSet) {
2878 OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);
2879 if(entry != null) update(graph, entry);
2887 // for (Integer subject : predicates) {
2888 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2889 // if(entry != null) update(graph, entry);
2893 if (Development.DEVELOPMENT) {
2894 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2895 System.err.println("== Query update ends ==");
// All queued changes have been processed; reset the scheduled sets.
2899 scheduledValueUpdates.clear();
2900 scheduledObjectUpdates.clear();
2901 scheduledInvalidates.clear();
/**
 * Queues a value update for the given resource id; the queue is processed by
 * performDirtyUpdates.
 *
 * @param resource id of the resource whose value changed
 */
2905 public void updateValue(final int resource) {
2906 scheduledValueUpdates.add(resource);
/**
 * Queues a statement change for the given subject and predicate. The pair is
 * packed into one long (subject id in the high 32 bits, predicate id in the low
 * 32 bits), which is the layout performDirtyUpdates decodes.
 *
 * @param resource id of the subject resource
 * @param predicate id of the predicate resource
 */
2910 public void updateStatements(final int resource, final int predicate) {
// Mask the predicate to its unsigned 32-bit form before combining: adding a
// sign-extended negative int would borrow from the subject half and make the
// decoded subject off by one. Non-negative predicates produce the same key as before.
2911 scheduledObjectUpdates.add((((long)resource) << 32) | (predicate & 0xffffffffL));
// Id passed to the most recent invalidateResource call that was not skipped.
2915 private int lastInvalidate = 0;
/**
 * Queues the given resource id for update unless it equals the id recorded by the
 * previous call.
 *
 * NOTE(review): the id is added to scheduledValueUpdates, not scheduledInvalidates,
 * and no visible line in this listing ever adds to scheduledInvalidates - confirm
 * this is intended.
 * NOTE(review): no visible line resets lastInvalidate when the queues are cleared,
 * so a repeated invalidation of the same id after an update pass looks like it
 * would be skipped - confirm.
 *
 * @param resource id of the resource to invalidate
 */
2917 public void invalidateResource(final int resource) {
2918 if(lastInvalidate == resource) return;
2919 scheduledValueUpdates.add(resource);
2920 lastInvalidate = resource;
/**
 * Queues an external read for re-evaluation and notifies the query support via
 * dirtyPrimitives().
 *
 * @param primitive the external read whose value has changed
 */
2924 public void updatePrimitive(final ExternalRead primitive) {
2926 // External reads may be updated from arbitrary threads.
2927 // Synchronize to prevent race-conditions.
2928 synchronized (primitiveUpdateLock) {
2929 scheduledPrimitiveUpdates.add(primitive);
2931 querySupport.dirtyPrimitives();
/**
 * Returns a one-line summary containing the size, hits, misses and updates
 * counters of this processor.
 */
2936 public synchronized String toString() {
2937 return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";
/**
 * Shuts down the executor threads: calls dispose() on each executor, polls their
 * liveness for up to 100 rounds, then polls again for up to 100 rounds while
 * interrupting them, and finally joins each executor for at most 5000 ms before
 * clearing its slot in the executors array.
 *
 * NOTE(review): the try headers, the waits between polls and the loop exits are
 * elided from this listing.
 * NOTE(review): the InterruptedException handlers only log; the visible lines do
 * not restore the interrupt status - confirm whether that is intended.
 */
2941 protected void doDispose() {
2943 for(int index = 0; index < THREADS; index++) {
2944 executors[index].dispose();
// Phase 1: check whether any executor is still alive.
2948 for(int i=0;i<100;i++) {
2950 boolean alive = false;
2951 for(int index = 0; index < THREADS; index++) {
2952 alive |= executors[index].isAlive();
2957 } catch (InterruptedException e) {
2958 Logger.defaultLogError(e);
2963 // Then start interrupting
2964 for(int i=0;i<100;i++) {
2966 boolean alive = false;
2967 for(int index = 0; index < THREADS; index++) {
2968 alive |= executors[index].isAlive();
2971 for(int index = 0; index < THREADS; index++) {
2972 executors[index].interrupt();
2976 // // Then just destroy
2977 // for(int index = 0; index < THREADS; index++) {
2978 // executors[index].destroy();
// Final phase: bounded join per executor, then release the reference.
2981 for(int index = 0; index < THREADS; index++) {
2983 executors[index].join(5000);
2984 } catch (InterruptedException e) {
2985 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2987 executors[index] = null;
// Accessor whose body is elided from this listing; presumably returns the hits counter
// that toString() reports - TODO confirm.
2992 public int getHits() {
// Accessor whose body is elided from this listing; presumably returns the misses counter
// that toString() reports - TODO confirm.
2996 public int getMisses() {
// Accessor whose body is elided from this listing; presumably returns the size counter
// that toString() reports - TODO confirm.
3000 public int getSize() {
/**
 * Collects the cluster ids, as reported by querySupport.getClusterId, of the
 * resources referenced by the cached Objects, DirectPredicates and ValueQuery
 * entries.
 *
 * NOTE(review): the return statement is elided from this listing; presumably the
 * collected set is returned.
 */
3004 public Set<Long> getReferencedClusters() {
3005 HashSet<Long> result = new HashSet<Long>();
// Objects queries contribute the cluster of query.r1().
3006 for (CacheEntry entry : objectsMap.values()) {
3007 Objects query = (Objects) entry.getQuery();
3008 result.add(querySupport.getClusterId(query.r1()));
// DirectPredicates and ValueQuery entries contribute the cluster of query.id.
3010 for (CacheEntry entry : directPredicatesMap.values()) {
3011 DirectPredicates query = (DirectPredicates) entry.getQuery();
3012 result.add(querySupport.getClusterId(query.id));
3014 for (CacheEntry entry : valueMap.values()) {
3015 ValueQuery query = (ValueQuery) entry.getQuery();
3016 result.add(querySupport.getClusterId(query.id));
// Body is elided from this listing; nothing about its behavior can be stated from here.
3021 public void assertDone() {
/**
 * Gathers the entries of the individual query cache maps into the given result.
 * The level bound is Integer.MAX_VALUE, so the level comparison accepts every entry.
 *
 * NOTE(review): the statements executed for matching uriToResourceMap and
 * namespaceIndexMap22 entries, and the return statement, are elided from this listing.
 *
 * @param result collection object the cache maps add their entries to
 */
3024 CacheCollectionResult allCaches(CacheCollectionResult result) {
3026 int level = Integer.MAX_VALUE;
3027 directPredicatesMap.values(level, result);
3028 principalTypesMap.values(level, result);
// These two maps are iterated by hand and filtered on the entry level.
3029 for(CacheEntryBase e : uriToResourceMap.values())
3030 if(e.getLevel() <= level)
3032 for(CacheEntryBase e : namespaceIndexMap22.values())
3033 if(e.getLevel() <= level)
3035 projectsMap.values(level, result);
3037 relationInfoMap.values(level, result);
3038 superTypesMap.values(level, result);
3039 typeHierarchyMap.values(level, result);
3040 superRelationsMap.values(level, result);
3041 typesMap.values(level, result);
3043 valueMap.values(level, result);
3044 directObjectsMap.values(level, result);
3045 objectsMap.values(level, result);
3046 orderedSetMap.values(level, result);
3047 predicatesMap.values(level, result);
3049 statementsMap.values(level, result);
3050 assertedPredicatesMap.values(level, result);
3051 assertedStatementsMap.values(level, result);
3052 externalReadMap.values(level, result);
3053 asyncReadMap.values(level, result);
3055 readMap.values(level, result);
3056 asyncMultiReadMap.values(level, result);
3057 multiReadMap.values(level, result);
// Body is elided from this listing; nothing about its behavior can be stated from here.
3063 public void printDiagnostics() {
/**
 * Forwards a cluster request to querySupport.requestCluster with the same arguments.
 *
 * @param graph the graph passed on to the query support
 * @param clusterId id of the requested cluster
 * @param runnable callback passed on to the query support
 */
3066 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
3067 querySupport.requestCluster(graph, clusterId, runnable);
/**
 * Runs the query collector with the arguments (0, Integer.MAX_VALUE).
 *
 * NOTE(review): the return statement is elided from this listing, so the meaning
 * of the returned int cannot be stated from here.
 */
3070 public int clean() {
3071 collector.collect(0, Integer.MAX_VALUE);
/**
 * Runs a QueryCollectorImpl2 collection pass that is driven by the cache entries of
 * the given external reads. The anonymous QueryCollectorSupport only supports
 * iteration and root listing over those requests: allCaches(), remove() and
 * setLevel() throw UnsupportedOperationException.
 *
 * NOTE(review): several bodies (getCurrentSize, start) and the tails of iterate and
 * getRootList are elided from this listing.
 *
 * @param requests the external reads whose cache entries should be collected
 */
3075 public void clean(final Collection<ExternalRead<?>> requests) {
3076 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
3077 Iterator<ExternalRead<?>> iterator = requests.iterator();
3079 public CacheCollectionResult allCaches() {
3080 throw new UnsupportedOperationException();
3083 public CacheEntryBase iterate(int level) {
3084 if(iterator.hasNext()) {
3085 ExternalRead<?> request = iterator.next();
3086 ExternalReadEntry entry = externalReadMap.get(request);
// Requests without a cache entry are skipped by recursing to the next one.
3087 if (entry != null) return entry;
3088 else return iterate(level);
// Iterator exhausted: rewind it for the next pass.
3090 iterator = requests.iterator();
3095 public void remove() {
3096 throw new UnsupportedOperationException();
3099 public void setLevel(CacheEntryBase entry, int level) {
3100 throw new UnsupportedOperationException();
3103 public Collection<CacheEntry> getRootList() {
3104 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
3105 for (ExternalRead<?> request : requests) {
3106 ExternalReadEntry entry = externalReadMap.get(request);
3113 public int getCurrentSize() {
3117 public int calculateCurrentSize() {
3118 // This tells the collector to attempt collecting everything.
3119 return Integer.MAX_VALUE;
3122 public boolean start(boolean flush) {
3126 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
/**
 * Debug aid: gathers the entries of the query cache maps into one list, prints
 * the number of entries to System.out and then prints one line for every entry
 * that is pending, excepted, discarded, refuted or fresh.
 */
3129 public void scanPending() {
3131 ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();
// Each cache map is added exactly once; orderedSetMap, externalReadMap and readMap
// used to be added twice, which inflated the printed count and duplicated the
// per-entry state lines.
3133 entries.addAll(directPredicatesMap.values());
3134 entries.addAll(principalTypesMap.values());
3135 entries.addAll(uriToResourceMap.values());
3136 entries.addAll(namespaceIndexMap22.values());
3137 entries.addAll(projectsMap.values());
3138 entries.addAll(relationInfoMap.values());
3139 entries.addAll(superTypesMap.values());
3140 entries.addAll(superRelationsMap.values());
3141 entries.addAll(typesMap.values());
3142 entries.addAll(valueMap.values());
3143 entries.addAll(directObjectsMap.values());
3144 entries.addAll(objectsMap.values());
3145 entries.addAll(orderedSetMap.values());
3146 entries.addAll(predicatesMap.values());
3148 entries.addAll(statementsMap.values());
3149 // entries.addAll(assertedObjectsMap.values());
3150 entries.addAll(assertedPredicatesMap.values());
3151 entries.addAll(assertedStatementsMap.values());
3152 entries.addAll(externalReadMap.values());
3153 entries.addAll(asyncReadMap.values());
3155 entries.addAll(readMap.values());
3156 entries.addAll(asyncMultiReadMap.values());
3157 entries.addAll(multiReadMap.values());
3159 System.out.println(entries.size() + " entries.");
// Report every state flag that is set; one entry may produce several lines.
3160 for(Object e : entries) {
3161 if(e instanceof CacheEntry) {
3162 CacheEntry en = (CacheEntry)e;
3163 if(en.isPending()) System.out.println("pending " + e);
3164 if(en.isExcepted()) System.out.println("excepted " + e);
3165 if(en.isDiscarded()) System.out.println("discarded " + e);
3166 if(en.isRefuted()) System.out.println("refuted " + e);
3167 if(en.isFresh()) System.out.println("fresh " + e);
3169 //System.out.println("Unknown object " + e);
/**
 * Returns the graph produced by ReadGraphImpl.createAsync(this).
 */
3175 public ReadGraphImpl graphForVirtualRequest() {
3176 return ReadGraphImpl.createAsync(this);
// Lazily built map from builtin type resources to Java classes; stays null until
// initBuiltinValues succeeds.
3180 private HashMap<Resource, Class<?>> builtinValues;
/**
 * Looks up the Java class registered for the given builtin type resource,
 * building the lookup table on first use.
 *
 * @param r the type resource to look up
 * @return the registered class, or null when the resource is not registered or
 *         the lookup table could not be built
 */
3182 public Class<?> getBuiltinValue(Resource r) {
3183 if(builtinValues == null) initBuiltinValues();
// initBuiltinValues returns without creating the map when the Layer0 service is
// not available; answer "unknown" instead of throwing a NullPointerException.
3184 return builtinValues == null ? null : builtinValues.get(r);
// Package-visible exception holder, initially null; no visible line in this listing
// reads or writes it apart from commented-out code.
3187 Exception callerException = null;
// Nested interface; its live members are elided from this listing, only the
// commented-out debug variants of inc/dec are visible.
3189 public interface AsyncBarrier {
3192 // public void inc(String debug);
3193 // public void dec(String debug);
3196 // final public QueryProcessor processor;
3197 // final public QuerySupport support;
3199 // boolean disposed = false;
/**
 * Builds the builtinValues map from Layer0 type resources to Java classes:
 * scalar types map to the boxed/String classes and array types map to the
 * corresponding array classes.
 *
 * Returns without creating the map when the session has no Layer0 service, which
 * leaves builtinValues unchanged.
 */
3201 private void initBuiltinValues() {
3203 Layer0 b = getSession().peekService(Layer0.class);
3204 if(b == null) return;
3206 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types.
3208 builtinValues.put(b.String, String.class);
3209 builtinValues.put(b.Double, Double.class);
3210 builtinValues.put(b.Float, Float.class);
3211 builtinValues.put(b.Long, Long.class);
3212 builtinValues.put(b.Integer, Integer.class);
3213 builtinValues.put(b.Byte, Byte.class);
3214 builtinValues.put(b.Boolean, Boolean.class);
// Array types; numeric and boolean arrays use primitive element types.
3216 builtinValues.put(b.StringArray, String[].class);
3217 builtinValues.put(b.DoubleArray, double[].class);
3218 builtinValues.put(b.FloatArray, float[].class);
3219 builtinValues.put(b.LongArray, long[].class);
3220 builtinValues.put(b.IntegerArray, int[].class);
3221 builtinValues.put(b.ByteArray, byte[].class);
3222 builtinValues.put(b.BooleanArray, boolean[].class);
3226 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
3228 // if (null == provider2) {
3229 // this.processor = null;
3233 // this.processor = provider2;
3234 // support = provider2.getCore();
3235 // initBuiltinValues();
3239 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
3240 // return new ReadGraphSupportImpl(impl.processor);
// Returns the Session used by this processor; the body is elided from this listing.
3244 final public Session getSession() {
/**
 * Returns the resourceSupport field of this processor.
 */
3248 final public ResourceSupport getResourceSupport() {
3249 return resourceSupport;
/**
 * Queries the predicates of the given subject through Predicates.queryEach and
 * forwards each predicate id, converted to a Resource, to the asynchronous procedure.
 *
 * The adapter keeps an AtomicBoolean "first": in finished/exception the call that
 * wins compareAndSet(true, false) passes the received graph to the procedure, while
 * the other visible call passes impl.newRestart(graph). NOTE(review): the branch
 * headers around those calls (and around the two execute calls) are elided from
 * this listing.
 *
 * Any Throwable raised by the procedure is logged and not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose predicates are queried; must not be null
 * @param procedure receiver of the results; must not be null
 */
3253 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3255 assert(subject != null);
3256 assert(procedure != null);
// Non-null only when the procedure itself is a ListenerBase.
3258 final ListenerBase listener = getListenerBase(procedure);
3260 IntProcedure ip = new IntProcedure() {
3262 AtomicBoolean first = new AtomicBoolean(true);
3265 public void execute(ReadGraphImpl graph, int i) {
3268 procedure.execute(graph, querySupport.getResource(i));
3270 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3272 } catch (Throwable t2) {
3273 Logger.defaultLogError(t2);
3278 public void finished(ReadGraphImpl graph) {
3280 if(first.compareAndSet(true, false)) {
3281 procedure.finished(graph);
3282 // impl.state.barrier.dec(this);
3284 procedure.finished(impl.newRestart(graph));
3287 } catch (Throwable t2) {
3288 Logger.defaultLogError(t2);
3293 public void exception(ReadGraphImpl graph, Throwable t) {
3295 if(first.compareAndSet(true, false)) {
3296 procedure.exception(graph, t);
3297 // impl.state.barrier.dec(this);
3299 procedure.exception(impl.newRestart(graph), t);
3301 } catch (Throwable t2) {
3302 Logger.defaultLogError(t2);
3308 int sId = querySupport.getId(subject);
3310 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);
3311 // else impl.state.barrier.inc(null, null);
3313 Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Queries the predicates of the given subject through Predicates.queryEach and
 * forwards each predicate id, converted to a Resource, to the synchronous-style
 * procedure (no graph argument). Any Throwable raised by the procedure is logged
 * and not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose predicates are queried; must not be null
 * @param procedure receiver of the results; must not be null
 */
3318 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3320 assert(subject != null);
3321 assert(procedure != null);
3323 final ListenerBase listener = getListenerBase(procedure);
3325 // impl.state.barrier.inc();
3327 Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
3330 public void execute(ReadGraphImpl graph, int i) {
3332 procedure.execute(querySupport.getResource(i));
3333 } catch (Throwable t2) {
3334 Logger.defaultLogError(t2);
3339 public void finished(ReadGraphImpl graph) {
3341 procedure.finished();
3342 } catch (Throwable t2) {
3343 Logger.defaultLogError(t2);
3345 // impl.state.barrier.dec();
3349 public void exception(ReadGraphImpl graph, Throwable t) {
3351 procedure.exception(t);
3352 } catch (Throwable t2) {
3353 Logger.defaultLogError(t2);
3355 // impl.state.barrier.dec();
/**
 * Returns the result of Predicates.queryEach2 for the id of the given subject.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose predicates are queried; must not be null
 * @return the IntSet produced by the query
 * @throws Throwable whatever the underlying query throws
 */
3363 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3365 assert(subject != null);
3367 return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
/**
 * Queries the statements of (subject, predicate) through Statements.queryEach and
 * forwards each (s, p, o) triple, converted with querySupport.getStatement, to the
 * synchronous-style procedure. Any Throwable raised by the procedure is logged and
 * not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the statements; must not be null
 */
3373 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3374 final Resource predicate, final MultiProcedure<Statement> procedure) {
3376 assert(subject != null);
3377 assert(predicate != null);
3378 assert(procedure != null);
3380 final ListenerBase listener = getListenerBase(procedure);
3382 // impl.state.barrier.inc();
3384 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3387 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3389 procedure.execute(querySupport.getStatement(s, p, o));
3390 } catch (Throwable t2) {
3391 Logger.defaultLogError(t2);
3396 public void finished(ReadGraphImpl graph) {
3398 procedure.finished();
3399 } catch (Throwable t2) {
3400 Logger.defaultLogError(t2);
3402 // impl.state.barrier.dec();
3406 public void exception(ReadGraphImpl graph, Throwable t) {
3408 procedure.exception(t);
3409 } catch (Throwable t2) {
3410 Logger.defaultLogError(t2);
3412 // impl.state.barrier.dec();
/**
 * Queries the statements of (subject, predicate) through Statements.queryEach and
 * forwards each (s, p, o) triple, converted with querySupport.getStatement, to the
 * asynchronous procedure.
 *
 * Each callback has two visible delivery calls: one passing the received graph and
 * one passing impl.newRestart(graph). NOTE(review): the conditions selecting between
 * them are elided from this listing; the adapter declares a plain boolean "first"
 * here, whereas the forEachPredicate adapter uses an AtomicBoolean - confirm whether
 * this adapter can be invoked concurrently.
 *
 * Any Throwable raised by the procedure is logged and not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the statements; must not be null
 */
3420 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3421 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3423 assert(subject != null);
3424 assert(predicate != null);
3425 assert(procedure != null);
3427 final ListenerBase listener = getListenerBase(procedure);
3429 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
3431 boolean first = true;
3434 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3437 procedure.execute(graph, querySupport.getStatement(s, p, o));
3439 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
3441 } catch (Throwable t2) {
3442 Logger.defaultLogError(t2);
3447 public void finished(ReadGraphImpl graph) {
3452 procedure.finished(graph);
3453 // impl.state.barrier.dec(this);
3455 procedure.finished(impl.newRestart(graph));
3457 } catch (Throwable t2) {
3458 Logger.defaultLogError(t2);
3464 public void exception(ReadGraphImpl graph, Throwable t) {
3469 procedure.exception(graph, t);
3470 // impl.state.barrier.dec(this);
3472 procedure.exception(impl.newRestart(graph), t);
3474 } catch (Throwable t2) {
3475 Logger.defaultLogError(t2);
3482 int sId = querySupport.getId(subject);
3483 int pId = querySupport.getId(predicate);
3485 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3486 // else impl.state.barrier.inc(null, null);
3488 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
/**
 * Queries the statements of (subject, predicate) through Statements.queryEach and
 * forwards each raw (s, p, o) id triple to the StatementProcedure without
 * converting it to a Statement object.
 *
 * Each callback has two visible delivery calls: one passing the received graph and
 * one passing impl.newRestart(graph). NOTE(review): the conditions selecting between
 * them are elided from this listing; "first" is a plain boolean here - confirm
 * whether this adapter can be invoked concurrently.
 *
 * Any Throwable raised by the procedure is logged and not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the id triples; must not be null
 */
3493 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3494 final Resource predicate, final StatementProcedure procedure) {
3496 assert(subject != null);
3497 assert(predicate != null);
3498 assert(procedure != null);
3500 final ListenerBase listener = getListenerBase(procedure);
3502 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
3504 boolean first = true;
3507 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3510 procedure.execute(graph, s, p, o);
3512 procedure.execute(impl.newRestart(graph), s, p, o);
3514 } catch (Throwable t2) {
3515 Logger.defaultLogError(t2);
3520 public void finished(ReadGraphImpl graph) {
3525 procedure.finished(graph);
3526 // impl.state.barrier.dec(this);
3528 procedure.finished(impl.newRestart(graph));
3530 } catch (Throwable t2) {
3531 Logger.defaultLogError(t2);
3537 public void exception(ReadGraphImpl graph, Throwable t) {
3542 procedure.exception(graph, t);
3543 // impl.state.barrier.dec(this);
3545 procedure.exception(impl.newRestart(graph), t);
3547 } catch (Throwable t2) {
3548 Logger.defaultLogError(t2);
3555 int sId = querySupport.getId(subject);
3556 int pId = querySupport.getId(predicate);
3558 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3559 // else impl.state.barrier.inc(null, null);
3561 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
/**
 * Adapts an AsyncSetListener to the statement query of (subject, predicate):
 * a result that cannot be removed from the previous result set ("current") is
 * reported through procedure.add, and whatever is still left in "current" when a
 * run finishes is reported through procedure.remove.
 *
 * NOTE(review): the lines that record results into "run" and promote "run" to
 * "current" are elided from this listing.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure set listener receiving add/remove/exception notifications; must not be null
 */
3566 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3568 assert(subject != null);
3569 assert(predicate != null);
3570 assert(procedure != null);
3572 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// "current" starts as null, so the first run reports every result as added.
3574 private Set<Statement> current = null;
3575 private Set<Statement> run = new HashSet<Statement>();
3578 public void execute(AsyncReadGraph graph, Statement result) {
3580 boolean found = false;
3582 if(current != null) {
3584 found = current.remove(result);
3588 if(!found) procedure.add(graph, result);
3595 public void finished(AsyncReadGraph graph) {
3597 if(current != null) {
3598 for(Statement r : current) procedure.remove(graph, r);
3603 run = new HashSet<Statement>();
3608 public void exception(AsyncReadGraph graph, Throwable t) {
3609 procedure.exception(graph, t);
// Disposal is delegated to the wrapped set listener.
3613 public boolean isDisposed() {
3614 return procedure.isDisposed();
/**
 * Queries the asserted statements of (subject, predicate) through
 * AssertedStatements.queryEach and forwards each (s, p, o) triple, converted with
 * querySupport.getStatement, to the asynchronous procedure using the graph received
 * by the callback. Any Throwable raised by the procedure is logged and not propagated.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the statements; must not be null
 */
3622 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
3623 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3625 assert(subject != null);
3626 assert(predicate != null);
3627 assert(procedure != null);
3629 final ListenerBase listener = getListenerBase(procedure);
3631 // impl.state.barrier.inc();
3633 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3636 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3638 procedure.execute(graph, querySupport.getStatement(s, p, o));
3639 } catch (Throwable t2) {
3640 Logger.defaultLogError(t2);
3645 public void finished(ReadGraphImpl graph) {
3647 procedure.finished(graph);
3648 } catch (Throwable t2) {
3649 Logger.defaultLogError(t2);
3651 // impl.state.barrier.dec();
3655 public void exception(ReadGraphImpl graph, Throwable t) {
3657 procedure.exception(graph, t);
3658 } catch (Throwable t2) {
3659 Logger.defaultLogError(t2);
3661 // impl.state.barrier.dec();
/**
 * Returns the given procedure cast to ListenerBase when it is one.
 *
 * NOTE(review): the fall-through return is elided from this listing; presumably it
 * returns null, since a caller in this file tests the result against null.
 *
 * @param procedure any procedure object
 */
3668 private static ListenerBase getListenerBase(Object procedure) {
3669 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
/**
 * Queries the objects of (subject, predicate) through Objects.runner and forwards
 * each object id, converted to a Resource, to the synchronous-style procedure.
 * Any Throwable raised by the procedure is logged and not propagated.
 *
 * NOTE(review): the exception callback also prints a debug line to System.out
 * before notifying the procedure.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the objects; must not be null
 */
3674 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
3676 assert(subject != null);
3677 assert(predicate != null);
3678 assert(procedure != null);
3680 final ListenerBase listener = getListenerBase(procedure);
3682 // impl.state.barrier.inc();
3684 Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
3687 public void execute(ReadGraphImpl graph, int i) {
3689 procedure.execute(querySupport.getResource(i));
3690 } catch (Throwable t2) {
3691 Logger.defaultLogError(t2);
3696 public void finished(ReadGraphImpl graph) {
3698 procedure.finished();
3699 } catch (Throwable t2) {
3700 Logger.defaultLogError(t2);
3702 // impl.state.barrier.dec();
3706 public void exception(ReadGraphImpl graph, Throwable t) {
3707 System.out.println("forEachObject exception " + t);
3709 procedure.exception(t);
3710 } catch (Throwable t2) {
3711 Logger.defaultLogError(t2);
3713 // impl.state.barrier.dec();
3722 // final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3724 // assert(subject != null);
3725 // assert(predicate != null);
3726 // assert(procedure != null);
3728 // final ListenerBase listener = getListenerBase(procedure);
3730 // int sId = querySupport.getId(subject);
3731 // int pId = querySupport.getId(predicate);
3733 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);
3735 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);
3736 // else impl.state.barrier.inc(null, null);
3738 // // final Exception caller = new Exception();
3740 // // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());
3742 // DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);
/**
 * Queries the direct predicates of the given subject through
 * DirectPredicates.queryEach, using a MultiIntProcedure built from the procedure,
 * the graph and the query support as the result receiver.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose direct predicates are queried; must not be null
 * @param procedure receiver of the results; must not be null
 */
3747 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3749 assert(subject != null);
3750 assert(procedure != null);
3752 final ListenerBase listener = getListenerBase(procedure);
3754 MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
3756 int sId = querySupport.getId(subject);
3758 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);
3759 // else impl.state.barrier.inc(null, null);
3761 DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);
/**
 * Queries the direct statements of the given subject by delegating to
 * org.simantics.db.impl.query.DirectStatements.queryEach with the procedure as the
 * result receiver.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose direct statements are queried; must not be null
 * @param procedure receiver of the result; must not be null
 */
3766 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
3768 assert(subject != null);
3769 assert(procedure != null);
3771 final ListenerBase listener = getListenerBase(procedure);
// Fully qualified to distinguish the query class from the imported org.simantics.db.DirectStatements.
3773 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Asynchronous variant of the direct statement query: delegates to
 * org.simantics.db.impl.query.DirectStatements.queryEach and passes the
 * ignoreVirtual flag through unchanged.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject resource whose direct statements are queried; must not be null
 * @param procedure receiver of the result; must not be null
 * @param ignoreVirtual forwarded to the query as-is
 */
3778 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
3780 assert(subject != null);
3781 assert(procedure != null);
3783 final ListenerBase listener = getListenerBase(procedure);
3785 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel stored in forPossibleObject's "single" slot; finished() reports null for it.
3789 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Queries the objects of (subject, predicate) and reports a single value to the
 * procedure when the query finishes: null when the "single" slot is still null or
 * holds the INVALID_RESOURCE sentinel, otherwise the stored resource.
 *
 * NOTE(review): the statement executed when "single" is still null, and the branch
 * structure around the sentinel assignment, are elided from this listing;
 * presumably the first result is stored and a further result sets the sentinel.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource
 * @param predicate predicate resource
 * @param procedure receiver of the single result or of the exception
 */
3792 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
3794 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
3796 private Resource single = null;
// The callbacks are synchronized on the adapter, which guards "single".
3799 public synchronized void execute(AsyncReadGraph graph, Resource result) {
3800 if(single == null) {
3803 single = INVALID_RESOURCE;
3808 public synchronized void finished(AsyncReadGraph graph) {
3809 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
3810 else procedure.execute(graph, single);
3814 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
3815 procedure.exception(graph, throwable);
/**
 * Low-level object query: resolves the ids of subject and predicate and hands the
 * given listener and IntProcedure straight to Objects.runner.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource
 * @param predicate predicate resource
 * @param listener listener passed on to the query as-is
 * @param procedure receiver of the object ids
 */
3822 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
3824 final int sId = querySupport.getId(subject);
3825 final int pId = querySupport.getId(predicate);
3827 Objects.runner(impl, sId, pId, impl.parent, listener, procedure);
/**
 * Returns the result of Objects.runner2 for the ids of the given subject and predicate.
 *
 * @param impl the graph the request is made in; its parent entry is passed to the query
 * @param subject subject resource
 * @param predicate predicate resource
 * @return the int produced by Objects.runner2
 * @throws DatabaseException if the underlying query fails
 */
3831 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
3833 final int sId = querySupport.getId(subject);
3834 final int pId = querySupport.getId(predicate);
3836 return Objects.runner2(impl, sId, pId, impl.parent);
/**
 * Queries the objects of (subject, predicate) and forwards each object id,
 * converted to a Resource, to the asynchronous procedure.
 *
 * Two adapters are used. When the graph has a parent entry or the procedure is a
 * listener, the adapter catches and logs any Throwable raised by the procedure and
 * tracks an AtomicBoolean "first": in finished() the call that wins
 * compareAndSet(true, false) passes impl, while the other visible call passes
 * impl.newRestart(graph). Otherwise a plain pass-through adapter is used that hands
 * the received graph to the procedure without any exception handling.
 *
 * NOTE(review): the branch headers around the paired calls and the else of the
 * outer condition are elided from this listing.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure receiver of the objects
 */
3840 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3842 assert(subject != null);
3843 assert(predicate != null);
3845 final ListenerBase listener = getListenerBase(procedure);
3847 if(impl.parent != null || listener != null) {
3849 IntProcedure ip = new IntProcedure() {
3851 AtomicBoolean first = new AtomicBoolean(true);
3854 public void execute(ReadGraphImpl graph, int i) {
3857 procedure.execute(impl, querySupport.getResource(i));
3859 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3861 } catch (Throwable t2) {
3862 Logger.defaultLogError(t2);
3868 public void finished(ReadGraphImpl graph) {
3870 if(first.compareAndSet(true, false)) {
3871 procedure.finished(impl);
3872 // impl.state.barrier.dec(this);
3874 procedure.finished(impl.newRestart(graph));
3876 } catch (Throwable t2) {
3877 Logger.defaultLogError(t2);
3882 public void exception(ReadGraphImpl graph, Throwable t) {
3884 procedure.exception(graph, t);
3885 } catch (Throwable t2) {
3886 Logger.defaultLogError(t2);
3888 // impl.state.barrier.dec(this);
3892 public String toString() {
3893 return "forEachObject with " + procedure;
3898 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
3899 // else impl.state.barrier.inc(null, null);
3901 forEachObject(impl, subject, predicate, listener, ip);
// Pass-through adapter: forwards the callbacks directly with the received graph.
3905 IntProcedure ip = new IntProcedure() {
3908 public void execute(ReadGraphImpl graph, int i) {
3909 procedure.execute(graph, querySupport.getResource(i));
3913 public void finished(ReadGraphImpl graph) {
3914 procedure.finished(graph);
3918 public void exception(ReadGraphImpl graph, Throwable t) {
3919 procedure.exception(graph, t);
3923 public String toString() {
3924 return "forEachObject with " + procedure;
3929 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Adapts an AsyncSetListener to the object query of (subject, predicate):
 * a result that cannot be removed from the previous result set ("current") is
 * reported through procedure.add, and whatever is still left in "current" when a
 * run finishes is reported through procedure.remove.
 *
 * NOTE(review): the lines that record results into "run" and promote "run" to
 * "current" are elided from this listing.
 *
 * @param impl the graph the request is made in
 * @param subject subject resource; must not be null
 * @param predicate predicate resource; must not be null
 * @param procedure set listener receiving add/remove/exception notifications; must not be null
 */
3936 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3938 assert(subject != null);
3939 assert(predicate != null);
3940 assert(procedure != null);
3942 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// "current" starts as null, so the first run reports every result as added.
3944 private Set<Resource> current = null;
3945 private Set<Resource> run = new HashSet<Resource>();
3948 public void execute(AsyncReadGraph graph, Resource result) {
3950 boolean found = false;
3952 if(current != null) {
3954 found = current.remove(result);
3958 if(!found) procedure.add(graph, result);
3965 public void finished(AsyncReadGraph graph) {
3967 if(current != null) {
3968 for(Resource r : current) procedure.remove(graph, r);
3973 run = new HashSet<Resource>();
// Disposal is delegated to the wrapped set listener.
3978 public boolean isDisposed() {
3979 return procedure.isDisposed();
3983 public void exception(AsyncReadGraph graph, Throwable t) {
3984 procedure.exception(graph, t);
3988 public String toString() {
3989 return "forObjectSet " + procedure;
/**
 * Adapts an AsyncSetListener to the predicate query of the given subject:
 * a result that cannot be removed from the previous result set ("current") is
 * reported through procedure.add, and whatever is still left in "current" when a
 * run finishes is reported through procedure.remove.
 *
 * NOTE(review): the lines that record results into "run" and promote "run" to
 * "current" are elided from this listing.
 *
 * @param impl the graph the request is made in
 * @param subject resource whose predicates are tracked; must not be null
 * @param procedure set listener receiving add/remove/exception notifications; must not be null
 */
3997 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3999 assert(subject != null);
4000 assert(procedure != null);
4002 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// "current" starts as null, so the first run reports every result as added.
4004 private Set<Resource> current = null;
4005 private Set<Resource> run = new HashSet<Resource>();
4008 public void execute(AsyncReadGraph graph, Resource result) {
4010 boolean found = false;
4012 if(current != null) {
4014 found = current.remove(result);
4018 if(!found) procedure.add(graph, result);
4025 public void finished(AsyncReadGraph graph) {
4027 if(current != null) {
4028 for(Resource r : current) procedure.remove(graph, r);
4033 run = new HashSet<Resource>();
// Disposal is delegated to the wrapped set listener.
4038 public boolean isDisposed() {
4039 return procedure.isDisposed();
4043 public void exception(AsyncReadGraph graph, Throwable t) {
4044 procedure.exception(graph, t);
4048 public String toString() {
4049 return "forPredicateSet " + procedure;
/**
 * Adapts an AsyncSetListener to the principal type query of the given subject:
 * a result that cannot be removed from the previous result set ("current") is
 * reported through procedure.add, and whatever is still left in "current" when a
 * run finishes is reported through procedure.remove.
 *
 * NOTE(review): the lines that record results into "run" and promote "run" to
 * "current" are elided from this listing.
 *
 * @param impl the graph the request is made in
 * @param subject resource whose principal types are tracked; must not be null
 * @param procedure set listener receiving add/remove/exception notifications; must not be null
 */
4057 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
4059 assert(subject != null);
4060 assert(procedure != null);
4062 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// "current" starts as null, so the first run reports every result as added.
4064 private Set<Resource> current = null;
4065 private Set<Resource> run = new HashSet<Resource>();
4068 public void execute(AsyncReadGraph graph, Resource result) {
4070 boolean found = false;
4072 if(current != null) {
4074 found = current.remove(result);
4078 if(!found) procedure.add(graph, result);
4085 public void finished(AsyncReadGraph graph) {
4087 if(current != null) {
4088 for(Resource r : current) procedure.remove(graph, r);
4093 run = new HashSet<Resource>();
// Disposal is delegated to the wrapped set listener.
4098 public boolean isDisposed() {
4099 return procedure.isDisposed();
4103 public void exception(AsyncReadGraph graph, Throwable t) {
4104 procedure.exception(graph, t);
4108 public String toString() {
4109 return "forPrincipalTypeSet " + procedure;
/**
 * Reports the asserted objects of ({@code subject}, {@code predicate}) to
 * {@code procedure} as set changes (add/remove), driven by the results that
 * {@code forEachAssertedObject} delivers to the anonymous listener below.
 *
 * NOTE(review): the embedded line numbering of this listing has gaps, so
 * some original lines of the body are not visible here; in particular the
 * code that fills {@code run} and assigns {@code current} cannot be seen.
 *
 * @param impl graph handed on to {@code forEachAssertedObject}
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of add/remove/exception calls; asserted non-null
 */
4117 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
4119 assert(subject != null);
4120 assert(predicate != null);
4121 assert(procedure != null);
4123 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// Entries still left in 'current' when finished() runs are reported as removed.
4125 private Set<Resource> current = null;
4126 private Set<Resource> run = new HashSet<Resource>();
4129 public void execute(AsyncReadGraph graph, Resource result) {
4131 boolean found = false;
4133 if(current != null) {
4135 found = current.remove(result);
// A result that was not present in 'current' is announced as an addition.
4139 if(!found) procedure.add(graph, result);
4146 public void finished(AsyncReadGraph graph) {
4148 if(current != null) {
// Whatever execute() did not take out of 'current' is announced as a removal.
4149 for(Resource r : current) procedure.remove(graph, r);
// Start over with an empty 'run' set.
4154 run = new HashSet<Resource>();
// Disposal state and exceptions are delegated directly to the wrapped procedure.
4159 public boolean isDisposed() {
4160 return procedure.isDisposed();
4164 public void exception(AsyncReadGraph graph, Throwable t) {
4165 procedure.exception(graph, t);
4169 public String toString() {
// Label names this method so the listener is distinguishable from the one created by forObjectSet.
4170 return "forAssertedObjectSet " + procedure;
/**
 * Reports the asserted statements of ({@code subject}, {@code predicate})
 * to {@code procedure} as set changes (add/remove), driven by the results
 * that {@code forEachAssertedStatement} delivers to the anonymous listener
 * below.
 *
 * NOTE(review): the embedded line numbering of this listing has gaps, so
 * some original lines of the body are not visible here; in particular the
 * code that fills {@code run} and assigns {@code current} cannot be seen.
 *
 * @param impl graph handed on to {@code forEachAssertedStatement}
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of add/remove/exception calls; asserted non-null
 */
4178 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
4180 assert(subject != null);
4181 assert(predicate != null);
4182 assert(procedure != null);
4184 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// Entries still left in 'current' when finished() runs are reported as removed.
4186 private Set<Statement> current = null;
4187 private Set<Statement> run = new HashSet<Statement>();
4190 public void execute(AsyncReadGraph graph, Statement result) {
4192 boolean found = false;
4194 if(current != null) {
4196 found = current.remove(result);
// A result that was not present in 'current' is announced as an addition.
4200 if(!found) procedure.add(graph, result);
4207 public void finished(AsyncReadGraph graph) {
4209 if(current != null) {
// Whatever execute() did not take out of 'current' is announced as a removal.
4210 for(Statement s : current) procedure.remove(graph, s);
// Start over with an empty 'run' set.
4215 run = new HashSet<Statement>();
// Disposal state and exceptions are delegated directly to the wrapped procedure.
4220 public boolean isDisposed() {
4221 return procedure.isDisposed();
4225 public void exception(AsyncReadGraph graph, Throwable t) {
4226 procedure.exception(graph, t);
4230 public String toString() {
// Label names this method, matching the convention of the sibling *Set listeners.
4231 return "forAssertedStatementSet " + procedure;
/**
 * Runs {@code AssertedStatements.queryEach} for ({@code subject},
 * {@code predicate}) and forwards the object of every reported triple to
 * {@code procedure} as a {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of the objects; asserted non-null
 */
4239 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
4240 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
4242 assert(subject != null);
4243 assert(predicate != null);
4244 assert(procedure != null);
4246 final ListenerBase listener = getListenerBase(procedure);
4248 // impl.state.barrier.inc();
4250 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {
4253 public void execute(ReadGraphImpl graph, int s, int p, int o) {
// Only the object id of the triple is converted and forwarded; s and p are ignored.
4255 procedure.execute(graph, querySupport.getResource(o));
4256 } catch (Throwable t2) {
4257 Logger.defaultLogError(t2);
4262 public void finished(ReadGraphImpl graph) {
4264 procedure.finished(graph);
4265 } catch (Throwable t2) {
4266 Logger.defaultLogError(t2);
4268 // impl.state.barrier.dec();
4272 public void exception(ReadGraphImpl graph, Throwable t) {
4274 procedure.exception(graph, t);
4275 } catch (Throwable t2) {
4276 Logger.defaultLogError(t2);
4278 // impl.state.barrier.dec();
/**
 * Runs {@code PrincipalTypes.queryEach} for {@code subject} and forwards
 * every reported id to the asynchronous {@code procedure} as a
 * {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose principal types are queried; asserted non-null
 * @param procedure receiver of the types; asserted non-null
 */
4286 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4288 assert(subject != null);
4289 assert(procedure != null);
4291 final ListenerBase listener = getListenerBase(procedure);
// Adapter from the int-based internal callback to the Resource-based procedure.
4293 IntProcedure ip = new IntProcedure() {
4296 public void execute(ReadGraphImpl graph, int i) {
4298 procedure.execute(graph, querySupport.getResource(i));
4299 } catch (Throwable t2) {
4300 Logger.defaultLogError(t2);
4305 public void finished(ReadGraphImpl graph) {
4307 procedure.finished(graph);
4308 } catch (Throwable t2) {
4309 Logger.defaultLogError(t2);
4311 // impl.state.barrier.dec(this);
4315 public void exception(ReadGraphImpl graph, Throwable t) {
4317 procedure.exception(graph, t);
4318 } catch (Throwable t2) {
4319 Logger.defaultLogError(t2);
4321 // impl.state.barrier.dec(this);
4326 int sId = querySupport.getId(subject);
4328 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
4329 // else impl.state.barrier.inc(null, null);
4331 PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Variant of {@code forEachPrincipalType} for a {@code MultiProcedure}:
 * runs {@code PrincipalTypes.queryEach} for {@code subject} and forwards
 * every reported id as a {@link Resource}. Unlike the asynchronous variant,
 * the graph argument is not passed on to {@code procedure}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose principal types are queried; asserted non-null
 * @param procedure receiver of the types; asserted non-null
 */
4336 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
4338 assert(subject != null);
4339 assert(procedure != null);
4341 final ListenerBase listener = getListenerBase(procedure);
4343 // impl.state.barrier.inc();
4345 PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4348 public void execute(ReadGraphImpl graph, int i) {
4350 procedure.execute(querySupport.getResource(i));
4351 } catch (Throwable t2) {
4352 Logger.defaultLogError(t2);
4357 public void finished(ReadGraphImpl graph) {
4359 procedure.finished();
4360 } catch (Throwable t2) {
4361 Logger.defaultLogError(t2);
4363 // impl.state.barrier.dec();
4367 public void exception(ReadGraphImpl graph, Throwable t) {
4369 procedure.exception(t);
4370 } catch (Throwable t2) {
4371 Logger.defaultLogError(t2);
4373 // impl.state.barrier.dec();
/**
 * Runs {@code Types.queryEach} for {@code subject} and hands the resulting
 * {@code IntSet} to {@code procedure}.
 *
 * The {@code first} flag separates the first callback from later ones: the
 * call guarded by {@code first.compareAndSet(true, false)} uses the graph
 * as given, while the other visible call passes
 * {@code impl.newRestart(graph)} (presumably the else-branch for repeated
 * callbacks; the branch keyword is not visible in this listing).
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose types are queried; asserted non-null
 * @param procedure receiver of the type set; asserted non-null
 */
4380 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4382 assert(subject != null);
4383 assert(procedure != null);
4385 final ListenerBase listener = getListenerBase(procedure);
4387 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
// Flipped atomically from true to false by whichever callback arrives first.
4389 AtomicBoolean first = new AtomicBoolean(true);
4392 public void execute(final ReadGraphImpl graph, IntSet set) {
4394 if(first.compareAndSet(true, false)) {
4395 procedure.execute(graph, set);
4396 // impl.state.barrier.dec(this);
4398 procedure.execute(impl.newRestart(graph), set);
4400 } catch (Throwable t2) {
4401 Logger.defaultLogError(t2);
4406 public void exception(ReadGraphImpl graph, Throwable t) {
4408 if(first.compareAndSet(true, false)) {
4409 procedure.exception(graph, t);
4410 // impl.state.barrier.dec(this);
4412 procedure.exception(impl.newRestart(graph), t);
4414 } catch (Throwable t2) {
4415 Logger.defaultLogError(t2);
4421 int sId = querySupport.getId(subject);
4423 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);
4424 // else impl.state.barrier.inc(null, null);
4426 Types.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Returns the result of {@code Types.queryEach2} for {@code subject}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose types are queried; asserted non-null
 * @return the {@code IntSet} produced by {@code Types.queryEach2}
 * @throws Throwable declared by this method; whatever the query throws propagates
 */
4431 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
4433 assert(subject != null);
4435 return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
/**
 * Runs {@code RelationInfoQuery.queryEach} for {@code subject} and hands
 * the resulting {@link RelationInfo} to {@code procedure}.
 *
 * The {@code first} flag separates the first callback from later ones: the
 * call guarded by {@code first.compareAndSet(true, false)} uses the graph
 * as given, while the other visible call passes
 * {@code impl.newRestart(graph)} (presumably the else-branch; the branch
 * keyword is not visible in this listing).
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the relation info; asserted non-null
 */
4440 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
4442 assert(subject != null);
4443 assert(procedure != null);
4445 final ListenerBase listener = getListenerBase(procedure);
4447 // impl.state.barrier.inc();
4449 RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {
// Flipped atomically from true to false by whichever callback arrives first.
4451 AtomicBoolean first = new AtomicBoolean(true);
4454 public void execute(final ReadGraphImpl graph, RelationInfo set) {
4456 if(first.compareAndSet(true, false)) {
4457 procedure.execute(graph, set);
4458 // impl.state.barrier.dec();
4460 procedure.execute(impl.newRestart(graph), set);
4462 } catch (Throwable t2) {
4463 Logger.defaultLogError(t2);
4468 public void exception(ReadGraphImpl graph, Throwable t) {
4470 if(first.compareAndSet(true, false)) {
4471 procedure.exception(graph, t);
4472 // impl.state.barrier.dec("ReadGraphSupportImpl.1353");
4474 procedure.exception(impl.newRestart(graph), t);
4476 } catch (Throwable t2) {
4477 Logger.defaultLogError(t2);
/**
 * Runs {@code SuperTypes.queryEach} for {@code subject} and hands the
 * resulting {@code IntSet} to {@code procedure} as is (the commented-out
 * lines show an earlier conversion into a {@code HashSet<Resource>}).
 *
 * The {@code first} flag separates the first callback from later ones: the
 * call guarded by {@code first.compareAndSet(true, false)} uses the graph
 * as given, while the other visible call passes
 * {@code impl.newRestart(graph)} (presumably the else-branch; the branch
 * keyword is not visible in this listing).
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject type resource; asserted non-null
 * @param procedure receiver of the supertype set; asserted non-null
 */
4486 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4488 assert(subject != null);
4489 assert(procedure != null);
4491 final ListenerBase listener = getListenerBase(procedure);
4493 // impl.state.barrier.inc();
4495 SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {
// Flipped atomically from true to false by whichever callback arrives first.
4497 AtomicBoolean first = new AtomicBoolean(true);
4500 public void execute(final ReadGraphImpl graph, IntSet set) {
4501 // final HashSet<Resource> result = new HashSet<Resource>();
4502 // set.forEach(new TIntProcedure() {
4505 // public boolean execute(int type) {
4506 // result.add(querySupport.getResource(type));
4512 if(first.compareAndSet(true, false)) {
4513 procedure.execute(graph, set);
4514 // impl.state.barrier.dec();
4516 procedure.execute(impl.newRestart(graph), set);
4518 } catch (Throwable t2) {
4519 Logger.defaultLogError(t2);
4524 public void exception(ReadGraphImpl graph, Throwable t) {
4526 if(first.compareAndSet(true, false)) {
4527 procedure.exception(graph, t);
4528 // impl.state.barrier.dec();
4530 procedure.exception(impl.newRestart(graph), t);
4532 } catch (Throwable t2) {
4533 Logger.defaultLogError(t2);
/**
 * Runs {@code DirectSuperRelations.queryEach} for {@code subject} and
 * forwards every reported id to {@code procedure} as a {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the super relations; asserted non-null
 */
4542 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4544 assert(subject != null);
4545 assert(procedure != null);
4547 final ListenerBase listener = getListenerBase(procedure);
// Adapter from the int-based internal callback to the Resource-based procedure.
4549 IntProcedure ip = new IntProcedureAdapter() {
4552 public void execute(final ReadGraphImpl graph, int superRelation) {
4554 procedure.execute(graph, querySupport.getResource(superRelation));
4555 } catch (Throwable t2) {
4556 Logger.defaultLogError(t2);
4561 public void finished(final ReadGraphImpl graph) {
4563 procedure.finished(graph);
4564 } catch (Throwable t2) {
4565 Logger.defaultLogError(t2);
4567 // impl.state.barrier.dec(this);
4572 public void exception(ReadGraphImpl graph, Throwable t) {
4574 procedure.exception(graph, t);
4575 } catch (Throwable t2) {
4576 Logger.defaultLogError(t2);
4578 // impl.state.barrier.dec(this);
4583 int sId = querySupport.getId(subject);
4585 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
4586 // else impl.state.barrier.inc(null, null);
4588 DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Delegates to {@code PossibleSuperRelation.queryEach} for {@code subject},
 * passing {@code procedure} through unwrapped.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the result; asserted non-null
 */
4593 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
4595 assert(subject != null);
4596 assert(procedure != null);
4598 final ListenerBase listener = getListenerBase(procedure);
4600 // impl.state.barrier.inc();
4602 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Runs {@code SuperRelations.queryEach} for {@code subject} and hands the
 * resulting {@code IntSet} to {@code procedure} as is (the commented-out
 * lines show an earlier conversion into a {@code HashSet<Resource>}).
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject relation resource; asserted non-null
 * @param procedure receiver of the super relation set; asserted non-null
 */
4607 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4609 assert(subject != null);
4610 assert(procedure != null);
4612 final ListenerBase listener = getListenerBase(procedure);
4614 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
4617 public void execute(final ReadGraphImpl graph, IntSet set) {
4618 // final HashSet<Resource> result = new HashSet<Resource>();
4619 // set.forEach(new TIntProcedure() {
4622 // public boolean execute(int type) {
4623 // result.add(querySupport.getResource(type));
4629 procedure.execute(graph, set);
4630 } catch (Throwable t2) {
4631 Logger.defaultLogError(t2);
4633 // impl.state.barrier.dec(this);
4637 public void exception(ReadGraphImpl graph, Throwable t) {
4639 procedure.exception(graph, t);
4640 } catch (Throwable t2) {
4641 Logger.defaultLogError(t2);
4643 // impl.state.barrier.dec(this);
4648 int sId = querySupport.getId(subject);
4650 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
4651 // else impl.state.barrier.inc(null, null);
4653 SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Returns the result of {@code ValueQuery.queryEach} for {@code subject},
 * after translating the resource to its id via {@code querySupport}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose value is queried
 * @return the bytes returned by {@code ValueQuery.queryEach}
 * @throws DatabaseException declared by this method
 */
4657 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
4659 int sId = querySupport.getId(subject);
4660 return ValueQuery.queryEach(impl, sId, impl.parent);
/**
 * Returns the result of {@code ValueQuery.queryEach} for a resource that is
 * already given as an id.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject id of the resource whose value is queried
 * @return the bytes returned by {@code ValueQuery.queryEach}
 * @throws DatabaseException declared by this method
 */
4664 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
4666 return ValueQuery.queryEach(impl, subject, impl.parent);
/**
 * Runs {@code ValueQuery.queryEach} for {@code subject} and returns what
 * that call returns.
 *
 * When {@code procedure} is non-null, its listener base and a wrapping
 * {@code InternalProcedure} are passed to the query; the second visible
 * return statement passes {@code null} for both (presumably the branch for
 * a null procedure; the branch keyword is not visible in this listing).
 *
 * In the wrapper, the call guarded by {@code first.compareAndSet(true, false)}
 * uses the graph as given, while the other visible call passes
 * {@code impl.newRestart(graph)}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose value is queried; asserted non-null
 * @param procedure optional receiver of the value; may be null
 * @return the bytes returned by {@code ValueQuery.queryEach}
 */
4671 final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4673 assert(subject != null);
4675 int sId = querySupport.getId(subject);
4677 if(procedure != null) {
4679 final ListenerBase listener = getListenerBase(procedure);
4681 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Flipped atomically from true to false by whichever callback arrives first.
4683 AtomicBoolean first = new AtomicBoolean(true);
4686 public void execute(ReadGraphImpl graph, byte[] result) {
4688 if(first.compareAndSet(true, false)) {
4689 procedure.execute(graph, result);
4690 // impl.state.barrier.dec(this);
4692 procedure.execute(impl.newRestart(graph), result);
4694 } catch (Throwable t2) {
4695 Logger.defaultLogError(t2);
4700 public void exception(ReadGraphImpl graph, Throwable t) {
4702 if(first.compareAndSet(true, false)) {
4703 procedure.exception(graph, t);
4704 // impl.state.barrier.dec(this);
4706 procedure.exception(impl.newRestart(graph), t);
4708 } catch (Throwable t2) {
4709 Logger.defaultLogError(t2);
4715 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4716 // else impl.state.barrier.inc(null, null);
4718 return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
// No listener and no callback are supplied on this path.
4722 return ValueQuery.queryEach(impl, sId, impl.parent, null, null);
/**
 * Runs {@code ValueQuery.queryEach} for {@code subject} and hands the
 * resulting bytes to {@code procedure}.
 *
 * Two wrappers are visible. When {@code impl.parent} or the listener is
 * non-null, the wrapper uses a {@code first} flag: the call guarded by
 * {@code first.compareAndSet(true, false)} uses the graph as given, the
 * other visible call passes {@code impl.newRestart(graph)}, and caught
 * throwables are logged. The second wrapper forwards directly to
 * {@code procedure} with no flag and no visible catch (presumably the
 * else-branch; the branch keyword is not visible in this listing).
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource whose value is queried; asserted non-null
 * @param procedure receiver of the value; asserted non-null
 */
4729 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4731 assert(subject != null);
4732 assert(procedure != null);
4734 final ListenerBase listener = getListenerBase(procedure);
4736 if(impl.parent != null || listener != null) {
4738 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Flipped atomically from true to false by whichever callback arrives first.
4740 AtomicBoolean first = new AtomicBoolean(true);
4743 public void execute(ReadGraphImpl graph, byte[] result) {
4745 if(first.compareAndSet(true, false)) {
4746 procedure.execute(graph, result);
4747 // impl.state.barrier.dec(this);
4749 procedure.execute(impl.newRestart(graph), result);
4751 } catch (Throwable t2) {
4752 Logger.defaultLogError(t2);
4757 public void exception(ReadGraphImpl graph, Throwable t) {
4759 if(first.compareAndSet(true, false)) {
4760 procedure.exception(graph, t);
4761 // impl.state.barrier.dec(this);
4763 procedure.exception(impl.newRestart(graph), t);
4765 } catch (Throwable t2) {
4766 Logger.defaultLogError(t2);
4772 int sId = querySupport.getId(subject);
4774 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4775 // else impl.state.barrier.inc(null, null);
4777 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
// Plain pass-through wrapper: no 'first' flag, no restart graph.
4781 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
4784 public void execute(ReadGraphImpl graph, byte[] result) {
4786 procedure.execute(graph, result);
4791 public void exception(ReadGraphImpl graph, Throwable t) {
4793 procedure.exception(graph, t);
4799 int sId = querySupport.getId(subject);
4801 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
/**
 * Resolves the inverse of {@code relation} by running {@code Objects.runner}
 * with the {@code getInverseOf()} predicate, and reports the outcome to
 * {@code procedure}.
 *
 * The {@code done} flag guards every call into {@code procedure} so that it
 * is notified at most once: with the resolved resource, with a
 * {@link NoInverseException}, with a
 * {@link ManyObjectsForFunctionalRelationException}, or with the throwable
 * reported by the query.
 *
 * NOTE(review): the embedded line numbering of this listing has gaps, so
 * the conditions selecting between the NoInverseException and the success
 * call in finished(), and the statement that stores the first id into
 * {@code result}, are not visible here.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param relation relation whose inverse is looked up; asserted non-null
 * @param procedure receiver of the inverse or of the failure; asserted non-null
 */
4808 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
4810 assert(relation != null);
4811 assert(procedure != null);
4813 final ListenerBase listener = getListenerBase(procedure);
4815 IntProcedure ip = new IntProcedure() {
// Id handed to procedure.execute() in finished(); initialised to 0.
4817 private int result = 0;
// 'found' flips on the first execute(); 'done' flips when procedure has been notified.
4819 final AtomicBoolean found = new AtomicBoolean(false);
4820 final AtomicBoolean done = new AtomicBoolean(false);
4823 public void finished(ReadGraphImpl graph) {
4825 // Shall fire exactly once!
4826 if(done.compareAndSet(false, true)) {
4829 procedure.exception(graph, new NoInverseException(""));
4830 // impl.state.barrier.dec(this);
4832 procedure.execute(graph, querySupport.getResource(result));
4833 // impl.state.barrier.dec(this);
4835 } catch (Throwable t) {
4836 Logger.defaultLogError(t);
4843 public void execute(ReadGraphImpl graph, int i) {
4845 if(found.compareAndSet(false, true)) {
4848 // Shall fire exactly once!
4849 if(done.compareAndSet(false, true)) {
// Report the stored id together with the conflicting id 'i'; the bare name
// 'result' here would resolve to the same field as 'this.result'.
4851 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + i));
4852 // impl.state.barrier.dec(this);
4853 } catch (Throwable t) {
4854 Logger.defaultLogError(t);
4862 public void exception(ReadGraphImpl graph, Throwable t) {
4863 // Shall fire exactly once!
4864 if(done.compareAndSet(false, true)) {
4866 procedure.exception(graph, t);
4867 // impl.state.barrier.dec(this);
4868 } catch (Throwable t2) {
4869 Logger.defaultLogError(t2);
4876 int sId = querySupport.getId(relation);
4878 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
4879 // else impl.state.barrier.inc(null, null);
4881 Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);
/**
 * Looks up a resource by the string {@code id} through the four-argument
 * {@code forResource} overload and forwards the resulting integer id to
 * {@code procedure} as a {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param id identifier handed to the overload (its format is not visible here)
 * @param procedure receiver of the resource; asserted non-null
 */
4886 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4889 assert(procedure != null);
4891 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
4894 public void execute(ReadGraphImpl graph, Integer result) {
4896 procedure.execute(graph, querySupport.getResource(result));
4897 } catch (Throwable t2) {
4898 Logger.defaultLogError(t2);
4900 // impl.state.barrier.dec(this);
4904 public void exception(ReadGraphImpl graph, Throwable t) {
4907 procedure.exception(graph, t);
4908 } catch (Throwable t2) {
4909 Logger.defaultLogError(t2);
4911 // impl.state.barrier.dec(this);
4916 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
4917 // else impl.state.barrier.inc(null, null);
4919 forResource(impl, id, impl.parent, ip);
/**
 * Looks up a builtin by the string {@code id} through the four-argument
 * {@code forBuiltin} overload and forwards the resulting integer id to
 * {@code procedure} as a {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param id identifier handed to the overload (its format is not visible here)
 * @param procedure receiver of the resource; asserted non-null
 */
4924 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4927 assert(procedure != null);
4929 // impl.state.barrier.inc();
4931 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
4934 public void execute(ReadGraphImpl graph, Integer result) {
4936 procedure.execute(graph, querySupport.getResource(result));
4937 } catch (Throwable t2) {
4938 Logger.defaultLogError(t2);
4940 // impl.state.barrier.dec();
4944 public void exception(ReadGraphImpl graph, Throwable t) {
4946 procedure.exception(graph, t);
4947 } catch (Throwable t2) {
4948 Logger.defaultLogError(t2);
4950 // impl.state.barrier.dec();
/**
 * Runs {@code DirectPredicates.queryEach} for {@code subject} and, when the
 * query finishes, reports the {@code found} flag to {@code procedure}.
 *
 * NOTE(review): the body of execute() is not visible in this listing (gap in
 * the embedded line numbering); presumably it sets {@code found} — confirm
 * against the full source.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource that is checked; asserted non-null
 * @param procedure receiver of the boolean outcome; asserted non-null
 */
4958 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4960 assert(subject != null);
4961 assert(procedure != null);
4963 final ListenerBase listener = getListenerBase(procedure);
4965 // impl.state.barrier.inc();
4967 DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4969 boolean found = false;
4972 public void execute(ReadGraphImpl graph, int object) {
4977 public void finished(ReadGraphImpl graph) {
// The flag is reported only once the query signals completion.
4979 procedure.execute(graph, found);
4980 } catch (Throwable t2) {
4981 Logger.defaultLogError(t2);
4983 // impl.state.barrier.dec();
4987 public void exception(ReadGraphImpl graph, Throwable t) {
4989 procedure.exception(graph, t);
4990 } catch (Throwable t2) {
4991 Logger.defaultLogError(t2);
4993 // impl.state.barrier.dec();
/**
 * Iterates the objects of ({@code subject}, {@code predicate}) through
 * {@code forEachObject} and, when the iteration finishes, reports the
 * {@code found} flag to {@code procedure}.
 *
 * execute() and finished() of the adapter are declared {@code synchronized}.
 *
 * NOTE(review): the body of execute() is not visible in this listing (gap in
 * the embedded line numbering); presumably it sets {@code found} — confirm
 * against the full source.
 *
 * @param impl graph handed on to {@code forEachObject}
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param procedure receiver of the boolean outcome; asserted non-null
 */
5001 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
5003 assert(subject != null);
5004 assert(predicate != null);
5005 assert(procedure != null);
5007 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
5009 boolean found = false;
5012 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
5017 synchronized public void finished(AsyncReadGraph graph) {
// The flag is reported only once the iteration signals completion.
5019 procedure.execute(graph, found);
5020 } catch (Throwable t2) {
5021 Logger.defaultLogError(t2);
5023 // impl.state.barrier.dec(this);
5027 public void exception(AsyncReadGraph graph, Throwable t) {
5029 procedure.exception(graph, t);
5030 } catch (Throwable t2) {
5031 Logger.defaultLogError(t2);
5033 // impl.state.barrier.dec(this);
5038 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
5039 // else impl.state.barrier.inc(null, null);
5041 forEachObject(impl, subject, predicate, ip);
/**
 * Iterates the objects of ({@code subject}, {@code predicate}) through
 * {@code forEachObject}, sets {@code found} when one of them equals
 * {@code object}, and reports the flag to {@code procedure} when the
 * iteration finishes.
 *
 * execute() and finished() of the adapter are declared {@code synchronized}.
 *
 * @param impl graph handed on to {@code forEachObject}
 * @param subject subject resource; asserted non-null
 * @param predicate predicate resource; asserted non-null
 * @param object object resource to look for; not asserted by this method
 * @param procedure receiver of the boolean outcome; asserted non-null
 */
5046 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
5048 assert(subject != null);
5049 assert(predicate != null);
5050 assert(procedure != null);
5052 // impl.state.barrier.inc();
5054 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
5056 boolean found = false;
5059 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
// The flag is only ever raised here; nothing visible resets it.
5060 if(resource.equals(object)) found = true;
5064 synchronized public void finished(AsyncReadGraph graph) {
5066 procedure.execute(graph, found);
5067 } catch (Throwable t2) {
5068 Logger.defaultLogError(t2);
5070 // impl.state.barrier.dec();
5074 public void exception(AsyncReadGraph graph, Throwable t) {
5076 procedure.exception(graph, t);
5077 } catch (Throwable t2) {
5078 Logger.defaultLogError(t2);
5080 // impl.state.barrier.dec();
/**
 * Runs {@code ValueQuery.queryEach} for {@code subject} and reports to
 * {@code procedure} whether the delivered byte array was non-null.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject resource that is checked; asserted non-null
 * @param procedure receiver of the boolean outcome; asserted non-null
 */
5088 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
5090 assert(subject != null);
5091 assert(procedure != null);
5093 final ListenerBase listener = getListenerBase(procedure);
5095 // impl.state.barrier.inc();
5097 ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
5100 public void execute(ReadGraphImpl graph, byte[] object) {
// A null byte array means "no value".
5101 boolean result = object != null;
5103 procedure.execute(graph, result);
5104 } catch (Throwable t2) {
5105 Logger.defaultLogError(t2);
5107 // impl.state.barrier.dec();
5111 public void exception(ReadGraphImpl graph, Throwable t) {
5113 procedure.exception(graph, t);
5114 } catch (Throwable t2) {
5115 Logger.defaultLogError(t2);
5117 // impl.state.barrier.dec();
/**
 * Runs {@code OrderedSet.queryEach} for {@code subject} and forwards every
 * reported id to {@code procedure} as a {@link Resource}.
 *
 * Throwables caught around the calls into {@code procedure} are logged via
 * {@code Logger.defaultLogError}.
 *
 * @param impl graph used for the query; {@code impl.parent} is passed along
 * @param subject ordered set resource; asserted non-null
 * @param procedure receiver of the elements; asserted non-null
 */
5125 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
5127 assert(subject != null);
5128 assert(procedure != null);
5130 final ListenerBase listener = getListenerBase(procedure);
5132 // impl.state.barrier.inc();
5134 OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
5137 public void exception(ReadGraphImpl graph, Throwable t) {
5139 procedure.exception(graph, t);
5140 } catch (Throwable t2) {
5141 Logger.defaultLogError(t2);
5143 // impl.state.barrier.dec();
5147 public void execute(ReadGraphImpl graph, int i) {
5149 procedure.execute(graph, querySupport.getResource(i));
5150 } catch (Throwable t2) {
5151 Logger.defaultLogError(t2);
5156 public void finished(ReadGraphImpl graph) {
5158 procedure.finished(graph);
5159 } catch (Throwable t2) {
5160 Logger.defaultLogError(t2);
5162 // impl.state.barrier.dec();
/**
 * Entry point for an {@code AsyncRead} request: checks the arguments and
 * delegates to {@code runAsyncRead}.
 *
 * @param impl graph handed on to {@code runAsyncRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry handed on as the parent of the request
 * @param procedure receiver of the result; asserted non-null
 * @param listener listener base handed on unchanged
 */
5170 final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {
5172 assert(request != null);
5173 assert(procedure != null);
5175 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");
5176 // else impl.state.barrier.inc(null, null);
5178 runAsyncRead(impl, request, parent, listener, procedure);
/**
 * Answers a {@code Read} request from {@code readMap} when a ready entry
 * exists there; the other visible return performs the request directly on
 * {@code graph}.
 *
 * @param graph graph used to read the entry or perform the request; asserted non-null
 * @param request request to answer; asserted non-null
 * @return the entry's value (unchecked cast to {@code T}) or the value of {@code request.perform(graph)}
 * @throws DatabaseException declared by this method
 */
5183 final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
5185 assert(graph != null);
5186 assert(request != null);
5188 final ReadEntry entry = readMap.get(request);
5189 if(entry != null && entry.isReady()) {
5190 return (T)entry.get(graph, this, null);
5192 return request.perform(graph);
/**
 * Answers an {@code ExternalRead} request from {@code externalReadMap} when
 * a ready entry exists there; a ready entry that is excepted has its stored
 * throwable rethrown, wrapped in a {@link DatabaseException} unless it
 * already is one.
 *
 * The remaining visible code registers a temporary {@code Listener} on the
 * request, then rethrows the throwable held in {@code exception} or returns
 * the value held in {@code result}.
 *
 * NOTE(review): the bodies of the temporary listener's methods and the
 * guard around the final rethrow are not visible in this listing (gaps in
 * the embedded line numbering); presumably the listener stores into the two
 * containers — confirm against the full source.
 *
 * @param graph graph handed to {@code request.register}; asserted non-null
 * @param request request to answer; asserted non-null
 * @return the cached result or the value captured from the listener
 * @throws DatabaseException when the cached entry or the captured outcome is a failure
 */
5197 final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
5199 assert(graph != null);
5200 assert(request != null);
5202 final ExternalReadEntry<T> entry = externalReadMap.get(request);
5203 if(entry != null && entry.isReady()) {
5204 if(entry.isExcepted()) {
// For an excepted entry the stored result is the throwable itself.
5205 Throwable t = (Throwable)entry.getResult();
5206 if(t instanceof DatabaseException) throw (DatabaseException)t;
5207 else throw new DatabaseException(t);
5209 return (T)entry.getResult();
// Containers that carry the outcome out of the anonymous listener.
5213 final DataContainer<T> result = new DataContainer<T>();
5214 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
5216 request.register(graph, new Listener<T>() {
5219 public void exception(Throwable t) {
5224 public void execute(T t) {
5229 public boolean isDisposed() {
5235 Throwable t = exception.get();
5237 if(t instanceof DatabaseException) throw (DatabaseException)t;
5238 else throw new DatabaseException(t);
5241 return result.get();
/**
 * Answers an {@code AsyncRead} request from {@code asyncReadMap} when a
 * ready entry exists there: an excepted entry is reported through
 * {@code procedure.exception}, and the other visible call in that branch
 * reports the stored result through {@code procedure.execute}. The last
 * visible statement performs the request directly with {@code procedure}.
 *
 * @param graph graph handed to the procedure or the request; asserted non-null
 * @param request request to answer; asserted non-null
 * @param procedure receiver of the outcome; not asserted by this method
 */
5248 final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
5250 assert(graph != null);
5251 assert(request != null);
5253 final AsyncReadEntry entry = asyncReadMap.get(request);
5254 if(entry != null && entry.isReady()) {
5255 if(entry.isExcepted()) {
// For an excepted entry the stored result is the throwable itself.
5256 procedure.exception(graph, (Throwable)entry.getResult());
5258 procedure.execute(graph, (T)entry.getResult());
5261 request.perform(graph, procedure);
/**
 * Entry point for a {@code MultiRead} request: checks the arguments and
 * delegates to {@code queryMultiRead}.
 *
 * @param impl graph handed on to {@code queryMultiRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry handed on as the parent of the request
 * @param procedure receiver of the results; asserted non-null
 * @param listener listener base handed on unchanged
 */
5267 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5269 assert(request != null);
5270 assert(procedure != null);
5272 // impl.state.barrier.inc(null, null);
5274 queryMultiRead(impl, request, parent, listener, procedure);
/**
 * Entry point for an {@code AsyncMultiRead} request: delegates to
 * {@code runAsyncMultiRead} with a wrapper around {@code procedure}.
 *
 * The wrapper forwards execute/finished/exception and logs throwables
 * caught around those calls via {@code Logger.defaultLogError}; its
 * toString() is that of {@code procedure}.
 *
 * @param impl graph handed on to {@code runAsyncMultiRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry handed on as the parent of the request
 * @param procedure receiver of the results; asserted non-null
 * @param listener listener base handed on unchanged
 */
5279 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5281 assert(request != null);
5282 assert(procedure != null);
5284 // impl.state.barrier.inc();
5286 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
5288 public void execute(AsyncReadGraph graph, T result) {
5291 procedure.execute(graph, result);
5292 } catch (Throwable t2) {
5293 Logger.defaultLogError(t2);
5298 public void finished(AsyncReadGraph graph) {
5301 procedure.finished(graph);
5302 } catch (Throwable t2) {
5303 Logger.defaultLogError(t2);
5306 // impl.state.barrier.dec();
5311 public String toString() {
5312 return procedure.toString();
5316 public void exception(AsyncReadGraph graph, Throwable t) {
5319 procedure.exception(graph, t);
5320 } catch (Throwable t2) {
5321 Logger.defaultLogError(t2);
5324 // impl.state.barrier.dec();
/**
 * Entry point for an {@code ExternalRead} request: delegates to
 * {@code queryPrimitiveRead} with a wrapper around {@code procedure}.
 *
 * The wrapper forwards execute/exception and logs throwables caught around
 * those calls via {@code Logger.defaultLogError}; its toString() is that of
 * {@code procedure}.
 *
 * @param impl graph handed on to {@code queryPrimitiveRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry handed on as the parent of the request
 * @param procedure receiver of the result; asserted non-null
 * @param listener listener base handed on unchanged
 */
5333 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
5335 assert(request != null);
5336 assert(procedure != null);
5338 queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {
5341 public void execute(T result) {
5343 procedure.execute(result);
5344 } catch (Throwable t2) {
5345 Logger.defaultLogError(t2);
5350 public String toString() {
5351 return procedure.toString();
5355 public void exception(Throwable t) {
5357 procedure.exception(t);
5358 } catch (Throwable t2) {
5359 Logger.defaultLogError(t2);
/**
 * Returns the {@link VirtualGraph} that {@code querySupport.getProvider}
 * reports for the statement ({@code subject}, {@code predicate},
 * {@code object}), after translating the three resources to ids.
 *
 * @param subject subject of the statement
 * @param predicate predicate of the statement
 * @param object object of the statement
 * @return whatever {@code querySupport.getProvider} returns for the three ids
 */
5368 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
5370 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
/**
 * Returns the {@link VirtualGraph} that {@code querySupport.getProvider}
 * reports for the pair ({@code subject}, {@code predicate}), after
 * translating both resources to ids.
 *
 * @param subject subject resource
 * @param predicate predicate resource
 * @return whatever {@code querySupport.getProvider} returns for the two ids
 */
5375 public VirtualGraph getProvider(Resource subject, Resource predicate) {
5377 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
/**
 * Returns the {@link VirtualGraph} that {@code querySupport.getValueProvider}
 * reports for {@code subject}, after translating the resource to its id.
 *
 * @param subject resource whose value provider is looked up
 * @return whatever {@code querySupport.getValueProvider} returns for the id
 */
5382 public VirtualGraph getValueProvider(Resource subject) {
5384 return querySupport.getValueProvider(querySupport.getId(subject));
/**
 * Delegates to {@code querySupport.resume(graph)} and returns its result.
 *
 * @param graph graph handed on to {@code querySupport.resume}
 * @return the boolean returned by {@code querySupport.resume}
 */
5388 public boolean resumeTasks(ReadGraphImpl graph) {
5390 return querySupport.resume(graph);
/**
 * Delegates to {@code querySupport.isImmutable} for a resource given as an id.
 *
 * @param resourceId id of the resource to check
 * @return the boolean returned by {@code querySupport.isImmutable}
 */
5394 public boolean isImmutable(int resourceId) {
5395 return querySupport.isImmutable(resourceId);
/**
 * Resource-based variant of {@code isImmutable(int)}.
 *
 * @param resource resource to check; cast to {@code ResourceImpl} without a
 *        type check, so any other implementation fails with a ClassCastException
 * @return the result of {@code isImmutable(int)} for the resource's {@code id} field
 */
5398 public boolean isImmutable(Resource resource) {
5399 ResourceImpl impl = (ResourceImpl)resource;
5400 return isImmutable(impl.id);
5405 public Layer0 getL0(ReadGraph graph) {
5407 L0 = Layer0.getInstance(graph);