1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
37 import org.simantics.databoard.Bindings;
38 import org.simantics.db.AsyncReadGraph;
39 import org.simantics.db.DevelopmentKeys;
40 import org.simantics.db.DirectStatements;
41 import org.simantics.db.ReadGraph;
42 import org.simantics.db.RelationInfo;
43 import org.simantics.db.Resource;
44 import org.simantics.db.Session;
45 import org.simantics.db.Statement;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
48 import org.simantics.db.common.utils.Logger;
49 import org.simantics.db.debug.ListenerReport;
50 import org.simantics.db.exception.DatabaseException;
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
52 import org.simantics.db.exception.NoInverseException;
53 import org.simantics.db.exception.ResourceNotFoundException;
54 import org.simantics.db.impl.DebugPolicy;
55 import org.simantics.db.impl.ResourceImpl;
56 import org.simantics.db.impl.graph.MultiIntProcedure;
57 import org.simantics.db.impl.graph.ReadGraphImpl;
58 import org.simantics.db.impl.graph.ReadGraphSupport;
59 import org.simantics.db.impl.graph.WriteGraphImpl;
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;
61 import org.simantics.db.impl.procedure.InternalProcedure;
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
63 import org.simantics.db.impl.support.ResourceSupport;
64 import org.simantics.db.procedure.AsyncMultiListener;
65 import org.simantics.db.procedure.AsyncMultiProcedure;
66 import org.simantics.db.procedure.AsyncProcedure;
67 import org.simantics.db.procedure.AsyncSetListener;
68 import org.simantics.db.procedure.Listener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.map.hash.THashMap;
88 import gnu.trove.procedure.TIntProcedure;
89 import gnu.trove.procedure.TLongProcedure;
90 import gnu.trove.procedure.TObjectProcedure;
91 import gnu.trove.set.hash.THashSet;
92 import gnu.trove.set.hash.TIntHashSet;
94 @SuppressWarnings({"rawtypes", "unchecked"})
95 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
97 final public UnaryQueryHashMap<IntProcedure> directPredicatesMap;
98 final public UnaryQueryHashMap<IntProcedure> principalTypesMap;
99 final public THashMap<String, URIToResource> uriToResourceMap;
100 final public THashMap<String, NamespaceIndex> namespaceIndexMap22;
101 final public UnaryQueryHashMap<IntProcedure> projectsMap;
102 final public UnaryQueryHashMap<InternalProcedure<RelationInfo>> relationInfoMap;
103 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superTypesMap;
104 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typeHierarchyMap;
105 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superRelationsMap;
106 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typesMap;
107 final public UnaryQueryHashMap<InternalProcedure<byte[]>> valueMap;
108 final public DoubleKeyQueryHashMap<IntProcedure> directObjectsMap;
109 final public DoubleKeyQueryHashMap<IntProcedure> objectsMap;
110 final public UnaryQueryHashMap<IntProcedure> orderedSetMap;
111 final public UnaryQueryHashMap<IntProcedure> predicatesMap;
112 final public DoubleKeyQueryHashMap<TripleIntProcedure> statementsMap;
113 final public UnaryQueryHashMap<IntProcedure> assertedPredicatesMap;
114 final public BinaryQueryHashMap<TripleIntProcedure> assertedStatementsMap;
115 final public StableHashMap<ExternalRead, ExternalReadEntry> externalReadMap;
116 final public StableHashMap<AsyncRead, AsyncReadEntry> asyncReadMap;
117 final public StableHashMap<Read, ReadEntry> readMap;
118 final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadMap;
119 final public StableHashMap<MultiRead, MultiReadEntry> multiReadMap;
121 final private THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;
123 public static int indent = 0;
128 // Garbage collection
130 public int boundQueries = 0;
133 private int hits = 0;
135 private int misses = 0;
137 private int updates = 0;
139 final private int functionalRelation;
141 final private int superrelationOf;
143 final private int instanceOf;
145 final private int inverseOf;
147 final private int asserts;
149 final private int hasPredicate;
151 final private int hasPredicateInverse;
153 final private int hasObject;
155 final private int inherits;
157 final private int subrelationOf;
159 final private int rootLibrary;
162 * A cache for the root library resource. Initialized in
163 * {@link #getRootLibraryResource()}.
165 private volatile ResourceImpl rootLibraryResource;
167 final private int library;
169 final private int consistsOf;
171 final private int hasName;
173 AtomicInteger sleepers = new AtomicInteger(0);
175 private boolean updating = false;
177 static public boolean collecting = false;
179 private boolean firingListeners = false;
181 final public QuerySupport querySupport;
182 final public Session session;
183 final public ResourceSupport resourceSupport;
185 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
187 QueryThread[] executors;
189 public ArrayList<SessionTask>[] queues;
193 INIT, RUN, SLEEP, DISPOSED
197 public ThreadState[] threadStates;
198 public ReentrantLock[] threadLocks;
199 public Condition[] threadConditions;
201 public ArrayList<SessionTask>[] ownTasks;
203 public ArrayList<SessionTask>[] ownSyncTasks;
205 ArrayList<SessionTask>[] delayQueues;
207 public boolean synch = true;
209 final Object querySupportLock;
211 public Long modificationCounter = 0L;
213 public void close() {
216 final public void scheduleOwn(int caller, SessionTask request) {
217 ownTasks[caller].add(request);
220 final public void scheduleAlways(int caller, SessionTask request) {
222 int performer = request.thread;
223 if(caller == performer) {
224 ownTasks[caller].add(request);
226 schedule(caller, request);
231 final public void schedule(int caller, SessionTask request) {
233 int performer = request.thread;
235 if(DebugPolicy.SCHEDULE)
236 System.out.println("schedule " + request + " " + caller + " -> " + performer);
238 assert(performer >= 0);
240 assert(request != null);
242 if(caller == performer) {
245 ReentrantLock queueLock = threadLocks[performer];
247 queues[performer].add(request);
248 // This thread could have been sleeping
249 if(queues[performer].size() == 1) {
250 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
251 threadConditions[performer].signalAll();
261 final public int THREAD_MASK;
262 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
264 public static abstract class SessionTask {
266 final public int thread;
267 final public int syncCaller;
268 final public Object object;
270 public SessionTask(WriteTraits object, int thread) {
271 this.thread = thread;
272 this.syncCaller = -1;
273 this.object = object;
276 public SessionTask(Object object, int thread, int syncCaller) {
277 this.thread = thread;
278 this.syncCaller = syncCaller;
279 this.object = object;
282 public abstract void run(int thread);
285 public String toString() {
286 return "SessionTask[" + object + "]";
291 public static abstract class SessionRead extends SessionTask {
293 final public Semaphore notify;
294 final public DataContainer<Throwable> throwable;
296 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
297 super(object, thread, thread);
298 this.throwable = throwable;
299 this.notify = notify;
302 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
303 super(object, thread, syncThread);
304 this.throwable = throwable;
305 this.notify = notify;
310 long waitingTime = 0;
313 static int koss2 = 0;
315 public boolean resume(ReadGraphImpl graph) {
316 return executors[0].runSynchronized();
319 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
320 throws DatabaseException {
323 THREAD_MASK = threads - 1;
326 session = querySupport.getSession();
327 resourceSupport = querySupport.getSupport();
328 querySupportLock = core.getLock();
330 executors = new QueryThread[THREADS];
331 queues = new ArrayList[THREADS];
332 threadLocks = new ReentrantLock[THREADS];
333 threadConditions = new Condition[THREADS];
334 threadStates = new ThreadState[THREADS];
335 ownTasks = new ArrayList[THREADS];
336 ownSyncTasks = new ArrayList[THREADS];
337 delayQueues = new ArrayList[THREADS * THREADS];
339 // freeSchedule = new AtomicInteger(0);
341 for (int i = 0; i < THREADS * THREADS; i++) {
342 delayQueues[i] = new ArrayList<SessionTask>();
345 for (int i = 0; i < THREADS; i++) {
347 // tasks[i] = new ArrayList<Runnable>();
348 ownTasks[i] = new ArrayList<SessionTask>();
349 ownSyncTasks[i] = new ArrayList<SessionTask>();
350 queues[i] = new ArrayList<SessionTask>();
351 threadLocks[i] = new ReentrantLock();
352 threadConditions[i] = threadLocks[i].newCondition();
353 // limits[i] = false;
354 threadStates[i] = ThreadState.INIT;
358 for (int i = 0; i < THREADS; i++) {
362 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
364 threadSet.add(executors[i]);
368 directPredicatesMap = new UnaryQueryHashMap();
369 valueMap = new UnaryQueryHashMap();
370 principalTypesMap = new UnaryQueryHashMap();
371 uriToResourceMap = new THashMap<String, URIToResource>();
372 namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();
373 projectsMap = new UnaryQueryHashMap();
374 relationInfoMap = new UnaryQueryHashMap();
375 typeHierarchyMap = new UnaryQueryHashMap();
376 superTypesMap = new UnaryQueryHashMap();
377 superRelationsMap = new UnaryQueryHashMap();
378 typesMap = new UnaryQueryHashMap();
379 objectsMap = new DoubleKeyQueryHashMap();
380 orderedSetMap = new UnaryQueryHashMap();
381 predicatesMap = new UnaryQueryHashMap();
382 statementsMap = new DoubleKeyQueryHashMap();
383 directObjectsMap = new DoubleKeyQueryHashMap();
384 assertedPredicatesMap = new UnaryQueryHashMap();
385 assertedStatementsMap = new BinaryQueryHashMap();
386 asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>();
387 readMap = new StableHashMap<Read, ReadEntry>();
388 asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>();
389 multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>();
390 externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>();
391 listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
394 for (int i = 0; i < THREADS; i++) {
395 executors[i].start();
398 // Make sure that query threads are up and running
399 while(sleepers.get() != THREADS) {
402 } catch (InterruptedException e) {
407 rootLibrary = core.getBuiltin("http:/");
408 boolean builtinsInstalled = rootLibrary != 0;
410 if (builtinsInstalled) {
411 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
412 assert (functionalRelation != 0);
414 functionalRelation = 0;
416 if (builtinsInstalled) {
417 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
418 assert (instanceOf != 0);
422 if (builtinsInstalled) {
423 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
424 assert (inverseOf != 0);
429 if (builtinsInstalled) {
430 inherits = core.getBuiltin(Layer0.URIs.Inherits);
431 assert (inherits != 0);
435 if (builtinsInstalled) {
436 asserts = core.getBuiltin(Layer0.URIs.Asserts);
437 assert (asserts != 0);
441 if (builtinsInstalled) {
442 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
443 assert (hasPredicate != 0);
447 if (builtinsInstalled) {
448 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
449 assert (hasPredicateInverse != 0);
451 hasPredicateInverse = 0;
453 if (builtinsInstalled) {
454 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
455 assert (hasObject != 0);
459 if (builtinsInstalled) {
460 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
461 assert (subrelationOf != 0);
465 if (builtinsInstalled) {
466 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
467 assert (superrelationOf != 0);
471 if (builtinsInstalled) {
472 library = core.getBuiltin(Layer0.URIs.Library);
473 assert (library != 0);
477 if (builtinsInstalled) {
478 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
479 assert (consistsOf != 0);
483 if (builtinsInstalled) {
484 hasName = core.getBuiltin(Layer0.URIs.HasName);
485 assert (hasName != 0);
491 final public void releaseWrite(ReadGraphImpl graph) {
492 performDirtyUpdates(graph);
493 modificationCounter++;
496 final public int getId(final Resource r) {
497 return querySupport.getId(r);
500 public QuerySupport getCore() {
504 public int getFunctionalRelation() {
505 return functionalRelation;
508 public int getInherits() {
512 public int getInstanceOf() {
516 public int getInverseOf() {
520 public int getSubrelationOf() {
521 return subrelationOf;
524 public int getSuperrelationOf() {
525 return superrelationOf;
528 public int getAsserts() {
532 public int getHasPredicate() {
536 public int getHasPredicateInverse() {
537 return hasPredicateInverse;
540 public int getHasObject() {
544 public int getRootLibrary() {
548 public Resource getRootLibraryResource() {
549 if (rootLibraryResource == null) {
550 // Synchronization is not needed here, it doesn't matter if multiple
551 // threads simultaneously set rootLibraryResource once.
552 int root = getRootLibrary();
554 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
555 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
557 return rootLibraryResource;
560 public int getLibrary() {
564 public int getConsistsOf() {
568 public int getHasName() {
572 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
574 URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {
577 public void execute(ReadGraphImpl graph, Integer result) {
579 if (result != null && result != 0) {
580 procedure.execute(graph, result);
584 // Fall back to using the fixed builtins.
585 result = querySupport.getBuiltin(id);
587 procedure.execute(graph, result);
592 result = querySupport.getRandomAccessReference(id);
593 } catch (ResourceNotFoundException e) {
594 procedure.exception(graph, e);
599 procedure.execute(graph, result);
601 procedure.exception(graph, new ResourceNotFoundException(id));
607 public void exception(ReadGraphImpl graph, Throwable t) {
608 procedure.exception(graph, t);
615 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
617 Integer result = querySupport.getBuiltin(id);
619 procedure.execute(graph, result);
621 procedure.exception(graph, new ResourceNotFoundException(id));
626 public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {
628 int hash = requestHash(query);
630 AsyncReadEntry<T> entry = asyncReadMap.get(query, hash);
632 if(parent == null && listener == null) {
633 if(entry != null && (entry.isReady() || entry.isExcepted())) {
634 System.out.println("ready " + query);
635 entry.performFromCache(graph, this, procedure);
636 // graph.state.barrier.dec(query);
639 query.perform(graph, procedure);
640 // graph.state.barrier.dec(query);
647 entry = new AsyncReadEntry<T>(query);
649 entry.clearResult(querySupport);
650 asyncReadMap.put(query, entry, hash);
652 performForEach(graph, query, entry, parent, listener, procedure, false);
656 if(entry.isPending()) {
657 synchronized(entry) {
658 if(entry.isPending()) {
659 throw new IllegalStateException();
660 // final AsyncBarrierImpl parentBarrier = graph.state.barrier;
661 // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();
662 // entry.procs.add(new AsyncProcedure<T>() {
665 // public void execute(AsyncReadGraph graph, T result) {
666 // procedure.execute(graph, result);
667 // parentBarrier.dec(query);
671 // public void exception(AsyncReadGraph graph, Throwable throwable) {
672 // procedure.exception(graph, throwable);
673 // parentBarrier.dec(query);
677 // if(graph.parent != null || listener != null) {
678 // registerDependencies(graph, entry, parent, listener, procedure, false);
681 // query.perform(graph, procedure);
689 if(entry.isReady()) {
690 entry.performFromCache(graph, this, procedure);
691 registerDependencies(graph, entry, parent, listener, procedure, false);
693 performForEach(graph, query, entry, parent, listener, procedure, false);
701 final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
703 MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query);
706 entry = new MultiReadEntry(query);
708 entry.clearResult(provider.querySupport);
710 provider.multiReadMap.put(query, entry);
712 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
716 if(entry.isPending()) {
718 synchronized(entry) {
720 if(entry.isPending()) {
721 throw new IllegalStateException();
723 // if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();
724 // entry.procs.add(new Pair(procedure, parentBarrier));
725 // if(graph.parent != null || listener != null) {
726 // provider.registerDependencies(graph, entry, parent, listener, procedure, false);
729 // If this was synchronized we must wait here until completion
730 // if(graph.state.synchronizedExecution) {
731 // while(entry.isPending()) {
732 // graph.resumeTasks(graph.callerThread, null, null);
741 entry.performFromCache(graph, provider, procedure);
742 // graph.state.barrier.dec(query);
747 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
755 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
757 int hash = requestHash(query);
759 AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);
761 if(parent == null && listener == null) {
762 if(entry != null && (entry.isReady() || entry.isExcepted())) {
763 System.out.println("ready " + query);
764 entry.performFromCache(graph, this, procedure);
767 query.perform(graph, procedure);
774 entry = new AsyncMultiReadEntry<T>(query);
776 entry.clearResult(querySupport);
778 asyncMultiReadMap.put(query, entry, hash);
780 performForEach(graph, query, entry, parent, listener, procedure, false);
784 if(entry.isPending()) {
786 synchronized(entry) {
787 if(entry.isPending()) {
788 throw new IllegalStateException();
789 // if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();
790 // entry.procs.add(procedure);
791 // if(graph.parent != null || listener != null) {
792 // registerDependencies(graph, entry, parent, listener, procedure, false);
799 performForEach(graph, query, entry, parent, listener, procedure, false);
805 final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {
807 final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query);
809 provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);
811 if(entry.isPending()) {
812 synchronized(entry) {
813 if(entry.isPending()) {
814 throw new IllegalStateException();
815 // if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();
816 // entry.procs.add(procedure);
821 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
826 public int requestHash(Object object) {
828 return object.hashCode();
829 } catch (Throwable t) {
830 Logger.defaultLogError(t);
836 public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {
838 assert(query != null);
840 ReadEntry entry = readMap.get(query);
843 if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {
844 return (T)entry.get(graph, this, procedure);
845 } else if (entry.isPending()) {
846 throw new IllegalStateException();
852 entry = new ReadEntry(query);
854 entry.clearResult(querySupport);
856 readMap.put(query, entry);
858 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
862 if(entry.isPending()) {
863 throw new IllegalStateException();
865 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
872 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
874 assert(query != null);
875 assert(procedure != null);
877 final MultiReadEntry entry = multiReadMap.get(query);
879 if(parent == null && !(listener != null)) {
880 if(entry != null && entry.isReady()) {
881 entry.performFromCache(graph, this, procedure);
886 runMultiRead(graph, entry, query, parent, this, listener, procedure);
890 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {
892 assert(query != null);
893 assert(procedure != null);
895 final ExternalReadEntry entry = externalReadMap.get(query);
897 if(parent == null && !(listener != null)) {
898 if(entry != null && entry.isReady()) {
899 entry.performFromCache(procedure);
904 runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);
908 public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,
909 boolean inferredDependency) {
911 if (DebugPolicy.PERFORM)
912 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
915 assert (!collecting);
917 assert(!entry.isDiscarded());
919 final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);
921 // FRESH, REFUTED, EXCEPTED go here
922 if (!entry.isReady()) {
930 final ReadGraphImpl finalParentGraph = parentGraph;
932 query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {
935 public void execute(AsyncReadGraph returnGraph, T result) {
936 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
937 //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
938 entry.addOrSet(finalParentGraph, result);
939 if(listenerEntry != null) {
940 primeListenerEntry(listenerEntry, result);
943 procedure.execute(finalParentGraph, result);
944 } catch (Throwable t) {
947 // parentBarrier.dec(query);
951 public void exception(AsyncReadGraph returnGraph, Throwable t) {
952 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
953 // AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
954 entry.except(finalParentGraph, t);
956 procedure.exception(finalParentGraph, t);
957 } catch (Throwable t2) {
958 t2.printStackTrace();
960 // parentBarrier.dec(query);
964 public String toString() {
965 return procedure.toString();
970 } catch (Throwable t) {
974 procedure.exception(parentGraph, t);
975 } catch (Throwable t2) {
976 t2.printStackTrace();
978 // parentBarrier.dec(query);
986 entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {
989 public void exception(AsyncReadGraph graph, Throwable throwable) {
990 procedure.exception(graph, throwable);
994 public void execute(AsyncReadGraph graph, T result) {
995 procedure.execute(graph, result);
996 if(listenerEntry != null) {
997 primeListenerEntry(listenerEntry, result);
1003 // parentBarrier.dec(query);
1009 assert (!entry.isDiscarded());
1013 public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,
1014 boolean inferredDependency) throws Throwable {
1016 if (DebugPolicy.PERFORM)
1017 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1020 assert (!collecting);
1022 entry.assertNotDiscarded();
1024 if(entry.isReady()) {
1026 // EXCEPTED goes here
1028 // if(procedure != null) entry.performFromCache(graph, this, procedure);
1029 // parentBarrier.dec(query);
1032 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1034 T result = (T)entry.get(graph, this, procedure);
1036 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1042 // FRESH, REFUTED, PENDING go here
1049 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1051 final ReadGraphImpl performGraph = graph.newSync(entry);
1055 if(Development.DEVELOPMENT)
1056 Development.recordHistogram("run " + query);
1058 T result = query.perform(performGraph);
1059 entry.addOrSet(performGraph, result);
1061 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1063 return (T)entry.get(graph, this, procedure);
1065 } catch (Throwable t) {
1068 return (T)entry.get(graph, this, procedure);
1076 public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1077 boolean inferredDependency) {
1079 if (DebugPolicy.PERFORM)
1080 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1083 assert (!collecting);
1085 assert(!entry.isPending());
1086 assert(!entry.isDiscarded());
1088 // FRESH, REFUTED, EXCEPTED go here
1089 if (!entry.isReady()) {
1092 entry.clearResult(querySupport);
1094 multiReadMap.put(query, entry);
1097 final ReadGraphImpl newGraph = graph.newSync(entry);
1098 // newGraph.state.barrier.inc();
1102 query.perform(newGraph, new AsyncMultiProcedure<T>() {
1105 public void execute(AsyncReadGraph graph, T result) {
1106 entry.addOrSet(result);
1108 procedure.execute(graph, result);
1109 } catch (Throwable t) {
1110 t.printStackTrace();
1115 public void finished(AsyncReadGraph graph) {
1116 entry.finish(graph);
1118 procedure.finished(graph);
1119 } catch (Throwable t) {
1120 t.printStackTrace();
1122 // newGraph.state.barrier.dec();
1123 // parentBarrier.dec();
1127 public void exception(AsyncReadGraph graph, Throwable t) {
1130 procedure.exception(graph, t);
1131 } catch (Throwable t2) {
1132 t2.printStackTrace();
1134 // newGraph.state.barrier.dec();
1135 // parentBarrier.dec();
1140 } catch (DatabaseException e) {
1144 procedure.exception(graph, e);
1145 } catch (Throwable t2) {
1146 t2.printStackTrace();
1148 // newGraph.state.barrier.dec();
1149 // parentBarrier.dec();
1151 } catch (Throwable t) {
1153 DatabaseException e = new DatabaseException(t);
1157 procedure.exception(graph, e);
1158 } catch (Throwable t2) {
1159 t2.printStackTrace();
1161 // newGraph.state.barrier.dec();
1162 // parentBarrier.dec();
1170 entry.performFromCache(graph, this, procedure);
1176 assert (!entry.isDiscarded());
1178 registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1183 public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1184 boolean inferredDependency) {
1186 if (DebugPolicy.PERFORM)
1187 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1190 assert (!collecting);
1194 assert(!entry.isDiscarded());
1196 // FRESH, REFUTED, EXCEPTED go here
1197 if (!entry.isReady()) {
1203 ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);
1205 query.perform(performGraph, new AsyncMultiProcedure<T>() {
1208 public void execute(AsyncReadGraph graph, T result) {
1209 ReadGraphImpl impl = (ReadGraphImpl)graph;
1210 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1211 entry.addOrSet(result);
1213 procedure.execute(callerGraph, result);
1214 } catch (Throwable t) {
1215 t.printStackTrace();
1220 public void finished(AsyncReadGraph graph) {
1221 ReadGraphImpl impl = (ReadGraphImpl)graph;
1222 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1223 entry.finish(callerGraph);
1225 procedure.finished(callerGraph);
1226 } catch (Throwable t) {
1227 t.printStackTrace();
1232 public void exception(AsyncReadGraph graph, Throwable t) {
1233 ReadGraphImpl impl = (ReadGraphImpl)graph;
1234 // ReadGraphImpl executeGraph = callerGraph.newAsync();
1235 entry.except(callerGraph, t);
1237 procedure.exception(callerGraph, t);
1238 } catch (Throwable t2) {
1239 t2.printStackTrace();
1245 } catch (Throwable t) {
1249 procedure.exception(callerGraph, t);
1250 } catch (Throwable t2) {
1251 t2.printStackTrace();
1261 entry.performFromCache(callerGraph, this, procedure);
1267 assert (!entry.isDiscarded());
1269 registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);
1271 } catch (Throwable t) {
1273 Logger.defaultLogError(t);
1281 public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,
1282 boolean inferredDependency) {
1284 if (DebugPolicy.PERFORM)
1285 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1288 assert (!collecting);
1290 assert(!entry.isPending());
1291 assert(!entry.isDiscarded());
1293 registerDependencies(graph, entry, parent, base, procedure, inferredDependency);
1295 // FRESH, REFUTED, EXCEPTED go here
1296 if (!entry.isReady()) {
1299 entry.clearResult(querySupport);
1301 externalReadMap.put(query, entry);
1306 query.register(graph, new Listener<T>() {
1308 AtomicBoolean used = new AtomicBoolean(false);
1311 public void execute(T result) {
1314 if(entry.isDiscarded()) return;
1315 if(entry.isExcepted()) entry.setPending();
1317 if(used.compareAndSet(false, true)) {
1318 entry.addOrSet(QueryProcessor.this, result);
1319 procedure.execute(result);
1321 entry.queue(result);
1322 updatePrimitive(query);
1328 public void exception(Throwable t) {
1332 if(used.compareAndSet(false, true)) {
1333 procedure.exception(t);
1335 // entry.queue(result);
1336 updatePrimitive(query);
1342 public String toString() {
1343 return procedure.toString();
1347 public boolean isDisposed() {
1348 return entry.isDiscarded() || !isBound(entry);
1353 } catch (Throwable t) {
1356 procedure.exception(t);
1364 entry.performFromCache(procedure);
1370 assert (!entry.isDiscarded());
1374 private boolean isBound(ExternalReadEntry<?> entry) {
1375 if(entry.hasParents()) return true;
1376 else if(hasListener(entry)) return true;
1380 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
1382 if (parent != null && !inferred) {
1384 if(!child.isImmutable(graph))
1385 child.addParent(parent);
1386 } catch (DatabaseException e) {
1387 Logger.defaultLogError(e);
1389 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
1392 if (listener != null) {
1393 return registerListener(child, listener, procedure);
1400 public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1402 if (DebugPolicy.PERFORM)
1403 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1406 assert (!collecting);
1410 registerDependencies(graph, query, parent, listener, procedure, false);
1412 // FRESH, REFUTED, EXCEPTED go here
1413 if (!query.isReady()) {
1415 boolean fresh = query.isFresh();
1421 query.computeForEach(graph, this, procedure, true);
1427 query.performFromCache(graph, this, procedure);
1433 } catch (Throwable t) {
1435 Logger.defaultLogError(t);
1440 public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1442 if (DebugPolicy.PERFORM)
1443 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1446 assert (!collecting);
1450 assert(query.assertNotDiscarded());
1452 registerDependencies(graph, query, parent, listener, procedure, false);
1454 // FRESH, REFUTED, EXCEPTED go here
1455 if (!query.isReady()) {
1460 return query.computeForEach(graph, this, procedure, true);
1467 return query.performFromCache(graph, this, procedure);
1471 } catch (Throwable t) {
1473 Logger.defaultLogError(t);
1480 static class Dummy implements InternalProcedure<Object>, IntProcedure {
1483 public void execute(ReadGraphImpl graph, int i) {
1487 public void finished(ReadGraphImpl graph) {
1491 public void execute(ReadGraphImpl graph, Object result) {
1495 public void exception(ReadGraphImpl graph, Throwable throwable) {
1500 private static final Dummy dummy = new Dummy();
1502 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
1504 if (DebugPolicy.PERFORM)
1505 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1508 assert (!collecting);
1510 assert(query.assertNotDiscarded());
1512 registerDependencies(graph, query, parent, listener, procedure, false);
1514 // FRESH, REFUTED, EXCEPTED go here
1515 if (!query.isReady()) {
1520 query.computeForEach(graph, this, (Procedure)dummy, true);
1521 return query.get(graph, this, null);
1527 return query.get(graph, this, procedure);
1533 public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {
1535 if (DebugPolicy.PERFORM)
1536 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1539 assert (!collecting);
1543 if(query.isDiscarded()) {
1544 System.err.println("aff");
1546 assert(!query.isDiscarded());
1548 // FRESH, REFUTED, EXCEPTED go here
1549 if (!query.isReady()) {
1551 query.computeForEach(graph.withAsyncParent(query), this, procedure);
1558 query.performFromCache(graph, this, procedure);
1564 assert (!query.isDiscarded());
1566 registerDependencies(graph, query, parent, listener, procedure, false);
1568 } catch (Throwable t) {
1570 t.printStackTrace();
1571 Logger.defaultLogError(t);
1577 interface QueryCollectorSupport {
1578 public CacheCollectionResult allCaches();
1579 public Collection<CacheEntry> getRootList();
1580 public int getCurrentSize();
1581 public int calculateCurrentSize();
1582 public CacheEntryBase iterate(int level);
1583 public void remove();
1584 public void setLevel(CacheEntryBase entry, int level);
1585 public boolean start(boolean flush);
1588 interface QueryCollector {
1590 public void collect(int youngTarget, int allowedTimeInMs);
1594 class QueryCollectorSupportImpl implements QueryCollectorSupport {
1596 private static final boolean DEBUG = false;
1597 private static final double ITERATION_RATIO = 0.2;
1599 private CacheCollectionResult iteration = new CacheCollectionResult();
1600 private boolean fresh = true;
1601 private boolean needDataInStart = true;
1603 QueryCollectorSupportImpl() {
1604 iteration.restart();
1607 public CacheCollectionResult allCaches() {
1608 CacheCollectionResult result = new CacheCollectionResult();
1609 QueryProcessor.this.allCaches(result);
1614 public boolean start(boolean flush) {
1615 // We need new data from query maps
1617 if(needDataInStart || flush) {
1618 // Last run ended after processing all queries => refresh data
1619 restart(flush ? 0.0 : ITERATION_RATIO);
1621 // continue with previous big data
1623 // Notify caller about iteration situation
1624 return iteration.isAtStart();
1627 private void restart(double targetRatio) {
1629 needDataInStart = true;
1631 long start = System.nanoTime();
1634 // We need new data from query maps
1636 int iterationSize = iteration.size()+1;
1637 int diff = calculateCurrentSize()-iterationSize;
1639 double ratio = (double)diff / (double)iterationSize;
1640 boolean dirty = Math.abs(ratio) >= targetRatio;
1643 iteration = allCaches();
1645 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
1646 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
1647 System.err.print(" " + iteration.levels[i].size());
1648 System.err.println("");
1651 iteration.restart();
1655 needDataInStart = false;
1657 // We are returning here within the same GC round - reuse the cache table
1658 iteration.restart();
1666 public CacheEntryBase iterate(int level) {
1668 CacheEntryBase entry = iteration.next(level);
1670 restart(ITERATION_RATIO);
1674 while(entry != null && entry.isDiscarded()) {
1675 entry = iteration.next(level);
1683 public void remove() {
1688 public void setLevel(CacheEntryBase entry, int level) {
1689 iteration.setLevel(entry, level);
1692 public Collection<CacheEntry> getRootList() {
1694 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
1696 for (Object e : valueMap.values()) {
1697 result.add((CacheEntry) e);
1699 for (Object e : directPredicatesMap.values()) {
1700 result.add((CacheEntry) e);
1702 for (Object e : objectsMap.values()) {
1703 result.add((CacheEntry) e);
1705 for (Object e : directObjectsMap.values()) {
1706 result.add((CacheEntry) e);
1708 for (Object e : principalTypesMap.values()) {
1709 result.add((CacheEntry) e);
1711 for (Object e : superRelationsMap.values()) {
1712 result.add((CacheEntry) e);
1714 for (Object e : superTypesMap.values()) {
1715 result.add((CacheEntry) e);
1717 for (Object e : typesMap.values()) {
1718 result.add((CacheEntry) e);
1720 for (Object e : objectsMap.values()) {
1721 result.add((CacheEntry) e);
1723 for (Object e : assertedStatementsMap.values()) {
1724 result.add((CacheEntry) e);
1726 for (Object e : readMap.values()) {
1727 if(e instanceof CacheEntry) {
1728 result.add((CacheEntry) e);
1730 System.err.println("e=" + e);
1733 for (Object e : asyncReadMap.values()) {
1734 if(e instanceof CacheEntry) {
1735 result.add((CacheEntry) e);
1737 System.err.println("e=" + e);
1740 for (Object e : externalReadMap.values()) {
1741 result.add((CacheEntry) e);
1743 for (Object e : orderedSetMap.values()) {
1744 result.add((CacheEntry) e);
1752 public int calculateCurrentSize() {
1756 realSize += directPredicatesMap.size();
1757 realSize += principalTypesMap.size();
1758 realSize += uriToResourceMap.size();
1759 realSize += namespaceIndexMap22.size();
1760 realSize += projectsMap.size();
1762 realSize += relationInfoMap.size();
1763 realSize += superTypesMap.size();
1764 realSize += typeHierarchyMap.size();
1765 realSize += superRelationsMap.size();
1766 realSize += typesMap.size();
1768 realSize += valueMap.size();
1769 realSize += directObjectsMap.size();
1770 realSize += objectsMap.size();
1771 realSize += orderedSetMap.size();
1772 realSize += predicatesMap.size();
1774 realSize += statementsMap.size();
1775 realSize += assertedPredicatesMap.size();
1776 realSize += assertedStatementsMap.size();
1777 realSize += externalReadMap.size();
1778 realSize += asyncReadMap.size();
1780 realSize += readMap.size();
1781 realSize += asyncMultiReadMap.size();
1782 realSize += multiReadMap.size();
1791 public int getCurrentSize() {
1796 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
1798 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
1799 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
1801 public int querySize() {
1805 public void gc(int youngTarget, int allowedTimeInMs) {
1807 collector.collect(youngTarget, allowedTimeInMs);
1811 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
1813 assert (entry != null);
1815 if (base.isDisposed())
1818 return addListener(entry, base, procedure);
1822 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
1823 entry.setLastKnown(result);
1826 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
1828 assert (entry != null);
1829 assert (procedure != null);
1831 ArrayList<ListenerEntry> list = listeners.get(entry);
1833 list = new ArrayList<ListenerEntry>(1);
1834 listeners.put(entry, list);
1837 ListenerEntry result = new ListenerEntry(entry, base, procedure);
1838 int currentIndex = list.indexOf(result);
1839 // There was already a listener
1840 if(currentIndex > -1) {
1841 ListenerEntry current = list.get(currentIndex);
1842 if(!current.base.isDisposed()) return null;
1843 list.set(currentIndex, result);
1848 if(DebugPolicy.LISTENER) {
1849 new Exception().printStackTrace();
1850 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
1857 private void scheduleListener(ListenerEntry entry) {
1858 assert (entry != null);
1859 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
1860 scheduledListeners.add(entry);
1863 private void removeListener(ListenerEntry entry) {
1864 assert (entry != null);
1865 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
1866 if(list == null) return;
1867 boolean success = list.remove(entry);
1870 listeners.remove(entry.entry);
1873 private boolean hasListener(CacheEntry entry) {
1874 if(listeners.get(entry) != null) return true;
1878 boolean hasListenerAfterDisposing(CacheEntry entry) {
1879 if(listeners.get(entry) != null) {
1880 ArrayList<ListenerEntry> entries = listeners.get(entry);
1881 ArrayList<ListenerEntry> list = null;
1882 for (ListenerEntry e : entries) {
1883 if (e.base.isDisposed()) {
1884 if(list == null) list = new ArrayList<ListenerEntry>();
1889 for (ListenerEntry e : list) {
1893 if (entries.isEmpty()) {
1894 listeners.remove(entry);
1902 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1903 hasListenerAfterDisposing(entry);
1904 if(listeners.get(entry) != null)
1905 return listeners.get(entry);
1907 return Collections.emptyList();
1910 void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1912 if(!workarea.containsKey(entry)) {
1914 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1915 for(ListenerEntry e : getListenerEntries(entry))
1918 workarea.put(entry, ls);
1920 for(CacheEntry parent : entry.getParents(this)) {
1921 processListenerReport(parent, workarea);
1922 ls.addAll(workarea.get(parent));
1929 public synchronized ListenerReport getListenerReport() throws IOException {
1931 class ListenerReportImpl implements ListenerReport {
1933 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
1936 public void print(PrintStream b) {
1937 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1938 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1939 for(ListenerBase l : e.getValue()) {
1940 Integer i = hist.get(l);
1941 hist.put(l, i != null ? i-1 : -1);
1945 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1946 b.print("" + -p.second + " " + p.first + "\n");
1954 ListenerReportImpl result = new ListenerReportImpl();
1956 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
1957 for(CacheEntryBase entry : all) {
1958 hasListenerAfterDisposing(entry);
1960 for(CacheEntryBase entry : all) {
1961 processListenerReport(entry, result.workarea);
1968 public synchronized String reportListeners(File file) throws IOException {
1973 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1974 ListenerReport report = getListenerReport();
1977 return "Done reporting listeners.";
1981 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1983 if(entry.isDiscarded()) return;
1984 if(workarea.containsKey(entry)) return;
1986 Iterable<CacheEntry> parents = entry.getParents(this);
1987 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1988 for(CacheEntry e : parents) {
1989 if(e.isDiscarded()) continue;
1991 processParentReport(e, workarea);
1993 workarea.put(entry, ps);
1997 public synchronized String reportQueryActivity(File file) throws IOException {
1999 System.err.println("reportQueries " + file.getAbsolutePath());
2004 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
2006 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
2007 Collections.reverse(entries);
2009 for(Pair<String,Integer> entry : entries) {
2010 b.println(entry.first + ": " + entry.second);
2011 Exception e = Development.histogramExceptions.get(entry.first);
2013 e.printStackTrace(b);
2019 Development.histogram.clear();
2025 public synchronized String reportQueries(File file) throws IOException {
2027 System.err.println("reportQueries " + file.getAbsolutePath());
2032 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
2034 long start = System.nanoTime();
2036 // ArrayList<CacheEntry> all = ;
2038 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
2039 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
2040 for(CacheEntryBase entry : caches) {
2041 processParentReport(entry, workarea);
2044 // for(CacheEntry e : all) System.err.println("entry: " + e);
2046 long duration = System.nanoTime() - start;
2047 System.err.println("Query root set in " + 1e-9*duration + "s.");
2049 start = System.nanoTime();
2051 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
2055 for(CacheEntry entry : workarea.keySet()) {
2056 boolean listener = hasListenerAfterDisposing(entry);
2057 boolean hasParents = entry.getParents(this).iterator().hasNext();
2060 flagMap.put(entry, 0);
2061 } else if (!hasParents) {
2063 flagMap.put(entry, 1);
2066 flagMap.put(entry, 2);
2068 // // Write leaf bit
2069 // entry.flags |= 4;
2072 boolean done = true;
2079 long start2 = System.nanoTime();
2081 int boundCounter = 0;
2082 int unboundCounter = 0;
2083 int unknownCounter = 0;
2085 for(CacheEntry entry : workarea.keySet()) {
2087 //System.err.println("process " + entry);
2089 int flags = flagMap.get(entry);
2090 int bindStatus = flags & 3;
2092 if(bindStatus == 0) boundCounter++;
2093 else if(bindStatus == 1) unboundCounter++;
2094 else if(bindStatus == 2) unknownCounter++;
2096 if(bindStatus < 2) continue;
2099 for(CacheEntry parent : entry.getParents(this)) {
2101 if(parent.isDiscarded()) flagMap.put(parent, 1);
2103 int flags2 = flagMap.get(parent);
2104 int bindStatus2 = flags2 & 3;
2105 // Parent is bound => child is bound
2106 if(bindStatus2 == 0) {
2110 // Parent is unknown => child is unknown
2111 else if (bindStatus2 == 2) {
2118 flagMap.put(entry, newStatus);
2122 duration = System.nanoTime() - start2;
2123 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
2124 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
2126 } while(!done && loops++ < 20);
2130 for(CacheEntry entry : workarea.keySet()) {
2132 int bindStatus = flagMap.get(entry);
2133 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
2139 duration = System.nanoTime() - start;
2140 System.err.println("Query analysis in " + 1e-9*duration + "s.");
2142 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
2144 for(CacheEntry entry : workarea.keySet()) {
2145 Class<?> clazz = entry.getClass();
2146 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
2147 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
2148 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
2149 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
2150 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
2151 Integer c = counts.get(clazz);
2152 if(c == null) counts.put(clazz, -1);
2153 else counts.put(clazz, c-1);
2156 b.print("// Simantics DB client query report file\n");
2157 b.print("// This file contains the following information\n");
2158 b.print("// -The amount of cached query instances per query class\n");
2159 b.print("// -The sizes of retained child sets\n");
2160 b.print("// -List of parents for each query (search for 'P <query name>')\n");
2161 b.print("// -Followed by status, where\n");
2162 b.print("// -0=bound\n");
2163 b.print("// -1=free\n");
2164 b.print("// -2=unknown\n");
2165 b.print("// -L=has listener\n");
2166 b.print("// -List of children for each query (search for 'C <query name>')\n");
2168 b.print("----------------------------------------\n");
2170 b.print("// Queries by class\n");
2171 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
2172 b.print(-p.second + " " + p.first.getName() + "\n");
2175 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
2176 for(CacheEntry e : workarea.keySet())
2179 boolean changed = true;
2181 while(changed && iter++<50) {
2185 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
2186 for(CacheEntry e : workarea.keySet())
2189 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
2190 Integer c = hist.get(e.getKey());
2191 for(CacheEntry p : e.getValue()) {
2192 Integer i = newHist.get(p);
2193 newHist.put(p, i+c);
2196 for(CacheEntry e : workarea.keySet()) {
2197 Integer value = newHist.get(e);
2198 Integer old = hist.get(e);
2199 if(!value.equals(old)) {
2201 // System.err.println("hist " + e + ": " + old + " => " + value);
2206 System.err.println("Retained set iteration " + iter);
2210 b.print("// Queries by retained set\n");
2211 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
2212 b.print("" + -p.second + " " + p.first + "\n");
2215 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
2217 b.print("// Entry parent listing\n");
2218 for(CacheEntry entry : workarea.keySet()) {
2219 int status = flagMap.get(entry);
2220 boolean hasListener = hasListenerAfterDisposing(entry);
2221 b.print("Q " + entry.toString());
2223 b.print(" (L" + status + ")");
2226 b.print(" (" + status + ")");
2229 for(CacheEntry parent : workarea.get(entry)) {
2230 Collection<CacheEntry> inv = inverse.get(parent);
2232 inv = new ArrayList<CacheEntry>();
2233 inverse.put(parent, inv);
2236 b.print(" " + parent.toString());
2241 b.print("// Entry child listing\n");
2242 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
2243 b.print("C " + entry.getKey().toString());
2245 for(CacheEntry child : entry.getValue()) {
2246 Integer h = hist.get(child);
2250 b.print(" <no children>");
2252 b.print(" " + child.toString());
2257 b.print("#queries: " + workarea.keySet().size() + "\n");
2258 b.print("#listeners: " + listeners + "\n");
2262 return "Dumped " + workarea.keySet().size() + " queries.";
2268 public CacheEntry caller;
2270 public CacheEntry entry;
2274 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
2275 this.caller = caller;
2277 this.indent = indent;
2282 boolean removeQuery(CacheEntry entry) {
2284 // This entry has been removed before. No need to do anything here.
2285 if(entry.isDiscarded()) return false;
2287 assert (!entry.isDiscarded());
2289 Query query = entry.getQuery();
2291 query.removeEntry(this);
2296 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
2307 * @return true if this entry is being listened
2309 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
2313 CacheEntry entry = e.entry;
2315 // System.err.println("updateQuery " + entry);
2318 * If the dependency graph forms a DAG, some entries are inserted in the
2319 * todo list many times. They only need to be processed once though.
2321 if (entry.isDiscarded()) {
2322 if (Development.DEVELOPMENT) {
2323 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2324 System.out.print("D");
2325 for (int i = 0; i < e.indent; i++)
2326 System.out.print(" ");
2327 System.out.println(entry.getQuery());
2330 // System.err.println(" => DISCARDED");
2334 if (entry.isRefuted()) {
2335 if (Development.DEVELOPMENT) {
2336 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2337 System.out.print("R");
2338 for (int i = 0; i < e.indent; i++)
2339 System.out.print(" ");
2340 System.out.println(entry.getQuery());
2346 if (entry.isExcepted()) {
2347 if (Development.DEVELOPMENT) {
2348 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2349 System.out.print("E");
2354 if (entry.isPending()) {
2355 if (Development.DEVELOPMENT) {
2356 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2357 System.out.print("P");
2364 if (Development.DEVELOPMENT) {
2365 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2366 System.out.print("U ");
2367 for (int i = 0; i < e.indent; i++)
2368 System.out.print(" ");
2369 System.out.print(entry.getQuery());
2373 Query query = entry.getQuery();
2374 int type = query.type();
2376 boolean hasListener = hasListener(entry);
2378 if (Development.DEVELOPMENT) {
2379 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2380 if(hasListener(entry)) {
2381 System.out.println(" (L)");
2383 System.out.println("");
2388 if(entry.isPending() || entry.isExcepted()) {
2391 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2393 immediates.put(entry, entry);
2408 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2410 immediates.put(entry, entry);
2424 // System.err.println(" => FOO " + type);
2427 ArrayList<ListenerEntry> entries = listeners.get(entry);
2428 if(entries != null) {
2429 for (ListenerEntry le : entries) {
2430 scheduleListener(le);
2435 // If invalid, update parents
2436 if (type == RequestFlags.INVALIDATE) {
2437 updateParents(e.indent, entry, todo);
2444 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
2446 Iterable<CacheEntry> oldParents = entry.getParents(this);
2447 for (CacheEntry parent : oldParents) {
2448 // System.err.println("updateParents " + entry + " => " + parent);
2449 if(!parent.isDiscarded())
2450 todo.push(new UpdateEntry(entry, parent, indent + 2));
2455 private boolean pruneListener(ListenerEntry entry) {
2456 if (entry.base.isDisposed()) {
2457 removeListener(entry);
2465 * @param av1 an array (guaranteed)
2466 * @param av2 any object
2467 * @return <code>true</code> if the two arrays are equal
2469 private final boolean arrayEquals(Object av1, Object av2) {
2472 Class<?> c1 = av1.getClass().getComponentType();
2473 Class<?> c2 = av2.getClass().getComponentType();
2474 if (c2 == null || !c1.equals(c2))
2476 boolean p1 = c1.isPrimitive();
2477 boolean p2 = c2.isPrimitive();
2481 return Arrays.equals((Object[]) av1, (Object[]) av2);
2482 if (boolean.class.equals(c1))
2483 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
2484 else if (byte.class.equals(c1))
2485 return Arrays.equals((byte[]) av1, (byte[]) av2);
2486 else if (int.class.equals(c1))
2487 return Arrays.equals((int[]) av1, (int[]) av2);
2488 else if (long.class.equals(c1))
2489 return Arrays.equals((long[]) av1, (long[]) av2);
2490 else if (float.class.equals(c1))
2491 return Arrays.equals((float[]) av1, (float[]) av2);
2492 else if (double.class.equals(c1))
2493 return Arrays.equals((double[]) av1, (double[]) av2);
2494 throw new RuntimeException("??? Contact application querySupport.");
2499 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
2503 Query query = entry.getQuery();
2505 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
2507 entry.prepareRecompute(querySupport);
2509 ReadGraphImpl parentGraph = graph.withParent(entry);
2511 query.recompute(parentGraph, this, entry);
2513 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
2515 Object newValue = entry.getResult();
2517 if (ListenerEntry.NO_VALUE == oldValue) {
2518 if(DebugPolicy.CHANGES) {
2519 System.out.println("C " + query);
2520 System.out.println("- " + oldValue);
2521 System.out.println("- " + newValue);
2526 boolean changed = false;
2528 if (newValue != null) {
2529 if (newValue.getClass().isArray()) {
2530 changed = !arrayEquals(newValue, oldValue);
2532 changed = !newValue.equals(oldValue);
2535 changed = (oldValue != null);
2537 if(DebugPolicy.CHANGES && changed) {
2538 System.out.println("C " + query);
2539 System.out.println("- " + oldValue);
2540 System.out.println("- " + newValue);
2543 return changed ? newValue : ListenerEntry.NOT_CHANGED;
2545 } catch (Throwable t) {
2547 Logger.defaultLogError(t);
2549 return ListenerEntry.NO_VALUE;
2555 public boolean hasScheduledUpdates() {
2556 return !scheduledListeners.isEmpty();
2559 public void performScheduledUpdates(WriteGraphImpl graph) {
2562 assert (!collecting);
2563 assert (!firingListeners);
2565 firingListeners = true;
2569 // Performing may cause further events to be scheduled.
2570 while (!scheduledListeners.isEmpty()) {
2573 // graph.state.barrier.inc();
2575 // Clone current events to make new entries possible during
2577 THashSet<ListenerEntry> entries = scheduledListeners;
2578 scheduledListeners = new THashSet<ListenerEntry>();
2580 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
2582 for (ListenerEntry listenerEntry : entries) {
2584 if (pruneListener(listenerEntry)) {
2585 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
2589 final CacheEntry entry = listenerEntry.entry;
2590 assert (entry != null);
2592 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
2594 if (newValue != ListenerEntry.NOT_CHANGED) {
2595 if(DebugPolicy.LISTENER)
2596 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
2597 schedule.add(listenerEntry);
2598 listenerEntry.setLastKnown(entry.getResult());
2603 for(ListenerEntry listenerEntry : schedule) {
2604 final CacheEntry entry = listenerEntry.entry;
2605 if(DebugPolicy.LISTENER)
2606 System.out.println("Firing " + listenerEntry.procedure);
2608 if(DebugPolicy.LISTENER)
2609 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
2610 entry.performFromCache(graph, this, listenerEntry.procedure);
2611 } catch (Throwable t) {
2612 t.printStackTrace();
2616 // graph.state.barrier.dec();
2617 // graph.waitAsync(null);
2618 // graph.state.barrier.assertReady();
2623 firingListeners = false;
2630 * @return true if this entry still has listeners
2632 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
2634 assert (!collecting);
2638 boolean hadListeners = false;
2639 boolean listenersUnknown = false;
2643 assert(entry != null);
2644 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
2645 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
2646 todo.add(new UpdateEntry(null, entry, 0));
2650 // Walk the tree and collect immediate updates
2651 while (!todo.isEmpty()) {
2652 UpdateEntry e = todo.pop();
2653 hadListeners |= updateQuery(e, todo, immediates);
2656 if(immediates.isEmpty()) break;
2658 // Evaluate all immediate updates and collect parents to update
2659 for(CacheEntry immediate : immediates.values()) {
2661 if(immediate.isDiscarded()) {
2665 if(immediate.isExcepted()) {
2667 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
2668 if (newValue != ListenerEntry.NOT_CHANGED)
2669 updateParents(0, immediate, todo);
2673 Object oldValue = immediate.getResult();
2674 Object newValue = compareTo(graph, immediate, oldValue);
2676 if (newValue != ListenerEntry.NOT_CHANGED) {
2677 updateParents(0, immediate, todo);
2679 // If not changed, keep the old value
2680 immediate.setResult(oldValue);
2681 listenersUnknown = true;
2691 } catch (Throwable t) {
2692 Logger.defaultLogError(t);
2698 return hadListeners | listenersUnknown;
2702 volatile public boolean dirty = false;
2704 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
2705 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
2706 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
2707 // Maybe use a mutex from util.concurrent?
2708 private Object primitiveUpdateLock = new Object();
2709 private THashSet scheduledPrimitiveUpdates = new THashSet();
2711 public void performDirtyUpdates(final ReadGraphImpl graph) {
2716 if (Development.DEVELOPMENT) {
2717 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2718 System.err.println("== Query update ==");
2722 // Special case - one statement
2723 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2725 long arg0 = scheduledObjectUpdates.getFirst();
2727 final int subject = (int)(arg0 >>> 32);
2728 final int predicate = (int)(arg0 & 0xffffffff);
2730 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2731 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2732 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
2734 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2735 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2736 if(principalTypes != null) update(graph, principalTypes);
2737 Types types = Types.entry(QueryProcessor.this, subject);
2738 if(types != null) update(graph, types);
2741 if(predicate == subrelationOf) {
2742 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2743 if(superRelations != null) update(graph, superRelations);
2746 DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);
2747 if(dp != null) update(graph, dp);
2748 OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);
2749 if(os != null) update(graph, os);
2751 scheduledObjectUpdates.clear();
2756 // Special case - one value
2757 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2759 int arg0 = scheduledValueUpdates.getFirst();
2761 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2762 if(valueQuery != null) update(graph, valueQuery);
2764 scheduledValueUpdates.clear();
2769 final TIntHashSet predicates = new TIntHashSet();
2770 final TIntHashSet orderedSets = new TIntHashSet();
2772 THashSet primitiveUpdates;
2773 synchronized (primitiveUpdateLock) {
2774 primitiveUpdates = scheduledPrimitiveUpdates;
2775 scheduledPrimitiveUpdates = new THashSet();
2778 primitiveUpdates.forEach(new TObjectProcedure() {
2781 public boolean execute(Object arg0) {
2783 ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);
2784 if (query != null) {
2785 boolean listening = update(graph, query);
2786 if (!listening && !query.hasParents()) {
2787 externalReadMap.remove(arg0);
2796 scheduledValueUpdates.forEach(new TIntProcedure() {
2799 public boolean execute(int arg0) {
2800 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2801 if(valueQuery != null) update(graph, valueQuery);
2807 scheduledInvalidates.forEach(new TIntProcedure() {
2810 public boolean execute(int resource) {
2812 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);
2813 if(valueQuery != null) update(graph, valueQuery);
2815 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);
2816 if(principalTypes != null) update(graph, principalTypes);
2817 Types types = Types.entry(QueryProcessor.this, resource);
2818 if(types != null) update(graph, types);
2820 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
2821 if(superRelations != null) update(graph, superRelations);
2823 predicates.add(resource);
2830 scheduledObjectUpdates.forEach(new TLongProcedure() {
2833 public boolean execute(long arg0) {
2835 final int subject = (int)(arg0 >>> 32);
2836 final int predicate = (int)(arg0 & 0xffffffff);
2838 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2839 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2840 if(principalTypes != null) update(graph, principalTypes);
2841 Types types = Types.entry(QueryProcessor.this, subject);
2842 if(types != null) update(graph, types);
2845 if(predicate == subrelationOf) {
2846 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2847 if(superRelations != null) update(graph, superRelations);
2850 predicates.add(subject);
2851 orderedSets.add(predicate);
2859 predicates.forEach(new TIntProcedure() {
2862 public boolean execute(final int subject) {
2864 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2865 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2866 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
2868 DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2869 if(entry != null) update(graph, entry);
2877 orderedSets.forEach(new TIntProcedure() {
2880 public boolean execute(int orderedSet) {
2882 OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);
2883 if(entry != null) update(graph, entry);
2891 // for (Integer subject : predicates) {
2892 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2893 // if(entry != null) update(graph, entry);
2897 if (Development.DEVELOPMENT) {
2898 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2899 System.err.println("== Query update ends ==");
2903 scheduledValueUpdates.clear();
2904 scheduledObjectUpdates.clear();
2905 scheduledInvalidates.clear();
2909 public void updateValue(final int resource) {
2910 scheduledValueUpdates.add(resource);
2914 public void updateStatements(final int resource, final int predicate) {
2915 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
2919 private int lastInvalidate = 0;
2921 public void invalidateResource(final int resource) {
2922 if(lastInvalidate == resource) return;
2923 scheduledValueUpdates.add(resource);
2924 lastInvalidate = resource;
2928 public void updatePrimitive(final ExternalRead primitive) {
2930 // External reads may be updated from arbitrary threads.
2931 // Synchronize to prevent race-conditions.
2932 synchronized (primitiveUpdateLock) {
2933 scheduledPrimitiveUpdates.add(primitive);
2935 querySupport.dirtyPrimitives();
2940 public synchronized String toString() {
2941 return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";
2945 protected void doDispose() {
2947 for(int index = 0; index < THREADS; index++) {
2948 executors[index].dispose();
2952 for(int i=0;i<100;i++) {
2954 boolean alive = false;
2955 for(int index = 0; index < THREADS; index++) {
2956 alive |= executors[index].isAlive();
2961 } catch (InterruptedException e) {
2962 Logger.defaultLogError(e);
2967 // Then start interrupting
2968 for(int i=0;i<100;i++) {
2970 boolean alive = false;
2971 for(int index = 0; index < THREADS; index++) {
2972 alive |= executors[index].isAlive();
2975 for(int index = 0; index < THREADS; index++) {
2976 executors[index].interrupt();
2980 // // Then just destroy
2981 // for(int index = 0; index < THREADS; index++) {
2982 // executors[index].destroy();
2985 for(int index = 0; index < THREADS; index++) {
2987 executors[index].join(5000);
2988 } catch (InterruptedException e) {
2989 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2991 executors[index] = null;
2996 public int getHits() {
3000 public int getMisses() {
3004 public int getSize() {
3008 public Set<Long> getReferencedClusters() {
3009 HashSet<Long> result = new HashSet<Long>();
3010 for (CacheEntry entry : objectsMap.values()) {
3011 Objects query = (Objects) entry.getQuery();
3012 result.add(querySupport.getClusterId(query.r1()));
3014 for (CacheEntry entry : directPredicatesMap.values()) {
3015 DirectPredicates query = (DirectPredicates) entry.getQuery();
3016 result.add(querySupport.getClusterId(query.id));
3018 for (CacheEntry entry : valueMap.values()) {
3019 ValueQuery query = (ValueQuery) entry.getQuery();
3020 result.add(querySupport.getClusterId(query.id));
3025 public void assertDone() {
3028 CacheCollectionResult allCaches(CacheCollectionResult result) {
3030 int level = Integer.MAX_VALUE;
3031 directPredicatesMap.values(level, result);
3032 principalTypesMap.values(level, result);
3033 for(CacheEntryBase e : uriToResourceMap.values())
3034 if(e.getLevel() <= level)
3036 for(CacheEntryBase e : namespaceIndexMap22.values())
3037 if(e.getLevel() <= level)
3039 projectsMap.values(level, result);
3041 relationInfoMap.values(level, result);
3042 superTypesMap.values(level, result);
3043 typeHierarchyMap.values(level, result);
3044 superRelationsMap.values(level, result);
3045 typesMap.values(level, result);
3047 valueMap.values(level, result);
3048 directObjectsMap.values(level, result);
3049 objectsMap.values(level, result);
3050 orderedSetMap.values(level, result);
3051 predicatesMap.values(level, result);
3053 statementsMap.values(level, result);
3054 assertedPredicatesMap.values(level, result);
3055 assertedStatementsMap.values(level, result);
3056 externalReadMap.values(level, result);
3057 asyncReadMap.values(level, result);
3059 readMap.values(level, result);
3060 asyncMultiReadMap.values(level, result);
3061 multiReadMap.values(level, result);
3067 public void printDiagnostics() {
3070 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
3071 querySupport.requestCluster(graph, clusterId, runnable);
3074 public int clean() {
3075 collector.collect(0, Integer.MAX_VALUE);
3079 public void clean(final Collection<ExternalRead<?>> requests) {
3080 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
3081 Iterator<ExternalRead<?>> iterator = requests.iterator();
3083 public CacheCollectionResult allCaches() {
3084 throw new UnsupportedOperationException();
3087 public CacheEntryBase iterate(int level) {
3088 if(iterator.hasNext()) {
3089 ExternalRead<?> request = iterator.next();
3090 ExternalReadEntry entry = externalReadMap.get(request);
3091 if (entry != null) return entry;
3092 else return iterate(level);
3094 iterator = requests.iterator();
3099 public void remove() {
3100 throw new UnsupportedOperationException();
3103 public void setLevel(CacheEntryBase entry, int level) {
3104 throw new UnsupportedOperationException();
3107 public Collection<CacheEntry> getRootList() {
3108 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
3109 for (ExternalRead<?> request : requests) {
3110 ExternalReadEntry entry = externalReadMap.get(request);
3117 public int getCurrentSize() {
3121 public int calculateCurrentSize() {
3122 // This tells the collector to attempt collecting everything.
3123 return Integer.MAX_VALUE;
3126 public boolean start(boolean flush) {
3130 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
3133 public void scanPending() {
3135 ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();
3137 entries.addAll(directPredicatesMap.values());
3138 entries.addAll(principalTypesMap.values());
3139 entries.addAll(uriToResourceMap.values());
3140 entries.addAll(namespaceIndexMap22.values());
3141 entries.addAll(projectsMap.values());
3142 entries.addAll(relationInfoMap.values());
3143 entries.addAll(superTypesMap.values());
3144 entries.addAll(superRelationsMap.values());
3145 entries.addAll(typesMap.values());
3146 entries.addAll(valueMap.values());
3147 entries.addAll(directObjectsMap.values());
3148 entries.addAll(objectsMap.values());
3149 entries.addAll(orderedSetMap.values());
3150 entries.addAll(predicatesMap.values());
3151 entries.addAll(orderedSetMap.values());
3152 entries.addAll(statementsMap.values());
3153 // entries.addAll(assertedObjectsMap.values());
3154 entries.addAll(assertedPredicatesMap.values());
3155 entries.addAll(assertedStatementsMap.values());
3156 entries.addAll(externalReadMap.values());
3157 entries.addAll(asyncReadMap.values());
3158 entries.addAll(externalReadMap.values());
3159 entries.addAll(readMap.values());
3160 entries.addAll(asyncMultiReadMap.values());
3161 entries.addAll(multiReadMap.values());
3162 entries.addAll(readMap.values());
3163 System.out.println(entries.size() + " entries.");
3164 for(Object e : entries) {
3165 if(e instanceof CacheEntry) {
3166 CacheEntry en = (CacheEntry)e;
3167 if(en.isPending()) System.out.println("pending " + e);
3168 if(en.isExcepted()) System.out.println("excepted " + e);
3169 if(en.isDiscarded()) System.out.println("discarded " + e);
3170 if(en.isRefuted()) System.out.println("refuted " + e);
3171 if(en.isFresh()) System.out.println("fresh " + e);
3173 //System.out.println("Unknown object " + e);
3179 public ReadGraphImpl graphForVirtualRequest() {
3180 return ReadGraphImpl.createAsync(this);
3184 private HashMap<Resource, Class<?>> builtinValues;
3186 public Class<?> getBuiltinValue(Resource r) {
3187 if(builtinValues == null) initBuiltinValues();
3188 return builtinValues.get(r);
3191 Exception callerException = null;
3193 public interface AsyncBarrier {
3196 // public void inc(String debug);
3197 // public void dec(String debug);
3200 // final public QueryProcessor processor;
3201 // final public QuerySupport support;
3203 // boolean disposed = false;
3205 private void initBuiltinValues() {
3207 Layer0 b = getSession().peekService(Layer0.class);
3208 if(b == null) return;
3210 builtinValues = new HashMap<Resource, Class<?>>();
3212 builtinValues.put(b.String, String.class);
3213 builtinValues.put(b.Double, Double.class);
3214 builtinValues.put(b.Float, Float.class);
3215 builtinValues.put(b.Long, Long.class);
3216 builtinValues.put(b.Integer, Integer.class);
3217 builtinValues.put(b.Byte, Byte.class);
3218 builtinValues.put(b.Boolean, Boolean.class);
3220 builtinValues.put(b.StringArray, String[].class);
3221 builtinValues.put(b.DoubleArray, double[].class);
3222 builtinValues.put(b.FloatArray, float[].class);
3223 builtinValues.put(b.LongArray, long[].class);
3224 builtinValues.put(b.IntegerArray, int[].class);
3225 builtinValues.put(b.ByteArray, byte[].class);
3226 builtinValues.put(b.BooleanArray, boolean[].class);
3230 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
3232 // if (null == provider2) {
3233 // this.processor = null;
3237 // this.processor = provider2;
3238 // support = provider2.getCore();
3239 // initBuiltinValues();
3243 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
3244 // return new ReadGraphSupportImpl(impl.processor);
3248 final public Session getSession() {
3252 final public ResourceSupport getResourceSupport() {
3253 return resourceSupport;
3257 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3259 assert(subject != null);
3260 assert(procedure != null);
3262 final ListenerBase listener = getListenerBase(procedure);
3264 IntProcedure ip = new IntProcedure() {
3266 AtomicBoolean first = new AtomicBoolean(true);
3269 public void execute(ReadGraphImpl graph, int i) {
3272 procedure.execute(graph, querySupport.getResource(i));
3274 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3276 } catch (Throwable t2) {
3277 Logger.defaultLogError(t2);
3282 public void finished(ReadGraphImpl graph) {
3284 if(first.compareAndSet(true, false)) {
3285 procedure.finished(graph);
3286 // impl.state.barrier.dec(this);
3288 procedure.finished(impl.newRestart(graph));
3291 } catch (Throwable t2) {
3292 Logger.defaultLogError(t2);
3297 public void exception(ReadGraphImpl graph, Throwable t) {
3299 if(first.compareAndSet(true, false)) {
3300 procedure.exception(graph, t);
3301 // impl.state.barrier.dec(this);
3303 procedure.exception(impl.newRestart(graph), t);
3305 } catch (Throwable t2) {
3306 Logger.defaultLogError(t2);
3312 int sId = querySupport.getId(subject);
3314 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);
3315 // else impl.state.barrier.inc(null, null);
3317 Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);
3322 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3324 assert(subject != null);
3325 assert(procedure != null);
3327 final ListenerBase listener = getListenerBase(procedure);
3329 // impl.state.barrier.inc();
3331 Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
3334 public void execute(ReadGraphImpl graph, int i) {
3336 procedure.execute(querySupport.getResource(i));
3337 } catch (Throwable t2) {
3338 Logger.defaultLogError(t2);
3343 public void finished(ReadGraphImpl graph) {
3345 procedure.finished();
3346 } catch (Throwable t2) {
3347 Logger.defaultLogError(t2);
3349 // impl.state.barrier.dec();
3353 public void exception(ReadGraphImpl graph, Throwable t) {
3355 procedure.exception(t);
3356 } catch (Throwable t2) {
3357 Logger.defaultLogError(t2);
3359 // impl.state.barrier.dec();
3367 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3369 assert(subject != null);
3371 return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
3377 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3378 final Resource predicate, final MultiProcedure<Statement> procedure) {
3380 assert(subject != null);
3381 assert(predicate != null);
3382 assert(procedure != null);
3384 final ListenerBase listener = getListenerBase(procedure);
3386 // impl.state.barrier.inc();
3388 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3391 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3393 procedure.execute(querySupport.getStatement(s, p, o));
3394 } catch (Throwable t2) {
3395 Logger.defaultLogError(t2);
3400 public void finished(ReadGraphImpl graph) {
3402 procedure.finished();
3403 } catch (Throwable t2) {
3404 Logger.defaultLogError(t2);
3406 // impl.state.barrier.dec();
3410 public void exception(ReadGraphImpl graph, Throwable t) {
3412 procedure.exception(t);
3413 } catch (Throwable t2) {
3414 Logger.defaultLogError(t2);
3416 // impl.state.barrier.dec();
3424 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3425 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3427 assert(subject != null);
3428 assert(predicate != null);
3429 assert(procedure != null);
3431 final ListenerBase listener = getListenerBase(procedure);
3433 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
3435 boolean first = true;
3438 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3441 procedure.execute(graph, querySupport.getStatement(s, p, o));
3443 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
3445 } catch (Throwable t2) {
3446 Logger.defaultLogError(t2);
3451 public void finished(ReadGraphImpl graph) {
3456 procedure.finished(graph);
3457 // impl.state.barrier.dec(this);
3459 procedure.finished(impl.newRestart(graph));
3461 } catch (Throwable t2) {
3462 Logger.defaultLogError(t2);
3468 public void exception(ReadGraphImpl graph, Throwable t) {
3473 procedure.exception(graph, t);
3474 // impl.state.barrier.dec(this);
3476 procedure.exception(impl.newRestart(graph), t);
3478 } catch (Throwable t2) {
3479 Logger.defaultLogError(t2);
3486 int sId = querySupport.getId(subject);
3487 int pId = querySupport.getId(predicate);
3489 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3490 // else impl.state.barrier.inc(null, null);
3492 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
3497 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3498 final Resource predicate, final StatementProcedure procedure) {
3500 assert(subject != null);
3501 assert(predicate != null);
3502 assert(procedure != null);
3504 final ListenerBase listener = getListenerBase(procedure);
3506 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
3508 boolean first = true;
3511 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3514 procedure.execute(graph, s, p, o);
3516 procedure.execute(impl.newRestart(graph), s, p, o);
3518 } catch (Throwable t2) {
3519 Logger.defaultLogError(t2);
3524 public void finished(ReadGraphImpl graph) {
3529 procedure.finished(graph);
3530 // impl.state.barrier.dec(this);
3532 procedure.finished(impl.newRestart(graph));
3534 } catch (Throwable t2) {
3535 Logger.defaultLogError(t2);
3541 public void exception(ReadGraphImpl graph, Throwable t) {
3546 procedure.exception(graph, t);
3547 // impl.state.barrier.dec(this);
3549 procedure.exception(impl.newRestart(graph), t);
3551 } catch (Throwable t2) {
3552 Logger.defaultLogError(t2);
3559 int sId = querySupport.getId(subject);
3560 int pId = querySupport.getId(predicate);
3562 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3563 // else impl.state.barrier.inc(null, null);
3565 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
3570 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3572 assert(subject != null);
3573 assert(predicate != null);
3574 assert(procedure != null);
3576 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
3578 private Set<Statement> current = null;
3579 private Set<Statement> run = new HashSet<Statement>();
3582 public void execute(AsyncReadGraph graph, Statement result) {
3584 boolean found = false;
3586 if(current != null) {
3588 found = current.remove(result);
3592 if(!found) procedure.add(graph, result);
3599 public void finished(AsyncReadGraph graph) {
3601 if(current != null) {
3602 for(Statement r : current) procedure.remove(graph, r);
3607 run = new HashSet<Statement>();
3612 public void exception(AsyncReadGraph graph, Throwable t) {
3613 procedure.exception(graph, t);
3617 public boolean isDisposed() {
3618 return procedure.isDisposed();
3626 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
3627 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3629 assert(subject != null);
3630 assert(predicate != null);
3631 assert(procedure != null);
3633 final ListenerBase listener = getListenerBase(procedure);
3635 // impl.state.barrier.inc();
3637 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3640 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3642 procedure.execute(graph, querySupport.getStatement(s, p, o));
3643 } catch (Throwable t2) {
3644 Logger.defaultLogError(t2);
3649 public void finished(ReadGraphImpl graph) {
3651 procedure.finished(graph);
3652 } catch (Throwable t2) {
3653 Logger.defaultLogError(t2);
3655 // impl.state.barrier.dec();
3659 public void exception(ReadGraphImpl graph, Throwable t) {
3661 procedure.exception(graph, t);
3662 } catch (Throwable t2) {
3663 Logger.defaultLogError(t2);
3665 // impl.state.barrier.dec();
3672 private static ListenerBase getListenerBase(Object procedure) {
3673 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
3678 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
3680 assert(subject != null);
3681 assert(predicate != null);
3682 assert(procedure != null);
3684 final ListenerBase listener = getListenerBase(procedure);
3686 // impl.state.barrier.inc();
3688 Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
3691 public void execute(ReadGraphImpl graph, int i) {
3693 procedure.execute(querySupport.getResource(i));
3694 } catch (Throwable t2) {
3695 Logger.defaultLogError(t2);
3700 public void finished(ReadGraphImpl graph) {
3702 procedure.finished();
3703 } catch (Throwable t2) {
3704 Logger.defaultLogError(t2);
3706 // impl.state.barrier.dec();
3710 public void exception(ReadGraphImpl graph, Throwable t) {
3711 System.out.println("forEachObject exception " + t);
3713 procedure.exception(t);
3714 } catch (Throwable t2) {
3715 Logger.defaultLogError(t2);
3717 // impl.state.barrier.dec();
3726 // final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3728 // assert(subject != null);
3729 // assert(predicate != null);
3730 // assert(procedure != null);
3732 // final ListenerBase listener = getListenerBase(procedure);
3734 // int sId = querySupport.getId(subject);
3735 // int pId = querySupport.getId(predicate);
3737 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);
3739 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);
3740 // else impl.state.barrier.inc(null, null);
3742 // // final Exception caller = new Exception();
3744 // // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());
3746 // DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);
3751 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3753 assert(subject != null);
3754 assert(procedure != null);
3756 final ListenerBase listener = getListenerBase(procedure);
3758 MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
3760 int sId = querySupport.getId(subject);
3762 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);
3763 // else impl.state.barrier.inc(null, null);
3765 DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);
3770 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
3772 assert(subject != null);
3773 assert(procedure != null);
3775 final ListenerBase listener = getListenerBase(procedure);
3777 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
3782 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
3784 assert(subject != null);
3785 assert(procedure != null);
3787 final ListenerBase listener = getListenerBase(procedure);
3789 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
3793 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
3796 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
3798 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
3800 private Resource single = null;
3803 public synchronized void execute(AsyncReadGraph graph, Resource result) {
3804 if(single == null) {
3807 single = INVALID_RESOURCE;
3812 public synchronized void finished(AsyncReadGraph graph) {
3813 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
3814 else procedure.execute(graph, single);
3818 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
3819 procedure.exception(graph, throwable);
3826 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
3828 final int sId = querySupport.getId(subject);
3829 final int pId = querySupport.getId(predicate);
3831 Objects.runner(impl, sId, pId, impl.parent, listener, procedure);
3835 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
3837 final int sId = querySupport.getId(subject);
3838 final int pId = querySupport.getId(predicate);
3840 return Objects.runner2(impl, sId, pId, impl.parent);
3844 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3846 assert(subject != null);
3847 assert(predicate != null);
3849 final ListenerBase listener = getListenerBase(procedure);
3851 if(impl.parent != null || listener != null) {
3853 IntProcedure ip = new IntProcedure() {
3855 AtomicBoolean first = new AtomicBoolean(true);
3858 public void execute(ReadGraphImpl graph, int i) {
3861 procedure.execute(impl, querySupport.getResource(i));
3863 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3865 } catch (Throwable t2) {
3866 Logger.defaultLogError(t2);
3872 public void finished(ReadGraphImpl graph) {
3874 if(first.compareAndSet(true, false)) {
3875 procedure.finished(impl);
3876 // impl.state.barrier.dec(this);
3878 procedure.finished(impl.newRestart(graph));
3880 } catch (Throwable t2) {
3881 Logger.defaultLogError(t2);
3886 public void exception(ReadGraphImpl graph, Throwable t) {
3888 procedure.exception(graph, t);
3889 } catch (Throwable t2) {
3890 Logger.defaultLogError(t2);
3892 // impl.state.barrier.dec(this);
3896 public String toString() {
3897 return "forEachObject with " + procedure;
3902 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
3903 // else impl.state.barrier.inc(null, null);
3905 forEachObject(impl, subject, predicate, listener, ip);
3909 IntProcedure ip = new IntProcedure() {
3912 public void execute(ReadGraphImpl graph, int i) {
3913 procedure.execute(graph, querySupport.getResource(i));
3917 public void finished(ReadGraphImpl graph) {
3918 procedure.finished(graph);
3922 public void exception(ReadGraphImpl graph, Throwable t) {
3923 procedure.exception(graph, t);
3927 public String toString() {
3928 return "forEachObject with " + procedure;
3933 forEachObject(impl, subject, predicate, listener, ip);
3940 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3942 assert(subject != null);
3943 assert(predicate != null);
3944 assert(procedure != null);
3946 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
3948 private Set<Resource> current = null;
3949 private Set<Resource> run = new HashSet<Resource>();
3952 public void execute(AsyncReadGraph graph, Resource result) {
3954 boolean found = false;
3956 if(current != null) {
3958 found = current.remove(result);
3962 if(!found) procedure.add(graph, result);
3969 public void finished(AsyncReadGraph graph) {
3971 if(current != null) {
3972 for(Resource r : current) procedure.remove(graph, r);
3977 run = new HashSet<Resource>();
3982 public boolean isDisposed() {
3983 return procedure.isDisposed();
3987 public void exception(AsyncReadGraph graph, Throwable t) {
3988 procedure.exception(graph, t);
3992 public String toString() {
3993 return "forObjectSet " + procedure;
4001 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
4003 assert(subject != null);
4004 assert(procedure != null);
4006 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
4008 private Set<Resource> current = null;
4009 private Set<Resource> run = new HashSet<Resource>();
4012 public void execute(AsyncReadGraph graph, Resource result) {
4014 boolean found = false;
4016 if(current != null) {
4018 found = current.remove(result);
4022 if(!found) procedure.add(graph, result);
4029 public void finished(AsyncReadGraph graph) {
4031 if(current != null) {
4032 for(Resource r : current) procedure.remove(graph, r);
4037 run = new HashSet<Resource>();
4042 public boolean isDisposed() {
4043 return procedure.isDisposed();
4047 public void exception(AsyncReadGraph graph, Throwable t) {
4048 procedure.exception(graph, t);
4052 public String toString() {
4053 return "forPredicateSet " + procedure;
4061 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
4063 assert(subject != null);
4064 assert(procedure != null);
4066 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
4068 private Set<Resource> current = null;
4069 private Set<Resource> run = new HashSet<Resource>();
4072 public void execute(AsyncReadGraph graph, Resource result) {
4074 boolean found = false;
4076 if(current != null) {
4078 found = current.remove(result);
4082 if(!found) procedure.add(graph, result);
4089 public void finished(AsyncReadGraph graph) {
4091 if(current != null) {
4092 for(Resource r : current) procedure.remove(graph, r);
4097 run = new HashSet<Resource>();
4102 public boolean isDisposed() {
4103 return procedure.isDisposed();
4107 public void exception(AsyncReadGraph graph, Throwable t) {
4108 procedure.exception(graph, t);
4112 public String toString() {
4113 return "forPrincipalTypeSet " + procedure;
4121 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
4123 assert(subject != null);
4124 assert(predicate != null);
4125 assert(procedure != null);
4127 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
4129 private Set<Resource> current = null;
4130 private Set<Resource> run = new HashSet<Resource>();
4133 public void execute(AsyncReadGraph graph, Resource result) {
4135 boolean found = false;
4137 if(current != null) {
4139 found = current.remove(result);
4143 if(!found) procedure.add(graph, result);
4150 public void finished(AsyncReadGraph graph) {
4152 if(current != null) {
4153 for(Resource r : current) procedure.remove(graph, r);
4158 run = new HashSet<Resource>();
4163 public boolean isDisposed() {
4164 return procedure.isDisposed();
4168 public void exception(AsyncReadGraph graph, Throwable t) {
4169 procedure.exception(graph, t);
4173 public String toString() {
4174 return "forObjectSet " + procedure;
4182 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
4184 assert(subject != null);
4185 assert(predicate != null);
4186 assert(procedure != null);
4188 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
4190 private Set<Statement> current = null;
4191 private Set<Statement> run = new HashSet<Statement>();
4194 public void execute(AsyncReadGraph graph, Statement result) {
4196 boolean found = false;
4198 if(current != null) {
4200 found = current.remove(result);
4204 if(!found) procedure.add(graph, result);
4211 public void finished(AsyncReadGraph graph) {
4213 if(current != null) {
4214 for(Statement s : current) procedure.remove(graph, s);
4219 run = new HashSet<Statement>();
4224 public boolean isDisposed() {
4225 return procedure.isDisposed();
4229 public void exception(AsyncReadGraph graph, Throwable t) {
4230 procedure.exception(graph, t);
4234 public String toString() {
4235 return "forStatementSet " + procedure;
4243 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
4244 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
4246 assert(subject != null);
4247 assert(predicate != null);
4248 assert(procedure != null);
4250 final ListenerBase listener = getListenerBase(procedure);
4252 // impl.state.barrier.inc();
4254 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {
4257 public void execute(ReadGraphImpl graph, int s, int p, int o) {
4259 procedure.execute(graph, querySupport.getResource(o));
4260 } catch (Throwable t2) {
4261 Logger.defaultLogError(t2);
4266 public void finished(ReadGraphImpl graph) {
4268 procedure.finished(graph);
4269 } catch (Throwable t2) {
4270 Logger.defaultLogError(t2);
4272 // impl.state.barrier.dec();
4276 public void exception(ReadGraphImpl graph, Throwable t) {
4278 procedure.exception(graph, t);
4279 } catch (Throwable t2) {
4280 Logger.defaultLogError(t2);
4282 // impl.state.barrier.dec();
4290 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4292 assert(subject != null);
4293 assert(procedure != null);
4295 final ListenerBase listener = getListenerBase(procedure);
4297 IntProcedure ip = new IntProcedure() {
4300 public void execute(ReadGraphImpl graph, int i) {
4302 procedure.execute(graph, querySupport.getResource(i));
4303 } catch (Throwable t2) {
4304 Logger.defaultLogError(t2);
4309 public void finished(ReadGraphImpl graph) {
4311 procedure.finished(graph);
4312 } catch (Throwable t2) {
4313 Logger.defaultLogError(t2);
4315 // impl.state.barrier.dec(this);
4319 public void exception(ReadGraphImpl graph, Throwable t) {
4321 procedure.exception(graph, t);
4322 } catch (Throwable t2) {
4323 Logger.defaultLogError(t2);
4325 // impl.state.barrier.dec(this);
4330 int sId = querySupport.getId(subject);
4332 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
4333 // else impl.state.barrier.inc(null, null);
4335 PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);
4340 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
4342 assert(subject != null);
4343 assert(procedure != null);
4345 final ListenerBase listener = getListenerBase(procedure);
4347 // impl.state.barrier.inc();
4349 PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4352 public void execute(ReadGraphImpl graph, int i) {
4354 procedure.execute(querySupport.getResource(i));
4355 } catch (Throwable t2) {
4356 Logger.defaultLogError(t2);
4361 public void finished(ReadGraphImpl graph) {
4363 procedure.finished();
4364 } catch (Throwable t2) {
4365 Logger.defaultLogError(t2);
4367 // impl.state.barrier.dec();
4371 public void exception(ReadGraphImpl graph, Throwable t) {
4373 procedure.exception(t);
4374 } catch (Throwable t2) {
4375 Logger.defaultLogError(t2);
4377 // impl.state.barrier.dec();
4384 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4386 assert(subject != null);
4387 assert(procedure != null);
4389 final ListenerBase listener = getListenerBase(procedure);
4391 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
4393 AtomicBoolean first = new AtomicBoolean(true);
4396 public void execute(final ReadGraphImpl graph, IntSet set) {
4398 if(first.compareAndSet(true, false)) {
4399 procedure.execute(graph, set);
4400 // impl.state.barrier.dec(this);
4402 procedure.execute(impl.newRestart(graph), set);
4404 } catch (Throwable t2) {
4405 Logger.defaultLogError(t2);
4410 public void exception(ReadGraphImpl graph, Throwable t) {
4412 if(first.compareAndSet(true, false)) {
4413 procedure.exception(graph, t);
4414 // impl.state.barrier.dec(this);
4416 procedure.exception(impl.newRestart(graph), t);
4418 } catch (Throwable t2) {
4419 Logger.defaultLogError(t2);
4425 int sId = querySupport.getId(subject);
4427 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);
4428 // else impl.state.barrier.inc(null, null);
4430 Types.queryEach(impl, sId, this, impl.parent, listener, ip);
4435 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
4437 assert(subject != null);
4439 return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
4444 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
4446 assert(subject != null);
4447 assert(procedure != null);
4449 final ListenerBase listener = getListenerBase(procedure);
4451 // impl.state.barrier.inc();
4453 RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {
4455 AtomicBoolean first = new AtomicBoolean(true);
4458 public void execute(final ReadGraphImpl graph, RelationInfo set) {
4460 if(first.compareAndSet(true, false)) {
4461 procedure.execute(graph, set);
4462 // impl.state.barrier.dec();
4464 procedure.execute(impl.newRestart(graph), set);
4466 } catch (Throwable t2) {
4467 Logger.defaultLogError(t2);
4472 public void exception(ReadGraphImpl graph, Throwable t) {
4474 if(first.compareAndSet(true, false)) {
4475 procedure.exception(graph, t);
4476 // impl.state.barrier.dec("ReadGraphSupportImpl.1353");
4478 procedure.exception(impl.newRestart(graph), t);
4480 } catch (Throwable t2) {
4481 Logger.defaultLogError(t2);
4490 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4492 assert(subject != null);
4493 assert(procedure != null);
4495 final ListenerBase listener = getListenerBase(procedure);
4497 // impl.state.barrier.inc();
4499 SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {
4501 AtomicBoolean first = new AtomicBoolean(true);
4504 public void execute(final ReadGraphImpl graph, IntSet set) {
4505 // final HashSet<Resource> result = new HashSet<Resource>();
4506 // set.forEach(new TIntProcedure() {
4509 // public boolean execute(int type) {
4510 // result.add(querySupport.getResource(type));
4516 if(first.compareAndSet(true, false)) {
4517 procedure.execute(graph, set);
4518 // impl.state.barrier.dec();
4520 procedure.execute(impl.newRestart(graph), set);
4522 } catch (Throwable t2) {
4523 Logger.defaultLogError(t2);
4528 public void exception(ReadGraphImpl graph, Throwable t) {
4530 if(first.compareAndSet(true, false)) {
4531 procedure.exception(graph, t);
4532 // impl.state.barrier.dec();
4534 procedure.exception(impl.newRestart(graph), t);
4536 } catch (Throwable t2) {
4537 Logger.defaultLogError(t2);
4546 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4548 assert(subject != null);
4549 assert(procedure != null);
4551 final ListenerBase listener = getListenerBase(procedure);
4553 IntProcedure ip = new IntProcedureAdapter() {
4556 public void execute(final ReadGraphImpl graph, int superRelation) {
4558 procedure.execute(graph, querySupport.getResource(superRelation));
4559 } catch (Throwable t2) {
4560 Logger.defaultLogError(t2);
4565 public void finished(final ReadGraphImpl graph) {
4567 procedure.finished(graph);
4568 } catch (Throwable t2) {
4569 Logger.defaultLogError(t2);
4571 // impl.state.barrier.dec(this);
4576 public void exception(ReadGraphImpl graph, Throwable t) {
4578 procedure.exception(graph, t);
4579 } catch (Throwable t2) {
4580 Logger.defaultLogError(t2);
4582 // impl.state.barrier.dec(this);
4587 int sId = querySupport.getId(subject);
4589 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
4590 // else impl.state.barrier.inc(null, null);
4592 DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
4597 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
4599 assert(subject != null);
4600 assert(procedure != null);
4602 final ListenerBase listener = getListenerBase(procedure);
4604 // impl.state.barrier.inc();
4606 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
4611 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4613 assert(subject != null);
4614 assert(procedure != null);
4616 final ListenerBase listener = getListenerBase(procedure);
4618 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
4621 public void execute(final ReadGraphImpl graph, IntSet set) {
4622 // final HashSet<Resource> result = new HashSet<Resource>();
4623 // set.forEach(new TIntProcedure() {
4626 // public boolean execute(int type) {
4627 // result.add(querySupport.getResource(type));
4633 procedure.execute(graph, set);
4634 } catch (Throwable t2) {
4635 Logger.defaultLogError(t2);
4637 // impl.state.barrier.dec(this);
4641 public void exception(ReadGraphImpl graph, Throwable t) {
4643 procedure.exception(graph, t);
4644 } catch (Throwable t2) {
4645 Logger.defaultLogError(t2);
4647 // impl.state.barrier.dec(this);
4652 int sId = querySupport.getId(subject);
4654 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
4655 // else impl.state.barrier.inc(null, null);
4657 SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
4661 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
4663 int sId = querySupport.getId(subject);
4664 return ValueQuery.queryEach(impl, sId, impl.parent);
4668 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
4670 return ValueQuery.queryEach(impl, subject, impl.parent);
4675 final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4677 assert(subject != null);
4679 int sId = querySupport.getId(subject);
4681 if(procedure != null) {
4683 final ListenerBase listener = getListenerBase(procedure);
4685 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
4687 AtomicBoolean first = new AtomicBoolean(true);
4690 public void execute(ReadGraphImpl graph, byte[] result) {
4692 if(first.compareAndSet(true, false)) {
4693 procedure.execute(graph, result);
4694 // impl.state.barrier.dec(this);
4696 procedure.execute(impl.newRestart(graph), result);
4698 } catch (Throwable t2) {
4699 Logger.defaultLogError(t2);
4704 public void exception(ReadGraphImpl graph, Throwable t) {
4706 if(first.compareAndSet(true, false)) {
4707 procedure.exception(graph, t);
4708 // impl.state.barrier.dec(this);
4710 procedure.exception(impl.newRestart(graph), t);
4712 } catch (Throwable t2) {
4713 Logger.defaultLogError(t2);
4719 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4720 // else impl.state.barrier.inc(null, null);
4722 return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
4726 return ValueQuery.queryEach(impl, sId, impl.parent, null, null);
4733 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4735 assert(subject != null);
4736 assert(procedure != null);
4738 final ListenerBase listener = getListenerBase(procedure);
4740 if(impl.parent != null || listener != null) {
4742 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
4744 AtomicBoolean first = new AtomicBoolean(true);
4747 public void execute(ReadGraphImpl graph, byte[] result) {
4749 if(first.compareAndSet(true, false)) {
4750 procedure.execute(graph, result);
4751 // impl.state.barrier.dec(this);
4753 procedure.execute(impl.newRestart(graph), result);
4755 } catch (Throwable t2) {
4756 Logger.defaultLogError(t2);
4761 public void exception(ReadGraphImpl graph, Throwable t) {
4763 if(first.compareAndSet(true, false)) {
4764 procedure.exception(graph, t);
4765 // impl.state.barrier.dec(this);
4767 procedure.exception(impl.newRestart(graph), t);
4769 } catch (Throwable t2) {
4770 Logger.defaultLogError(t2);
4776 int sId = querySupport.getId(subject);
4778 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4779 // else impl.state.barrier.inc(null, null);
4781 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
4785 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
4788 public void execute(ReadGraphImpl graph, byte[] result) {
4790 procedure.execute(graph, result);
4795 public void exception(ReadGraphImpl graph, Throwable t) {
4797 procedure.exception(graph, t);
4803 int sId = querySupport.getId(subject);
4805 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
4812 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
4814 assert(relation != null);
4815 assert(procedure != null);
4817 final ListenerBase listener = getListenerBase(procedure);
4819 IntProcedure ip = new IntProcedure() {
4821 private int result = 0;
4823 final AtomicBoolean found = new AtomicBoolean(false);
4824 final AtomicBoolean done = new AtomicBoolean(false);
4827 public void finished(ReadGraphImpl graph) {
4829 // Shall fire exactly once!
4830 if(done.compareAndSet(false, true)) {
4833 procedure.exception(graph, new NoInverseException(""));
4834 // impl.state.barrier.dec(this);
4836 procedure.execute(graph, querySupport.getResource(result));
4837 // impl.state.barrier.dec(this);
4839 } catch (Throwable t) {
4840 Logger.defaultLogError(t);
4847 public void execute(ReadGraphImpl graph, int i) {
4849 if(found.compareAndSet(false, true)) {
4852 // Shall fire exactly once!
4853 if(done.compareAndSet(false, true)) {
4855 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
4856 // impl.state.barrier.dec(this);
4857 } catch (Throwable t) {
4858 Logger.defaultLogError(t);
4866 public void exception(ReadGraphImpl graph, Throwable t) {
4867 // Shall fire exactly once!
4868 if(done.compareAndSet(false, true)) {
4870 procedure.exception(graph, t);
4871 // impl.state.barrier.dec(this);
4872 } catch (Throwable t2) {
4873 Logger.defaultLogError(t2);
4880 int sId = querySupport.getId(relation);
4882 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
4883 // else impl.state.barrier.inc(null, null);
4885 Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);
4890 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4893 assert(procedure != null);
4895 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
4898 public void execute(ReadGraphImpl graph, Integer result) {
4900 procedure.execute(graph, querySupport.getResource(result));
4901 } catch (Throwable t2) {
4902 Logger.defaultLogError(t2);
4904 // impl.state.barrier.dec(this);
4908 public void exception(ReadGraphImpl graph, Throwable t) {
4911 procedure.exception(graph, t);
4912 } catch (Throwable t2) {
4913 Logger.defaultLogError(t2);
4915 // impl.state.barrier.dec(this);
4920 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
4921 // else impl.state.barrier.inc(null, null);
4923 forResource(impl, id, impl.parent, ip);
4928 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4931 assert(procedure != null);
4933 // impl.state.barrier.inc();
4935 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
4938 public void execute(ReadGraphImpl graph, Integer result) {
4940 procedure.execute(graph, querySupport.getResource(result));
4941 } catch (Throwable t2) {
4942 Logger.defaultLogError(t2);
4944 // impl.state.barrier.dec();
4948 public void exception(ReadGraphImpl graph, Throwable t) {
4950 procedure.exception(graph, t);
4951 } catch (Throwable t2) {
4952 Logger.defaultLogError(t2);
4954 // impl.state.barrier.dec();
4962 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4964 assert(subject != null);
4965 assert(procedure != null);
4967 final ListenerBase listener = getListenerBase(procedure);
4969 // impl.state.barrier.inc();
4971 DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4973 boolean found = false;
4976 public void execute(ReadGraphImpl graph, int object) {
4981 public void finished(ReadGraphImpl graph) {
4983 procedure.execute(graph, found);
4984 } catch (Throwable t2) {
4985 Logger.defaultLogError(t2);
4987 // impl.state.barrier.dec();
4991 public void exception(ReadGraphImpl graph, Throwable t) {
4993 procedure.exception(graph, t);
4994 } catch (Throwable t2) {
4995 Logger.defaultLogError(t2);
4997 // impl.state.barrier.dec();
5005 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
5007 assert(subject != null);
5008 assert(predicate != null);
5009 assert(procedure != null);
5011 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
5013 boolean found = false;
5016 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
5021 synchronized public void finished(AsyncReadGraph graph) {
5023 procedure.execute(graph, found);
5024 } catch (Throwable t2) {
5025 Logger.defaultLogError(t2);
5027 // impl.state.barrier.dec(this);
5031 public void exception(AsyncReadGraph graph, Throwable t) {
5033 procedure.exception(graph, t);
5034 } catch (Throwable t2) {
5035 Logger.defaultLogError(t2);
5037 // impl.state.barrier.dec(this);
5042 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
5043 // else impl.state.barrier.inc(null, null);
5045 forEachObject(impl, subject, predicate, ip);
5050 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
5052 assert(subject != null);
5053 assert(predicate != null);
5054 assert(procedure != null);
5056 // impl.state.barrier.inc();
5058 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
5060 boolean found = false;
5063 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
5064 if(resource.equals(object)) found = true;
5068 synchronized public void finished(AsyncReadGraph graph) {
5070 procedure.execute(graph, found);
5071 } catch (Throwable t2) {
5072 Logger.defaultLogError(t2);
5074 // impl.state.barrier.dec();
5078 public void exception(AsyncReadGraph graph, Throwable t) {
5080 procedure.exception(graph, t);
5081 } catch (Throwable t2) {
5082 Logger.defaultLogError(t2);
5084 // impl.state.barrier.dec();
5092 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
5094 assert(subject != null);
5095 assert(procedure != null);
5097 final ListenerBase listener = getListenerBase(procedure);
5099 // impl.state.barrier.inc();
5101 ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
5104 public void execute(ReadGraphImpl graph, byte[] object) {
5105 boolean result = object != null;
5107 procedure.execute(graph, result);
5108 } catch (Throwable t2) {
5109 Logger.defaultLogError(t2);
5111 // impl.state.barrier.dec();
5115 public void exception(ReadGraphImpl graph, Throwable t) {
5117 procedure.exception(graph, t);
5118 } catch (Throwable t2) {
5119 Logger.defaultLogError(t2);
5121 // impl.state.barrier.dec();
5129 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
5131 assert(subject != null);
5132 assert(procedure != null);
5134 final ListenerBase listener = getListenerBase(procedure);
5136 // impl.state.barrier.inc();
5138 OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
5141 public void exception(ReadGraphImpl graph, Throwable t) {
5143 procedure.exception(graph, t);
5144 } catch (Throwable t2) {
5145 Logger.defaultLogError(t2);
5147 // impl.state.barrier.dec();
5151 public void execute(ReadGraphImpl graph, int i) {
5153 procedure.execute(graph, querySupport.getResource(i));
5154 } catch (Throwable t2) {
5155 Logger.defaultLogError(t2);
5160 public void finished(ReadGraphImpl graph) {
5162 procedure.finished(graph);
5163 } catch (Throwable t2) {
5164 Logger.defaultLogError(t2);
5166 // impl.state.barrier.dec();
5174 final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {
5176 assert(request != null);
5177 assert(procedure != null);
5179 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");
5180 // else impl.state.barrier.inc(null, null);
5182 runAsyncRead(impl, request, parent, listener, procedure);
5187 final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
5189 assert(graph != null);
5190 assert(request != null);
5192 final ReadEntry entry = readMap.get(request);
5193 if(entry != null && entry.isReady()) {
5194 return (T)entry.get(graph, this, null);
5196 return request.perform(graph);
5201 final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
5203 assert(graph != null);
5204 assert(request != null);
5206 final ExternalReadEntry<T> entry = externalReadMap.get(request);
5207 if(entry != null && entry.isReady()) {
5208 if(entry.isExcepted()) {
5209 Throwable t = (Throwable)entry.getResult();
5210 if(t instanceof DatabaseException) throw (DatabaseException)t;
5211 else throw new DatabaseException(t);
5213 return (T)entry.getResult();
5217 final DataContainer<T> result = new DataContainer<T>();
5218 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
5220 request.register(graph, new Listener<T>() {
5223 public void exception(Throwable t) {
5228 public void execute(T t) {
5233 public boolean isDisposed() {
5239 Throwable t = exception.get();
5241 if(t instanceof DatabaseException) throw (DatabaseException)t;
5242 else throw new DatabaseException(t);
5245 return result.get();
5252 final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
5254 assert(graph != null);
5255 assert(request != null);
5257 final AsyncReadEntry entry = asyncReadMap.get(request);
5258 if(entry != null && entry.isReady()) {
5259 if(entry.isExcepted()) {
5260 procedure.exception(graph, (Throwable)entry.getResult());
5262 procedure.execute(graph, (T)entry.getResult());
5265 request.perform(graph, procedure);
5271 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5273 assert(request != null);
5274 assert(procedure != null);
5276 // impl.state.barrier.inc(null, null);
5278 queryMultiRead(impl, request, parent, listener, procedure);
5283 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5285 assert(request != null);
5286 assert(procedure != null);
5288 // impl.state.barrier.inc();
5290 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
5292 public void execute(AsyncReadGraph graph, T result) {
5295 procedure.execute(graph, result);
5296 } catch (Throwable t2) {
5297 Logger.defaultLogError(t2);
5302 public void finished(AsyncReadGraph graph) {
5305 procedure.finished(graph);
5306 } catch (Throwable t2) {
5307 Logger.defaultLogError(t2);
5310 // impl.state.barrier.dec();
5315 public String toString() {
5316 return procedure.toString();
5320 public void exception(AsyncReadGraph graph, Throwable t) {
5323 procedure.exception(graph, t);
5324 } catch (Throwable t2) {
5325 Logger.defaultLogError(t2);
5328 // impl.state.barrier.dec();
5337 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
5339 assert(request != null);
5340 assert(procedure != null);
5342 queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {
5345 public void execute(T result) {
5347 procedure.execute(result);
5348 } catch (Throwable t2) {
5349 Logger.defaultLogError(t2);
5354 public String toString() {
5355 return procedure.toString();
5359 public void exception(Throwable t) {
5361 procedure.exception(t);
5362 } catch (Throwable t2) {
5363 Logger.defaultLogError(t2);
5372 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
5374 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
5379 public VirtualGraph getProvider(Resource subject, Resource predicate) {
5381 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
5386 public VirtualGraph getValueProvider(Resource subject) {
5388 return querySupport.getValueProvider(querySupport.getId(subject));
5392 public boolean resumeTasks(ReadGraphImpl graph) {
5394 return querySupport.resume(graph);
5398 public boolean isImmutable(int resourceId) {
5399 return querySupport.isImmutable(resourceId);
5402 public boolean isImmutable(Resource resource) {
5403 ResourceImpl impl = (ResourceImpl)resource;
5404 return isImmutable(impl.id);
5409 public Layer0 getL0(ReadGraph graph) {
5411 L0 = Layer0.getInstance(graph);