1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.db.impl.query;
\r
14 import java.io.BufferedOutputStream;
\r
15 import java.io.File;
\r
16 import java.io.FileOutputStream;
\r
17 import java.io.IOException;
\r
18 import java.io.PrintStream;
\r
19 import java.util.ArrayList;
\r
20 import java.util.Arrays;
\r
21 import java.util.Collection;
\r
22 import java.util.Collections;
\r
23 import java.util.HashMap;
\r
24 import java.util.HashSet;
\r
25 import java.util.IdentityHashMap;
\r
26 import java.util.Iterator;
\r
27 import java.util.LinkedList;
\r
28 import java.util.List;
\r
29 import java.util.Map;
\r
30 import java.util.Set;
\r
31 import java.util.concurrent.Semaphore;
\r
32 import java.util.concurrent.atomic.AtomicBoolean;
\r
33 import java.util.concurrent.atomic.AtomicInteger;
\r
34 import java.util.concurrent.locks.Condition;
\r
35 import java.util.concurrent.locks.ReentrantLock;
\r
37 import org.simantics.databoard.Bindings;
\r
38 import org.simantics.db.AsyncReadGraph;
\r
39 import org.simantics.db.DevelopmentKeys;
\r
40 import org.simantics.db.DirectStatements;
\r
41 import org.simantics.db.ReadGraph;
\r
42 import org.simantics.db.RelationInfo;
\r
43 import org.simantics.db.Resource;
\r
44 import org.simantics.db.Session;
\r
45 import org.simantics.db.Statement;
\r
46 import org.simantics.db.VirtualGraph;
\r
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
\r
48 import org.simantics.db.common.utils.Logger;
\r
49 import org.simantics.db.debug.ListenerReport;
\r
50 import org.simantics.db.exception.DatabaseException;
\r
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
\r
52 import org.simantics.db.exception.NoInverseException;
\r
53 import org.simantics.db.exception.ResourceNotFoundException;
\r
54 import org.simantics.db.impl.DebugPolicy;
\r
55 import org.simantics.db.impl.ResourceImpl;
\r
56 import org.simantics.db.impl.graph.MultiIntProcedure;
\r
57 import org.simantics.db.impl.graph.ReadGraphImpl;
\r
58 import org.simantics.db.impl.graph.ReadGraphSupport;
\r
59 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;
\r
61 import org.simantics.db.impl.procedure.InternalProcedure;
\r
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
\r
63 import org.simantics.db.impl.support.ResourceSupport;
\r
64 import org.simantics.db.procedure.AsyncMultiListener;
\r
65 import org.simantics.db.procedure.AsyncMultiProcedure;
\r
66 import org.simantics.db.procedure.AsyncProcedure;
\r
67 import org.simantics.db.procedure.AsyncSetListener;
\r
68 import org.simantics.db.procedure.Listener;
\r
69 import org.simantics.db.procedure.ListenerBase;
\r
70 import org.simantics.db.procedure.MultiProcedure;
\r
71 import org.simantics.db.procedure.Procedure;
\r
72 import org.simantics.db.procedure.StatementProcedure;
\r
73 import org.simantics.db.request.AsyncMultiRead;
\r
74 import org.simantics.db.request.AsyncRead;
\r
75 import org.simantics.db.request.ExternalRead;
\r
76 import org.simantics.db.request.MultiRead;
\r
77 import org.simantics.db.request.Read;
\r
78 import org.simantics.db.request.RequestFlags;
\r
79 import org.simantics.db.request.WriteTraits;
\r
80 import org.simantics.layer0.Layer0;
\r
81 import org.simantics.utils.DataContainer;
\r
82 import org.simantics.utils.Development;
\r
83 import org.simantics.utils.datastructures.Pair;
\r
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
\r
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
\r
87 import gnu.trove.map.hash.THashMap;
\r
88 import gnu.trove.procedure.TIntProcedure;
\r
89 import gnu.trove.procedure.TLongProcedure;
\r
90 import gnu.trove.procedure.TObjectProcedure;
\r
91 import gnu.trove.set.hash.THashSet;
\r
92 import gnu.trove.set.hash.TIntHashSet;
\r
94 @SuppressWarnings({"rawtypes", "unchecked"})
\r
95 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
\r
// Query caches, one map per query type. Every map in this group is instantiated in the constructor.
97 final public UnaryQueryHashMap<IntProcedure> directPredicatesMap;
\r
98 final public UnaryQueryHashMap<IntProcedure> principalTypesMap;
\r
99 final public THashMap<String, URIToResource> uriToResourceMap;
\r
100 final public THashMap<String, NamespaceIndex> namespaceIndexMap22;
\r
101 final public UnaryQueryHashMap<IntProcedure> projectsMap;
\r
102 final public UnaryQueryHashMap<InternalProcedure<RelationInfo>> relationInfoMap;
\r
103 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superTypesMap;
\r
104 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typeHierarchyMap;
\r
105 final public UnaryQueryHashMap<InternalProcedure<IntSet>> superRelationsMap;
\r
106 final public UnaryQueryHashMap<InternalProcedure<IntSet>> typesMap;
\r
107 final public UnaryQueryHashMap<InternalProcedure<byte[]>> valueMap;
\r
108 final public DoubleKeyQueryHashMap<IntProcedure> directObjectsMap;
\r
109 final public DoubleKeyQueryHashMap<IntProcedure> objectsMap;
\r
110 final public UnaryQueryHashMap<IntProcedure> orderedSetMap;
\r
111 final public UnaryQueryHashMap<IntProcedure> predicatesMap;
\r
112 final public DoubleKeyQueryHashMap<TripleIntProcedure> statementsMap;
\r
113 final public UnaryQueryHashMap<IntProcedure> assertedPredicatesMap;
\r
114 final public BinaryQueryHashMap<TripleIntProcedure> assertedStatementsMap;
\r
115 final public StableHashMap<ExternalRead, ExternalReadEntry> externalReadMap;
\r
116 final public StableHashMap<AsyncRead, AsyncReadEntry> asyncReadMap;
\r
117 final public StableHashMap<Read, ReadEntry> readMap;
\r
118 final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadMap;
\r
119 final public StableHashMap<MultiRead, MultiReadEntry> multiReadMap;
\r
// Maps a cache entry to the list of listener entries attached to it.
121 final private THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;
\r
123 public static int indent = 0;
\r
125 public int size = 0;
\r
128 // Garbage collection
\r
130 public int boundQueries = 0;
\r
133 private int hits = 0;
\r
135 private int misses = 0;
\r
137 private int updates = 0;
\r
// Integer ids of built-in resources. The constructor resolves each one through
// QuerySupport.getBuiltin(...) when the root library id is non-zero.
139 final private int functionalRelation;
\r
141 final private int superrelationOf;
\r
143 final private int instanceOf;
\r
145 final private int inverseOf;
\r
147 final private int asserts;
\r
149 final private int hasPredicate;
\r
151 final private int hasPredicateInverse;
\r
153 final private int hasObject;
\r
155 final private int inherits;
\r
157 final private int subrelationOf;
\r
159 final private int rootLibrary;
\r
162 * A cache for the root library resource. Initialized in
\r
163 * {@link #getRootLibraryResource()}.
\r
165 private volatile ResourceImpl rootLibraryResource;
\r
167 final private int library;
\r
169 final private int consistsOf;
\r
171 final private int hasName;
\r
// The constructor waits until this counter equals THREADS after starting the query threads;
// schedule(...) decrements it when it queues work for a thread whose state is SLEEP.
173 AtomicInteger sleepers = new AtomicInteger(0);
\r
175 private boolean updating = false;
\r
// Asserted to be false at the start of the performForEach(...) overloads.
177 static public boolean collecting = false;
\r
179 private boolean firingListeners = false;
\r
// Collaborators; the constructor assigns all three from its QuerySupport argument.
181 final public QuerySupport querySupport;
\r
182 final public Session session;
\r
183 final public ResourceSupport resourceSupport;
\r
185 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
\r
// Per-thread scheduling state. The constructor sizes these arrays by THREADS
// (delayQueues by THREADS * THREADS) and fills one element per thread index.
187 QueryThread[] executors;
\r
189 public ArrayList<SessionTask>[] queues;
\r
// NOTE(review): presumably the constants of the ThreadState enum (ThreadState.INIT and
// ThreadState.SLEEP are used elsewhere) - the enum declaration line is not visible in this listing.
193 INIT, RUN, SLEEP, DISPOSED
\r
197 public ThreadState[] threadStates;
\r
198 public ReentrantLock[] threadLocks;
\r
199 public Condition[] threadConditions;
\r
201 public ArrayList<SessionTask>[] ownTasks;
\r
203 public ArrayList<SessionTask>[] ownSyncTasks;
\r
205 ArrayList<SessionTask>[] delayQueues;
\r
207 public boolean synch = true;
\r
// Assigned from QuerySupport.getLock() in the constructor.
209 final Object querySupportLock;
\r
// Incremented by releaseWrite(...). Declared as a boxed Long, so each increment creates a new Long value.
211 public Long modificationCounter = 0L;
\r
// NOTE(review): only the opening line of close() is visible in this listing, so its
// behaviour cannot be described from here.
213 public void close() {
\r
/**
 * Appends {@code request} to the own-task list of thread {@code caller}.
 *
 * @param caller  index into {@code ownTasks}
 * @param request task to append
 */
216 final public void scheduleOwn(int caller, SessionTask request) {
\r
217 ownTasks[caller].add(request);
\r
/**
 * Hands {@code request} to the thread named by {@code request.thread} without running it inline.
 * When the caller is itself the performing thread, the request is appended to the
 * caller's own-task list.
 *
 * @param caller  index of the calling thread
 * @param request task to schedule
 */
220 final public void scheduleAlways(int caller, SessionTask request) {
\r
222 int performer = request.thread;
\r
223 if(caller == performer) {
\r
224 ownTasks[caller].add(request);
\r
// NOTE(review): presumably the else branch - the line separating it from the
// branch above is not visible in this listing.
226 schedule(caller, request);
\r
/**
 * Dispatches {@code request} to the thread named by {@code request.thread}.
 * A request targeted at the calling thread is executed immediately through
 * {@code request.run(caller)}; the remaining code adds it to the performer's queue and
 * signals the performer's condition when the queue size becomes 1.
 *
 * @param caller  index of the calling thread
 * @param request task to dispatch
 */
231 final public void schedule(int caller, SessionTask request) {
\r
233 int performer = request.thread;
\r
// Optional trace of every scheduling decision.
235 if(DebugPolicy.SCHEDULE)
\r
236 System.out.println("schedule " + request + " " + caller + " -> " + performer);
\r
238 assert(performer >= 0);
\r
// This check runs after request.thread has already been read above, so a null request
// fails with a NullPointerException before reaching it.
240 assert(request != null);
\r
242 if(caller == performer) {
\r
243 request.run(caller);
\r
// NOTE(review): the statement acquiring queueLock is not visible in this listing;
// presumably it sits between this declaration and the queue update - confirm.
245 ReentrantLock queueLock = threadLocks[performer];
\r
247 queues[performer].add(request);
\r
248 // This thread could have been sleeping
\r
249 if(queues[performer].size() == 1) {
\r
// A sleeping performer is taken off the sleepers count before it is signalled.
250 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
\r
251 threadConditions[performer].signalAll();
\r
253 queueLock.unlock();
\r
// Set to (threads - 1) in the constructor; the DebugPolicy.PERFORM traces AND it with query.hashCode().
261 final public int THREAD_MASK;
\r
262 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
\r
/**
 * A unit of work bound to a thread index. Subclasses implement {@link #run(int)}.
 */
264 public static abstract class SessionTask {
\r
// Index of the thread the task is targeted at; schedule(...) reads it as the performer.
266 final public int thread;
\r
267 final public int syncCaller;
\r
// Payload used only for the toString() rendering within this class.
268 final public Object object;
\r
// This constructor fixes syncCaller to -1.
270 public SessionTask(WriteTraits object, int thread) {
\r
271 this.thread = thread;
\r
272 this.syncCaller = -1;
\r
273 this.object = object;
\r
276 public SessionTask(Object object, int thread, int syncCaller) {
\r
277 this.thread = thread;
\r
278 this.syncCaller = syncCaller;
\r
279 this.object = object;
\r
// schedule(...) calls this with the index of the calling thread when caller and performer coincide.
282 public abstract void run(int thread);
\r
285 public String toString() {
\r
286 return "SessionTask[" + object + "]";
\r
/**
 * A {@link SessionTask} that additionally carries a {@link Semaphore} and a
 * {@code DataContainer<Throwable>}. How those two are used is not visible in this class.
 */
291 public static abstract class SessionRead extends SessionTask {
\r
293 final public Semaphore notify;
\r
294 final public DataContainer<Throwable> throwable;
\r
// Passes thread for both the thread and the syncCaller argument of SessionTask.
296 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
\r
297 super(object, thread, thread);
\r
298 this.throwable = throwable;
\r
299 this.notify = notify;
\r
// Variant with an explicit syncCaller (syncThread).
302 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
\r
303 super(object, thread, syncThread);
\r
304 this.throwable = throwable;
\r
305 this.notify = notify;
\r
// NOTE(review): none of these three fields is referenced in the visible part of the file;
// their purpose cannot be determined from here.
310 long waitingTime = 0;
\r
312 static int koss = 0;
\r
313 static int koss2 = 0;
\r
/**
 * Delegates to {@code executors[0].runSynchronized()} and returns its result.
 * The {@code graph} argument is not used by the visible body.
 */
315 public boolean resume(ReadGraphImpl graph) {
\r
316 return executors[0].runSynchronized();
\r
/**
 * Creates the processor: allocates the per-thread scheduling structures, creates the
 * query threads, instantiates the query caches, starts the threads, and finally resolves
 * the ids of the built-in resources.
 *
 * @param threads   thread count; {@code THREAD_MASK} is set to {@code threads - 1}
 * @param core      source of the session, resource support, lock object and built-in ids
 * @param threadSet set to which every created {@code QueryThread} is added
 * @throws DatabaseException declared by the signature; no throw site is visible in this listing
 */
319 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
\r
320 throws DatabaseException {
\r
// NOTE(review): the assignment of THREADS is not visible in this listing.
323 THREAD_MASK = threads - 1;
\r
325 querySupport = core;
\r
326 session = querySupport.getSession();
\r
327 resourceSupport = querySupport.getSupport();
\r
328 querySupportLock = core.getLock();
\r
// One slot per thread, except delayQueues which gets THREADS * THREADS slots.
330 executors = new QueryThread[THREADS];
\r
331 queues = new ArrayList[THREADS];
\r
332 threadLocks = new ReentrantLock[THREADS];
\r
333 threadConditions = new Condition[THREADS];
\r
334 threadStates = new ThreadState[THREADS];
\r
335 ownTasks = new ArrayList[THREADS];
\r
336 ownSyncTasks = new ArrayList[THREADS];
\r
337 delayQueues = new ArrayList[THREADS * THREADS];
\r
339 // freeSchedule = new AtomicInteger(0);
\r
341 for (int i = 0; i < THREADS * THREADS; i++) {
\r
342 delayQueues[i] = new ArrayList<SessionTask>();
\r
345 for (int i = 0; i < THREADS; i++) {
\r
347 // tasks[i] = new ArrayList<Runnable>();
\r
348 ownTasks[i] = new ArrayList<SessionTask>();
\r
349 ownSyncTasks[i] = new ArrayList<SessionTask>();
\r
350 queues[i] = new ArrayList<SessionTask>();
\r
351 threadLocks[i] = new ReentrantLock();
\r
// Each thread's condition is created from that thread's own lock.
352 threadConditions[i] = threadLocks[i].newCondition();
\r
353 // limits[i] = false;
\r
354 threadStates[i] = ThreadState.INIT;
\r
// Threads are created here but started only after the caches below exist.
358 for (int i = 0; i < THREADS; i++) {
\r
360 final int index = i;
\r
362 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
\r
364 threadSet.add(executors[i]);
\r
// Instantiate the query caches.
368 directPredicatesMap = new UnaryQueryHashMap();
\r
369 valueMap = new UnaryQueryHashMap();
\r
370 principalTypesMap = new UnaryQueryHashMap();
\r
371 uriToResourceMap = new THashMap<String, URIToResource>();
\r
372 namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();
\r
373 projectsMap = new UnaryQueryHashMap();
\r
374 relationInfoMap = new UnaryQueryHashMap();
\r
375 typeHierarchyMap = new UnaryQueryHashMap();
\r
376 superTypesMap = new UnaryQueryHashMap();
\r
377 superRelationsMap = new UnaryQueryHashMap();
\r
378 typesMap = new UnaryQueryHashMap();
\r
379 objectsMap = new DoubleKeyQueryHashMap();
\r
380 orderedSetMap = new UnaryQueryHashMap();
\r
381 predicatesMap = new UnaryQueryHashMap();
\r
382 statementsMap = new DoubleKeyQueryHashMap();
\r
383 directObjectsMap = new DoubleKeyQueryHashMap();
\r
384 assertedPredicatesMap = new UnaryQueryHashMap();
\r
385 assertedStatementsMap = new BinaryQueryHashMap();
\r
386 asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>();
\r
387 readMap = new StableHashMap<Read, ReadEntry>();
\r
388 asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>();
\r
389 multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>();
\r
390 externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>();
\r
391 listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
\r
393 // Now start threads
\r
394 for (int i = 0; i < THREADS; i++) {
\r
395 executors[i].start();
\r
398 // Make sure that query threads are up and running
\r
// Loops until the sleepers counter equals THREADS. NOTE(review): the loop body
// (presumably a short sleep inside a try block) is not visible in this listing.
399 while(sleepers.get() != THREADS) {
\r
// An interrupt is only printed; the interrupt status is not restored in the visible lines.
402 } catch (InterruptedException e) {
\r
403 e.printStackTrace();
\r
// A zero root library id is taken to mean that the builtins are not installed.
407 rootLibrary = core.getBuiltin("http:/");
\r
408 boolean builtinsInstalled = rootLibrary != 0;
\r
// Each block below resolves one built-in id and asserts that it is non-zero. Where a
// fallback assignment is visible it sets the field to 0; for the other fields the
// fallback line is not visible in this listing.
410 if (builtinsInstalled) {
\r
411 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
\r
412 assert (functionalRelation != 0);
\r
414 functionalRelation = 0;
\r
416 if (builtinsInstalled) {
\r
417 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
\r
418 assert (instanceOf != 0);
\r
422 if (builtinsInstalled) {
\r
423 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
\r
424 assert (inverseOf != 0);
\r
429 if (builtinsInstalled) {
\r
430 inherits = core.getBuiltin(Layer0.URIs.Inherits);
\r
431 assert (inherits != 0);
\r
435 if (builtinsInstalled) {
\r
436 asserts = core.getBuiltin(Layer0.URIs.Asserts);
\r
437 assert (asserts != 0);
\r
441 if (builtinsInstalled) {
\r
442 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
\r
443 assert (hasPredicate != 0);
\r
447 if (builtinsInstalled) {
\r
448 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
\r
449 assert (hasPredicateInverse != 0);
\r
451 hasPredicateInverse = 0;
\r
453 if (builtinsInstalled) {
\r
454 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
\r
455 assert (hasObject != 0);
\r
459 if (builtinsInstalled) {
\r
460 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
\r
461 assert (subrelationOf != 0);
\r
465 if (builtinsInstalled) {
\r
466 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
\r
467 assert (superrelationOf != 0);
\r
469 superrelationOf = 0;
\r
471 if (builtinsInstalled) {
\r
472 library = core.getBuiltin(Layer0.URIs.Library);
\r
473 assert (library != 0);
\r
477 if (builtinsInstalled) {
\r
478 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
\r
479 assert (consistsOf != 0);
\r
483 if (builtinsInstalled) {
\r
484 hasName = core.getBuiltin(Layer0.URIs.HasName);
\r
485 assert (hasName != 0);
\r
/**
 * Runs {@code performDirtyUpdates(graph)} and then increments {@code modificationCounter}.
 */
491 final public void releaseWrite(ReadGraphImpl graph) {
\r
492 performDirtyUpdates(graph);
\r
// modificationCounter is a boxed Long, so this unboxes, adds one and stores a new Long.
493 modificationCounter++;
\r
/** Returns the integer id that {@code querySupport} reports for resource {@code r}. */
496 final public int getId(final Resource r) {
\r
497 return querySupport.getId(r);
\r
/** Returns the {@code QuerySupport} instance this processor was constructed with. */
500 public QuerySupport getCore() {
\r
501 return querySupport;
\r
/** Returns the built-in id stored in {@code functionalRelation} (0 when builtins are not installed). */
504 public int getFunctionalRelation() {
\r
505 return functionalRelation;
\r
// NOTE(review): presumably returns the inherits built-in id - the return line is not visible in this listing.
508 public int getInherits() {
\r
// NOTE(review): presumably returns the instanceOf built-in id - the return line is not visible in this listing.
512 public int getInstanceOf() {
\r
// NOTE(review): presumably returns the inverseOf built-in id - the return line is not visible in this listing.
516 public int getInverseOf() {
\r
/** Returns the built-in id stored in {@code subrelationOf}, as resolved by the constructor. */
520 public int getSubrelationOf() {
\r
521 return subrelationOf;
\r
/** Returns the built-in id stored in {@code superrelationOf} (0 when builtins are not installed). */
524 public int getSuperrelationOf() {
\r
525 return superrelationOf;
\r
// NOTE(review): presumably returns the asserts built-in id - the return line is not visible in this listing.
528 public int getAsserts() {
\r
/** Returns the built-in id stored in {@code hasPredicate}, as resolved by the constructor. */
532 public int getHasPredicate() {
\r
533 return hasPredicate;
\r
/** Returns the built-in id stored in {@code hasPredicateInverse} (0 when builtins are not installed). */
536 public int getHasPredicateInverse() {
\r
537 return hasPredicateInverse;
\r
// NOTE(review): presumably returns the hasObject built-in id - the return line is not visible in this listing.
540 public int getHasObject() {
\r
/** Returns the root library id; the constructor treats 0 as "builtins not installed". */
544 public int getRootLibrary() {
\r
545 return rootLibrary;
\r
/**
 * Returns the root library as a {@code Resource}, creating a {@code ResourceImpl} for it
 * on first use and caching it in the volatile field {@code rootLibraryResource}.
 *
 * @throws UnsupportedOperationException with the message "database is not initialized, cannot
 *         get root library resource"; the guarding condition is not visible in this listing
 *         (presumably a zero root id - confirm)
 */
548 public Resource getRootLibraryResource() {
\r
549 if (rootLibraryResource == null) {
\r
550 // Synchronization is not needed here, it doesn't matter if multiple
\r
551 // threads simultaneously set rootLibraryResource once.
\r
552 int root = getRootLibrary();
\r
554 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
\r
555 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
\r
557 return rootLibraryResource;
\r
// NOTE(review): presumably returns the library built-in id - the return line is not visible in this listing.
560 public int getLibrary() {
\r
// NOTE(review): presumably returns the consistsOf built-in id - the return line is not visible in this listing.
564 public int getConsistsOf() {
\r
// NOTE(review): presumably returns the hasName built-in id - the return line is not visible in this listing.
568 public int getHasName() {
\r
/**
 * Resolves the resource id for {@code id} through {@code URIToResource.queryEach} and
 * reports the outcome to {@code procedure}. When the query yields no usable id, the code
 * falls back to {@code querySupport.getBuiltin(id)} and then to
 * {@code querySupport.getRandomAccessReference(id)}; failure is reported through
 * {@code procedure.exception} with a {@code ResourceNotFoundException}.
 *
 * NOTE(review): the conditions guarding the individual fallbacks are not visible in this
 * listing, so the exact ordering of the outcomes should be confirmed.
 *
 * @param graph     graph passed on to the query
 * @param id        identifier to resolve
 * @param parent    cache entry passed on to the query
 * @param procedure receiver of the resolved id or of the failure
 */
572 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
\r
574 URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {
\r
577 public void execute(ReadGraphImpl graph, Integer result) {
\r
// A non-null, non-zero query result is forwarded as it is.
579 if (result != null && result != 0) {
\r
580 procedure.execute(graph, result);
\r
584 // Fall back to using the fixed builtins.
\r
585 result = querySupport.getBuiltin(id);
\r
587 procedure.execute(graph, result);
\r
// Last resort: random access reference lookup, which may throw ResourceNotFoundException.
592 result = querySupport.getRandomAccessReference(id);
\r
593 } catch (ResourceNotFoundException e) {
\r
594 procedure.exception(graph, e);
\r
599 procedure.execute(graph, result);
\r
601 procedure.exception(graph, new ResourceNotFoundException(id));
\r
// Failures of the URI query itself are passed through unchanged.
607 public void exception(ReadGraphImpl graph, Throwable t) {
\r
608 procedure.exception(graph, t);
\r
/**
 * Looks up {@code id} with {@code querySupport.getBuiltin} and reports either the result
 * or a {@code ResourceNotFoundException} to {@code procedure}. The {@code parent} argument
 * is not used by the visible lines.
 *
 * NOTE(review): the condition selecting between the two outcomes is not visible in this
 * listing (presumably a non-zero check on the result - confirm).
 */
615 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
\r
617 Integer result = querySupport.getBuiltin(id);
\r
619 procedure.execute(graph, result);
\r
621 procedure.exception(graph, new ResourceNotFoundException(id));
\r
/**
 * Runs an asynchronous read through the {@code asyncReadMap} cache.
 *
 * With neither a parent nor a listener, a cached entry that is ready or excepted answers
 * from the cache; the other visible path performs the query directly with
 * {@code query.perform(graph, procedure)}.
 * Otherwise a missing entry is created in pending state, stored in the map and performed
 * through {@code performForEach}. An existing entry that is still pending raises
 * {@code IllegalStateException}. A ready entry answers from the cache and then registers
 * dependencies. The last visible path re-performs through {@code performForEach}.
 *
 * NOTE(review): several else/return lines are not visible in this listing, so the exact
 * nesting of the branches should be confirmed against the full source.
 *
 * @throws IllegalStateException when the cached entry is pending
 */
626 public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {
\r
// The hash is computed once and reused for both the lookup and the later put.
628 int hash = requestHash(query);
\r
630 AsyncReadEntry<T> entry = asyncReadMap.get(query, hash);
\r
632 if(parent == null && listener == null) {
\r
633 if(entry != null && (entry.isReady() || entry.isExcepted())) {
\r
// NOTE(review): unconditional print to stdout, not guarded by a DebugPolicy flag.
634 System.out.println("ready " + query);
\r
635 entry.performFromCache(graph, this, procedure);
\r
636 // graph.state.barrier.dec(query);
\r
639 query.perform(graph, procedure);
\r
640 // graph.state.barrier.dec(query);
\r
645 if(entry == null) {
\r
647 entry = new AsyncReadEntry<T>(query);
\r
648 entry.setPending();
\r
649 entry.clearResult(querySupport);
\r
650 asyncReadMap.put(query, entry, hash);
\r
652 performForEach(graph, query, entry, parent, listener, procedure, false);
\r
// The pending state is checked again under the entry's monitor before throwing.
656 if(entry.isPending()) {
\r
657 synchronized(entry) {
\r
658 if(entry.isPending()) {
\r
659 throw new IllegalStateException();
\r
660 // final AsyncBarrierImpl parentBarrier = graph.state.barrier;
\r
661 // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();
\r
662 // entry.procs.add(new AsyncProcedure<T>() {
\r
665 // public void execute(AsyncReadGraph graph, T result) {
\r
666 // procedure.execute(graph, result);
\r
667 // parentBarrier.dec(query);
\r
671 // public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
672 // procedure.exception(graph, throwable);
\r
673 // parentBarrier.dec(query);
\r
677 // if(graph.parent != null || listener != null) {
\r
678 // registerDependencies(graph, entry, parent, listener, procedure, false);
\r
681 // query.perform(graph, procedure);
\r
689 if(entry.isReady()) {
\r
690 entry.performFromCache(graph, this, procedure);
\r
691 registerDependencies(graph, entry, parent, listener, procedure, false);
\r
693 performForEach(graph, query, entry, parent, listener, procedure, false);
\r
/**
 * Runs a multi-read through {@code provider.multiReadMap}. The {@code cached} entry is used
 * when it is non-null, otherwise the map is consulted.
 *
 * A missing entry is created in pending state, stored in the map and performed through
 * {@code provider.performForEach}. An existing entry that is still pending raises
 * {@code IllegalStateException}. The remaining visible paths answer from the cache or
 * re-perform through {@code provider.performForEach}.
 *
 * NOTE(review): the condition choosing between the cache answer and the re-perform is not
 * visible in this listing.
 *
 * @throws IllegalStateException when the entry is pending
 */
701 final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
\r
703 MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query);
\r
704 if(entry == null) {
\r
706 entry = new MultiReadEntry(query);
\r
707 entry.setPending();
\r
708 entry.clearResult(provider.querySupport);
\r
710 provider.multiReadMap.put(query, entry);
\r
712 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
\r
716 if(entry.isPending()) {
\r
718 synchronized(entry) {
\r
720 if(entry.isPending()) {
\r
721 throw new IllegalStateException();
\r
723 // if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();
\r
724 // entry.procs.add(new Pair(procedure, parentBarrier));
\r
725 // if(graph.parent != null || listener != null) {
\r
726 // provider.registerDependencies(graph, entry, parent, listener, procedure, false);
\r
729 // If this was synchronized we must wait here until completion
\r
730 // if(graph.state.synchronizedExecution) {
\r
731 // while(entry.isPending()) {
\r
732 // graph.resumeTasks(graph.callerThread, null, null);
\r
741 entry.performFromCache(graph, provider, procedure);
\r
742 // graph.state.barrier.dec(query);
\r
747 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
\r
/**
 * Runs an asynchronous multi-read through the {@code asyncMultiReadMap} cache.
 *
 * With neither a parent nor a listener, a cached entry that is ready or excepted answers
 * from the cache; the other visible path performs the query directly.
 * Otherwise a missing entry is created in pending state, stored and performed through
 * {@code performForEach}. An existing entry that is still pending raises
 * {@code IllegalStateException}. The last visible path re-performs through
 * {@code performForEach}.
 *
 * NOTE(review): several else/return lines are not visible in this listing, so the exact
 * branch nesting should be confirmed.
 *
 * @throws IllegalStateException when the cached entry is pending
 */
755 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
\r
// The hash is computed once and reused for both the lookup and the later put.
757 int hash = requestHash(query);
\r
759 AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);
\r
761 if(parent == null && listener == null) {
\r
762 if(entry != null && (entry.isReady() || entry.isExcepted())) {
\r
// NOTE(review): unconditional print to stdout, not guarded by a DebugPolicy flag.
763 System.out.println("ready " + query);
\r
764 entry.performFromCache(graph, this, procedure);
\r
767 query.perform(graph, procedure);
\r
772 if(entry == null) {
\r
774 entry = new AsyncMultiReadEntry<T>(query);
\r
775 entry.setPending();
\r
776 entry.clearResult(querySupport);
\r
778 asyncMultiReadMap.put(query, entry, hash);
\r
780 performForEach(graph, query, entry, parent, listener, procedure, false);
\r
784 if(entry.isPending()) {
\r
786 synchronized(entry) {
\r
787 if(entry.isPending()) {
\r
788 throw new IllegalStateException();
\r
789 // if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();
\r
790 // entry.procs.add(procedure);
\r
791 // if(graph.parent != null || listener != null) {
\r
792 // registerDependencies(graph, entry, parent, listener, procedure, false);
\r
799 performForEach(graph, query, entry, parent, listener, procedure, false);
\r
/**
 * Runs an external (primitive) read through {@code provider.externalReadMap}. The
 * {@code cached} entry is used when it is non-null, otherwise the map is consulted.
 *
 * A missing entry leads to {@code provider.performForEach} with a freshly created
 * {@code ExternalReadEntry}. An existing entry that is still pending raises
 * {@code IllegalStateException}. The last visible path performs with the existing entry.
 *
 * @throws IllegalStateException when the entry is pending
 */
805 final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {
\r
807 final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query);
\r
808 if(entry == null) {
\r
// The new entry is not put into externalReadMap here.
809 provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);
\r
811 if(entry.isPending()) {
\r
812 synchronized(entry) {
\r
813 if(entry.isPending()) {
\r
814 throw new IllegalStateException();
\r
815 // if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();
\r
816 // entry.procs.add(procedure);
\r
821 provider.performForEach(graph, query, entry, parent, listener, procedure, false);
\r
/**
 * Returns {@code object.hashCode()}. Any {@code Throwable} raised while computing it is
 * logged through {@code Logger.defaultLogError}.
 *
 * NOTE(review): the value returned after logging is not visible in this listing.
 */
826 public int requestHash(Object object) {
\r
828 return object.hashCode();
\r
829 } catch (Throwable t) {
\r
830 Logger.defaultLogError(t);
\r
/**
 * Evaluates a synchronous read through the {@code readMap} cache and returns its result.
 *
 * A ready cached entry is returned directly when there is no parent and the listener is
 * absent or disposed. A missing entry is created in pending state, stored and evaluated
 * through {@code performForEach}. Every other non-pending entry is evaluated through
 * {@code performForEach} as well.
 *
 * @throws IllegalStateException when the cached entry is pending
 * @throws Throwable whatever the evaluation raises (declared by the signature)
 */
836 public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {
\r
838 assert(query != null);
\r
840 ReadEntry entry = readMap.get(query);
\r
842 if(entry != null) {
\r
// Shortcut: no dependency bookkeeping is needed, so the cached value is returned.
843 if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {
\r
844 return (T)entry.get(graph, this, procedure);
\r
845 } else if (entry.isPending()) {
\r
846 throw new IllegalStateException();
\r
850 if(entry == null) {
\r
852 entry = new ReadEntry(query);
\r
853 entry.setPending();
\r
854 entry.clearResult(querySupport);
\r
856 readMap.put(query, entry);
\r
858 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
\r
862 if(entry.isPending()) {
\r
863 throw new IllegalStateException();
\r
865 return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
\r
/**
 * Entry point for multi-reads. With neither a parent nor a listener, a ready cached entry
 * answers from the cache. The remaining visible path delegates to {@code runMultiRead},
 * passing the entry found in {@code multiReadMap} (possibly null).
 *
 * NOTE(review): presumably the cache branch returns before the delegation - the line in
 * between is not visible in this listing.
 */
872 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
\r
874 assert(query != null);
\r
875 assert(procedure != null);
\r
877 final MultiReadEntry entry = multiReadMap.get(query);
\r
// !(listener != null) is equivalent to listener == null.
879 if(parent == null && !(listener != null)) {
\r
880 if(entry != null && entry.isReady()) {
\r
881 entry.performFromCache(graph, this, procedure);
\r
886 runMultiRead(graph, entry, query, parent, this, listener, procedure);
\r
/**
 * Entry point for external (primitive) reads. With neither a parent nor a listener, a
 * ready cached entry answers from the cache. The remaining visible path delegates to
 * {@code runPrimitiveRead}, passing the entry found in {@code externalReadMap} (possibly null).
 *
 * NOTE(review): presumably the cache branch returns before the delegation - the line in
 * between is not visible in this listing.
 */
890 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {
\r
892 assert(query != null);
\r
893 assert(procedure != null);
\r
895 final ExternalReadEntry entry = externalReadMap.get(query);
\r
// !(listener != null) is equivalent to listener == null.
897 if(parent == null && !(listener != null)) {
\r
898 if(entry != null && entry.isReady()) {
\r
899 entry.performFromCache(procedure);
\r
904 runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);
\r
/**
 * Evaluates an {@code AsyncRead} for the cache entry {@code entry}.
 *
 * Dependencies are registered first. An entry that is not ready is set pending and the
 * query is performed on {@code parentGraph.withParent(entry)}; the wrapping procedure
 * stores the result or the exception in the entry before forwarding to {@code procedure}.
 * A ready entry is answered through {@code entry.performFromCache}. In both paths a
 * non-null listener entry is primed with the result.
 *
 * Throwables raised by {@code procedure} callbacks are caught and only printed.
 *
 * @param parentGraph        graph the request was issued on; also the graph handed to {@code procedure}
 * @param query              request to evaluate
 * @param entry              cache entry for {@code query}; asserted not to be discarded
 * @param parent             cache entry passed to {@code registerDependencies}
 * @param base               listener passed to {@code registerDependencies}
 * @param procedure          receiver of the result or of the failure
 * @param inferredDependency flag passed through to {@code registerDependencies}
 */
908 public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,
\r
909 boolean inferredDependency) {
\r
911 if (DebugPolicy.PERFORM)
\r
912 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
915 assert (!collecting);
\r
917 assert(!entry.isDiscarded());
\r
919 final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);
\r
921 // FRESH, REFUTED, EXCEPTED go here
\r
922 if (!entry.isReady()) {
\r
924 entry.setPending();
\r
930 final ReadGraphImpl finalParentGraph = parentGraph;
\r
932 query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {
\r
935 public void execute(AsyncReadGraph returnGraph, T result) {
\r
// impl is not used by the visible lines; the callbacks work on finalParentGraph.
936 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
\r
937 //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
\r
// The result is stored in the entry before the caller's procedure is notified.
938 entry.addOrSet(finalParentGraph, result);
\r
939 if(listenerEntry != null) {
\r
940 primeListenerEntry(listenerEntry, result);
\r
943 procedure.execute(finalParentGraph, result);
\r
944 } catch (Throwable t) {
\r
945 t.printStackTrace();
\r
947 // parentBarrier.dec(query);
\r
951 public void exception(AsyncReadGraph returnGraph, Throwable t) {
\r
952 ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
\r
953 // AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
\r
// The failure is recorded in the entry before it is forwarded.
954 entry.except(finalParentGraph, t);
\r
956 procedure.exception(finalParentGraph, t);
\r
957 } catch (Throwable t2) {
\r
958 t2.printStackTrace();
\r
960 // parentBarrier.dec(query);
\r
964 public String toString() {
\r
965 return procedure.toString();
\r
// A Throwable escaping query.perform itself is reported to the caller's procedure.
// NOTE(review): whether it is also recorded in the entry is not visible in this listing.
970 } catch (Throwable t) {
\r
974 procedure.exception(parentGraph, t);
\r
975 } catch (Throwable t2) {
\r
976 t2.printStackTrace();
\r
978 // parentBarrier.dec(query);
\r
// Ready entry: answer from the cache, priming the listener after notifying the procedure.
986 entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {
\r
989 public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
990 procedure.exception(graph, throwable);
\r
994 public void execute(AsyncReadGraph graph, T result) {
\r
995 procedure.execute(graph, result);
\r
996 if(listenerEntry != null) {
\r
997 primeListenerEntry(listenerEntry, result);
\r
1003 // parentBarrier.dec(query);
\r
1009 assert (!entry.isDiscarded());
\r
/**
 * Evaluates a synchronous {@code Read} for the cache entry {@code entry} and returns the result.
 *
 * A ready entry registers dependencies, reads the value through {@code entry.get} and
 * primes a non-null listener entry. Otherwise the entry is set pending, dependencies are
 * registered, the query is performed on {@code graph.newSync(entry)}, the result is stored
 * with {@code entry.addOrSet}, and the value is returned through {@code entry.get}.
 *
 * NOTE(review): the handling inside the catch block before its return (presumably
 * recording the Throwable in the entry) is not visible in this listing.
 *
 * @throws Throwable declared by the signature; both visible return paths go through {@code entry.get}
 */
1013 public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,
\r
1014 boolean inferredDependency) throws Throwable {
\r
1016 if (DebugPolicy.PERFORM)
\r
1017 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1020 assert (!collecting);
\r
1022 entry.assertNotDiscarded();
\r
1024 if(entry.isReady()) {
\r
1026 // EXCEPTED goes here
\r
1028 // if(procedure != null) entry.performFromCache(graph, this, procedure);
\r
1029 // parentBarrier.dec(query);
\r
1032 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
\r
1034 T result = (T)entry.get(graph, this, procedure);
\r
1036 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
\r
1042 // FRESH, REFUTED, PENDING go here
\r
1044 entry.setPending();
\r
1049 ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
\r
// The query runs on a new synchronous graph created from the entry.
1051 final ReadGraphImpl performGraph = graph.newSync(entry);
\r
1055 if(Development.DEVELOPMENT)
\r
1056 Development.recordHistogram("run " + query);
\r
1058 T result = query.perform(performGraph);
\r
1059 entry.addOrSet(performGraph, result);
\r
1061 if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
\r
// The value is returned through entry.get rather than from the local result.
1063 return (T)entry.get(graph, this, procedure);
\r
1065 } catch (Throwable t) {
\r
1068 return (T)entry.get(graph, this, procedure);
\r
/**
 * Evaluates a {@code MultiRead} for the cache entry {@code entry}.
 *
 * An entry that is not ready is set pending, cleared and stored in {@code multiReadMap}
 * again, and the query is performed on {@code graph.newSync(entry)}. The wrapping
 * procedure adds each result to the entry and finishes it before forwarding to
 * {@code procedure}. A ready entry is answered through {@code entry.performFromCache}.
 * Dependencies are registered at the end of the method.
 *
 * Throwables raised by {@code procedure} callbacks are caught and only printed.
 *
 * NOTE(review): {@code runMultiRead} calls {@code setPending()} on a new entry before
 * invoking this method, which looks at odds with the {@code assert(!entry.isPending())}
 * below when assertions are enabled - confirm against the full source.
 */
1076 public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
\r
1077 boolean inferredDependency) {
\r
1079 if (DebugPolicy.PERFORM)
\r
1080 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1083 assert (!collecting);
\r
1085 assert(!entry.isPending());
\r
1086 assert(!entry.isDiscarded());
\r
1088 // FRESH, REFUTED, EXCEPTED go here
\r
1089 if (!entry.isReady()) {
\r
1091 entry.setPending();
\r
1092 entry.clearResult(querySupport);
\r
1094 multiReadMap.put(query, entry);
\r
1097 final ReadGraphImpl newGraph = graph.newSync(entry);
\r
1098 // newGraph.state.barrier.inc();
\r
1102 query.perform(newGraph, new AsyncMultiProcedure<T>() {
\r
1105 public void execute(AsyncReadGraph graph, T result) {
\r
// Each result is added to the entry before it is forwarded.
1106 entry.addOrSet(result);
\r
1108 procedure.execute(graph, result);
\r
1109 } catch (Throwable t) {
\r
1110 t.printStackTrace();
\r
1115 public void finished(AsyncReadGraph graph) {
\r
1116 entry.finish(graph);
\r
1118 procedure.finished(graph);
\r
1119 } catch (Throwable t) {
\r
1120 t.printStackTrace();
\r
1122 // newGraph.state.barrier.dec();
\r
1123 // parentBarrier.dec();
\r
// NOTE(review): any recording of the failure in the entry is not visible in this listing.
1127 public void exception(AsyncReadGraph graph, Throwable t) {
\r
1130 procedure.exception(graph, t);
\r
1131 } catch (Throwable t2) {
\r
1132 t2.printStackTrace();
\r
1134 // newGraph.state.barrier.dec();
\r
1135 // parentBarrier.dec();
\r
// A DatabaseException escaping query.perform is forwarded as it is.
1140 } catch (DatabaseException e) {
\r
1144 procedure.exception(graph, e);
\r
1145 } catch (Throwable t2) {
\r
1146 t2.printStackTrace();
\r
1148 // newGraph.state.barrier.dec();
\r
1149 // parentBarrier.dec();
\r
// Any other Throwable is wrapped in a DatabaseException before it is forwarded.
1151 } catch (Throwable t) {
\r
1153 DatabaseException e = new DatabaseException(t);
\r
1157 procedure.exception(graph, e);
\r
1158 } catch (Throwable t2) {
\r
1159 t2.printStackTrace();
\r
1161 // newGraph.state.barrier.dec();
\r
1162 // parentBarrier.dec();
\r
// Ready entry: answer from the cache.
1170 entry.performFromCache(graph, this, procedure);
\r
1176 assert (!entry.isDiscarded());
\r
1178 registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
\r
/**
 * Performs an {@link AsyncMultiRead} request through its cache entry.
 * <p>
 * When the entry is not ready the query is performed on
 * callerGraph.withAsyncParent(entry). The callbacks update the entry
 * (addOrSet / finish / except) and forward to the given procedure using callerGraph,
 * not the graph handed to the callback. entry.performFromCache is also invoked in this
 * method (presumably for ready entries - TODO confirm). Dependencies are registered
 * through registerDependencies and a Throwable reaching the last catch is logged with
 * Logger.defaultLogError.
 * <p>
 * NOTE(review): this listing does not show some short lines (closing braces, try/else
 * openers); confirm the exact nesting against the complete source.
 *
 * @param callerGraph the graph of the caller; also passed to the procedure callbacks
 * @param query the request to perform
 * @param entry the cache entry of the request; asserted not discarded
 * @param parent forwarded to registerDependencies as the parent entry
 * @param listener forwarded to registerDependencies
 * @param procedure receives results, completion and exceptions
 * @param inferredDependency forwarded to registerDependencies
 */
1183 public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
\r
1184 boolean inferredDependency) {
\r
1186 if (DebugPolicy.PERFORM)
\r
1187 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1190 assert (!collecting);
\r
1194 assert(!entry.isDiscarded());
\r
1196 // FRESH, REFUTED, EXCEPTED go here
\r
1197 if (!entry.isReady()) {
\r
1203 ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);
\r
1205 query.perform(performGraph, new AsyncMultiProcedure<T>() {
\r
1208 public void execute(AsyncReadGraph graph, T result) {
\r
// NOTE(review): 'impl' is not used in the visible lines of this callback.
1209 ReadGraphImpl impl = (ReadGraphImpl)graph;
\r
1210 // ReadGraphImpl executeGraph = callerGraph.newAsync();
\r
1211 entry.addOrSet(result);
\r
1213 procedure.execute(callerGraph, result);
\r
// A Throwable from the caller's procedure is only printed here.
1214 } catch (Throwable t) {
\r
1215 t.printStackTrace();
\r
1220 public void finished(AsyncReadGraph graph) {
\r
1221 ReadGraphImpl impl = (ReadGraphImpl)graph;
\r
1222 // ReadGraphImpl executeGraph = callerGraph.newAsync();
\r
1223 entry.finish(callerGraph);
\r
1225 procedure.finished(callerGraph);
\r
1226 } catch (Throwable t) {
\r
1227 t.printStackTrace();
\r
1232 public void exception(AsyncReadGraph graph, Throwable t) {
\r
1233 ReadGraphImpl impl = (ReadGraphImpl)graph;
\r
1234 // ReadGraphImpl executeGraph = callerGraph.newAsync();
\r
// The failure is stored in the entry before the caller is notified.
1235 entry.except(callerGraph, t);
\r
1237 procedure.exception(callerGraph, t);
\r
1238 } catch (Throwable t2) {
\r
1239 t2.printStackTrace();
\r
// A Throwable caught here is passed to the caller's procedure.
1245 } catch (Throwable t) {
\r
1249 procedure.exception(callerGraph, t);
\r
1250 } catch (Throwable t2) {
\r
1251 t2.printStackTrace();
\r
1261 entry.performFromCache(callerGraph, this, procedure);
\r
1267 assert (!entry.isDiscarded());
\r
1269 registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);
\r
1271 } catch (Throwable t) {
\r
1273 Logger.defaultLogError(t);
\r
/**
 * Performs an {@link ExternalRead} request through its cache entry.
 * <p>
 * Dependencies are registered first. When the entry is not ready it is marked pending,
 * cleared, stored in externalReadMap and a {@link Listener} is registered on the query.
 * The listener uses the 'used' flag to tell its first notification from later ones: the
 * notification that wins compareAndSet(false, true) stores the result in the entry and
 * calls the procedure; entry.queue(result) and updatePrimitive(query) are presumably
 * the path for subsequent notifications (the else line is not visible - TODO confirm).
 * entry.performFromCache(procedure) is also invoked in this method (presumably for
 * ready entries - TODO confirm).
 *
 * @param graph the graph passed to query.register and registerDependencies
 * @param query the external request
 * @param entry the cache entry of the request; asserted not pending and not discarded
 * @param parent forwarded to registerDependencies as the parent entry
 * @param base forwarded to registerDependencies as the listener
 * @param procedure receives the value or the exception
 * @param inferredDependency forwarded to registerDependencies
 */
1281 public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,
\r
1282 boolean inferredDependency) {
\r
1284 if (DebugPolicy.PERFORM)
\r
1285 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1288 assert (!collecting);
\r
1290 assert(!entry.isPending());
\r
1291 assert(!entry.isDiscarded());
\r
1293 registerDependencies(graph, entry, parent, base, procedure, inferredDependency);
\r
1295 // FRESH, REFUTED, EXCEPTED go here
\r
1296 if (!entry.isReady()) {
\r
1298 entry.setPending();
\r
1299 entry.clearResult(querySupport);
\r
1301 externalReadMap.put(query, entry);
\r
1306 query.register(graph, new Listener<T>() {
\r
// Set by the first execute/exception notification that gets through.
1308 AtomicBoolean used = new AtomicBoolean(false);
\r
1311 public void execute(T result) {
\r
1313 // Just for safety
\r
1314 if(entry.isDiscarded()) return;
\r
// An excepted entry is switched back to pending before the new value is handled.
1315 if(entry.isExcepted()) entry.setPending();
\r
1317 if(used.compareAndSet(false, true)) {
\r
1318 entry.addOrSet(QueryProcessor.this, result);
\r
1319 procedure.execute(result);
\r
1321 entry.queue(result);
\r
1322 updatePrimitive(query);
\r
1328 public void exception(Throwable t) {
\r
1332 if(used.compareAndSet(false, true)) {
\r
1333 procedure.exception(t);
\r
1335 // entry.queue(result);
\r
1336 updatePrimitive(query);
\r
1342 public String toString() {
\r
1343 return procedure.toString();
\r
1347 public boolean isDisposed() {
\r
// Disposed when the entry is discarded or is no longer bound (see isBound).
1348 return entry.isDiscarded() || !isBound(entry);
\r
// A Throwable caught here is reported to the caller's procedure.
1353 } catch (Throwable t) {
\r
1356 procedure.exception(t);
\r
1364 entry.performFromCache(procedure);
\r
1370 assert (!entry.isDiscarded());
\r
/**
 * Tells whether an external read entry is still referenced.
 *
 * @param entry the entry to check
 * @return true if the entry has parents or hasListener(entry) is true, false otherwise
 */
1374 private boolean isBound(ExternalReadEntry<?> entry) {
\r
1375 if(entry.hasParents()) return true;
\r
1376 else if(hasListener(entry)) return true;
\r
1377 else return false;
\r
/**
 * Records the parent of a cache entry and registers a listener for it.
 * <p>
 * The parent is added to the child only when a parent is given, the dependency is not
 * inferred and child.isImmutable(graph) is false. A DatabaseException raised while
 * doing so is logged with Logger.defaultLogError. The method is synchronized.
 *
 * @param graph passed to child.isImmutable
 * @param child the entry that was requested
 * @param parent the requesting entry, may be null
 * @param listener the listener to register, may be null
 * @param procedure passed on to registerListener
 * @param inferred when true no parent is recorded
 * @return the result of registerListener when a listener is given; the value returned
 *         without a listener is not visible in this listing
 */
1380 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
\r
1382 if (parent != null && !inferred) {
\r
1384 if(!child.isImmutable(graph))
\r
1385 child.addParent(parent);
\r
1386 } catch (DatabaseException e) {
\r
1387 Logger.defaultLogError(e);
\r
1389 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
\r
1392 if (listener != null) {
\r
1393 return registerListener(child, listener, procedure);
\r
/**
 * Performs a {@link BinaryQuery}.
 * <p>
 * Dependencies are registered (never as inferred) before the query is evaluated. A
 * query that is not ready is computed with computeForEach; performFromCache is also
 * invoked in this method (presumably for ready queries - TODO confirm). A Throwable
 * reaching the catch is logged with Logger.defaultLogError.
 *
 * @param graph the graph of the caller
 * @param query the query, which is also its own cache entry here
 * @param parent forwarded to registerDependencies as the parent entry
 * @param listener forwarded to registerDependencies
 * @param procedure receives the result
 */
1400 public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
\r
1402 if (DebugPolicy.PERFORM)
\r
1403 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1406 assert (!collecting);
\r
1410 registerDependencies(graph, query, parent, listener, procedure, false);
\r
1412 // FRESH, REFUTED, EXCEPTED go here
\r
1413 if (!query.isReady()) {
\r
// NOTE(review): no use of 'fresh' is visible in this listing.
1415 boolean fresh = query.isFresh();
\r
1421 query.computeForEach(graph, this, procedure, true);
\r
1427 query.performFromCache(graph, this, procedure);
\r
1433 } catch (Throwable t) {
\r
1435 Logger.defaultLogError(t);
\r
/**
 * Performs a {@link UnaryQuery} and returns what the query returns.
 * <p>
 * Dependencies are registered (never as inferred) before the query is evaluated. A
 * query that is not ready is computed with computeForEach; performFromCache is also
 * returned from in this method (presumably for ready queries - TODO confirm). A
 * Throwable reaching the catch is logged with Logger.defaultLogError.
 *
 * @param graph the graph of the caller
 * @param query the query, which is also its own cache entry here
 * @param parent forwarded to registerDependencies as the parent entry
 * @param listener forwarded to registerDependencies
 * @param procedure receives the result
 * @return the value returned by computeForEach or performFromCache; the value returned
 *         after a logged failure is not visible in this listing
 */
1440 public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
\r
1442 if (DebugPolicy.PERFORM)
\r
1443 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1446 assert (!collecting);
\r
1450 assert(query.assertNotDiscarded());
\r
1452 registerDependencies(graph, query, parent, listener, procedure, false);
\r
1454 // FRESH, REFUTED, EXCEPTED go here
\r
1455 if (!query.isReady()) {
\r
1460 return query.computeForEach(graph, this, procedure, true);
\r
1467 return query.performFromCache(graph, this, procedure);
\r
1471 } catch (Throwable t) {
\r
1473 Logger.defaultLogError(t);
\r
/**
 * Procedure implementation used as a placeholder by performForEach2, which passes the
 * shared instance to computeForEach. No statements are visible inside its callbacks in
 * this listing (presumably they are empty - TODO confirm).
 */
1480 static class Dummy implements InternalProcedure<Object>, IntProcedure {
\r
1483 public void execute(ReadGraphImpl graph, int i) {
\r
1487 public void finished(ReadGraphImpl graph) {
\r
1491 public void execute(ReadGraphImpl graph, Object result) {
\r
1495 public void exception(ReadGraphImpl graph, Throwable throwable) {
\r
// Single shared instance of Dummy.
1500 private static final Dummy dummy = new Dummy();
\r
/**
 * Performs a {@link UnaryQuery} and returns its value through query.get.
 * <p>
 * Dependencies are registered (never as inferred). A query that is not ready is
 * computed with the shared Dummy procedure and its value is then read with
 * query.get(graph, this, null). The second return reads the value with the caller's
 * procedure (presumably the branch for ready queries - TODO confirm).
 *
 * @param graph the graph of the caller
 * @param query the query, which is also its own cache entry here
 * @param parent forwarded to registerDependencies as the parent entry
 * @param listener forwarded to registerDependencies
 * @param procedure passed to query.get on the second return only
 * @return the value returned by query.get
 * @throws Throwable declared by the signature; nothing is caught in the visible lines
 */
1502 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
\r
1504 if (DebugPolicy.PERFORM)
\r
1505 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1508 assert (!collecting);
\r
1510 assert(query.assertNotDiscarded());
\r
1512 registerDependencies(graph, query, parent, listener, procedure, false);
\r
1514 // FRESH, REFUTED, EXCEPTED go here
\r
1515 if (!query.isReady()) {
\r
// The caller's procedure is not notified here; the Dummy instance is cast to the procedure type.
1520 query.computeForEach(graph, this, (Procedure)dummy, true);
\r
1521 return query.get(graph, this, null);
\r
1527 return query.get(graph, this, procedure);
\r
/**
 * Performs a {@link StringQuery}.
 * <p>
 * A query that is not ready is computed on graph.withAsyncParent(query);
 * performFromCache is also invoked in this method (presumably for ready queries - TODO
 * confirm). Dependencies are registered after the evaluation (never as inferred). A
 * Throwable reaching the catch is both printed and logged.
 *
 * @param graph the graph of the caller
 * @param query the query, which is also its own cache entry here
 * @param parent forwarded to registerDependencies as the parent entry
 * @param listener forwarded to registerDependencies
 * @param procedure receives the result
 */
1533 public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {
\r
1535 if (DebugPolicy.PERFORM)
\r
1536 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
\r
1539 assert (!collecting);
\r
// NOTE(review): "aff" looks like a leftover debugging marker for a discarded query.
1543 if(query.isDiscarded()) {
\r
1544 System.err.println("aff");
\r
1546 assert(!query.isDiscarded());
\r
1548 // FRESH, REFUTED, EXCEPTED go here
\r
1549 if (!query.isReady()) {
\r
1551 query.computeForEach(graph.withAsyncParent(query), this, procedure);
\r
1558 query.performFromCache(graph, this, procedure);
\r
1564 assert (!query.isDiscarded());
\r
1566 registerDependencies(graph, query, parent, listener, procedure, false);
\r
1568 } catch (Throwable t) {
\r
1570 t.printStackTrace();
\r
1571 Logger.defaultLogError(t);
\r
/**
 * Operations a query collector needs from the query processor. The implementation in
 * this file is QueryCollectorSupportImpl; an instance is handed to QueryCollectorImpl.
 */
1577 interface QueryCollectorSupport {
\r
// Returns a collection result holding the cache entries.
1578 public CacheCollectionResult allCaches();
\r
// Returns cache entries gathered from the query maps.
1579 public Collection<CacheEntry> getRootList();
\r
1580 public int getCurrentSize();
\r
// Returns the sum of the sizes of the query maps.
1581 public int calculateCurrentSize();
\r
// Returns the next entry of the current iteration for the given level.
1582 public CacheEntryBase iterate(int level);
\r
// Removes through the current iteration (delegates to iteration.remove() in the implementation).
1583 public void remove();
\r
1584 public void setLevel(CacheEntryBase entry, int level);
\r
// Prepares an iteration; flush forces fresh data. Returns whether the iteration is at its start.
1585 public boolean start(boolean flush);
\r
/**
 * Collector of cached queries; QueryProcessor.gc delegates to collect.
 */
1588 interface QueryCollector {
\r
// presumably youngTarget is a target entry count and allowedTimeInMs a time budget - TODO confirm
1590 public void collect(int youngTarget, int allowedTimeInMs);
\r
/**
 * QueryCollectorSupport implementation backed by the query maps of the enclosing
 * QueryProcessor. It keeps a CacheCollectionResult ('iteration') that is walked by
 * iterate and refreshed by restart.
 * <p>
 * NOTE(review): this listing does not show some short lines (closing braces, else
 * openers, short statements), so branch structure must be confirmed against the
 * complete source.
 */
1594 class QueryCollectorSupportImpl implements QueryCollectorSupport {
\r
1596 private static final boolean DEBUG = false;
\r
// Ratio passed to restart when no flush is requested.
1597 private static final double ITERATION_RATIO = 0.2;
\r
1599 private CacheCollectionResult iteration = new CacheCollectionResult();
\r
// NOTE(review): no use of 'fresh' is visible in this listing.
1600 private boolean fresh = true;
\r
// When true, start() refreshes the data through restart().
1601 private boolean needDataInStart = true;
\r
1603 QueryCollectorSupportImpl() {
\r
1604 iteration.restart();
\r
// Fills a new CacheCollectionResult through QueryProcessor.allCaches.
1607 public CacheCollectionResult allCaches() {
\r
1608 CacheCollectionResult result = new CacheCollectionResult();
\r
1609 QueryProcessor.this.allCaches(result);
\r
// Refreshes the data when needed or when flush is requested (a flush uses ratio 0.0)
// and reports whether the iteration is at its start.
1614 public boolean start(boolean flush) {
\r
1615 // We need new data from query maps
\r
1617 if(needDataInStart || flush) {
\r
1618 // Last run ended after processing all queries => refresh data
\r
1619 restart(flush ? 0.0 : ITERATION_RATIO);
\r
1621 // continue with previous big data
\r
1623 // Notify caller about iteration situation
\r
1624 return iteration.isAtStart();
\r
// Restarts the iteration. 'dirty' is true when the relative difference between the
// current total map size and the size of the iteration is at least targetRatio.
// presumably the caches are re-read with allCaches() only when dirty - TODO confirm
1627 private void restart(double targetRatio) {
\r
1629 needDataInStart = true;
\r
1631 long start = System.nanoTime();
\r
1634 // We need new data from query maps
\r
// +1 keeps the divisor below from being zero.
1636 int iterationSize = iteration.size()+1;
\r
1637 int diff = calculateCurrentSize()-iterationSize;
\r
1639 double ratio = (double)diff / (double)iterationSize;
\r
1640 boolean dirty = Math.abs(ratio) >= targetRatio;
\r
1643 iteration = allCaches();
\r
1645 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
\r
1646 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
\r
1647 System.err.print(" " + iteration.levels[i].size());
\r
1648 System.err.println("");
\r
1651 iteration.restart();
\r
1655 needDataInStart = false;
\r
1657 // We are returning here within the same GC round - reuse the cache table
\r
1658 iteration.restart();
\r
// Returns the next entry of the given level, skipping discarded entries. When the
// iteration is exhausted, restart(ITERATION_RATIO) is called.
1666 public CacheEntryBase iterate(int level) {
\r
1668 CacheEntryBase entry = iteration.next(level);
\r
1669 if(entry == null) {
\r
1670 restart(ITERATION_RATIO);
\r
1674 while(entry != null && entry.isDiscarded()) {
\r
1675 entry = iteration.next(level);
\r
1683 public void remove() {
\r
1684 iteration.remove();
\r
1688 public void setLevel(CacheEntryBase entry, int level) {
\r
1689 iteration.setLevel(entry, level);
\r
// Collects the values of the query maps listed below into one list.
1692 public Collection<CacheEntry> getRootList() {
\r
1694 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
\r
1696 for (Object e : valueMap.values()) {
\r
1697 result.add((CacheEntry) e);
\r
1699 for (Object e : directPredicatesMap.values()) {
\r
1700 result.add((CacheEntry) e);
\r
1702 for (Object e : objectsMap.values()) {
\r
1703 result.add((CacheEntry) e);
\r
1705 for (Object e : directObjectsMap.values()) {
\r
1706 result.add((CacheEntry) e);
\r
1708 for (Object e : principalTypesMap.values()) {
\r
1709 result.add((CacheEntry) e);
\r
1711 for (Object e : superRelationsMap.values()) {
\r
1712 result.add((CacheEntry) e);
\r
1714 for (Object e : superTypesMap.values()) {
\r
1715 result.add((CacheEntry) e);
\r
1717 for (Object e : typesMap.values()) {
\r
1718 result.add((CacheEntry) e);
\r
// NOTE(review): objectsMap was already added above, so its entries appear twice in the
// result. Possibly another map was intended here - TODO confirm.
1720 for (Object e : objectsMap.values()) {
\r
1721 result.add((CacheEntry) e);
\r
1723 for (Object e : assertedStatementsMap.values()) {
\r
1724 result.add((CacheEntry) e);
\r
// Values of readMap and asyncReadMap are type checked; the other values are printed to
// System.err (presumably in an else branch that is not visible - TODO confirm).
1726 for (Object e : readMap.values()) {
\r
1727 if(e instanceof CacheEntry) {
\r
1728 result.add((CacheEntry) e);
\r
1730 System.err.println("e=" + e);
\r
1733 for (Object e : asyncReadMap.values()) {
\r
1734 if(e instanceof CacheEntry) {
\r
1735 result.add((CacheEntry) e);
\r
1737 System.err.println("e=" + e);
\r
1740 for (Object e : externalReadMap.values()) {
\r
1741 result.add((CacheEntry) e);
\r
1743 for (Object e : orderedSetMap.values()) {
\r
1744 result.add((CacheEntry) e);
\r
// Accumulates the sizes of the query maps into realSize. The declaration of realSize
// and the return statement are not visible in this listing.
1752 public int calculateCurrentSize() {
\r
1756 realSize += directPredicatesMap.size();
\r
1757 realSize += principalTypesMap.size();
\r
1758 realSize += uriToResourceMap.size();
\r
1759 realSize += namespaceIndexMap22.size();
\r
1760 realSize += projectsMap.size();
\r
1762 realSize += relationInfoMap.size();
\r
1763 realSize += superTypesMap.size();
\r
1764 realSize += typeHierarchyMap.size();
\r
1765 realSize += superRelationsMap.size();
\r
1766 realSize += typesMap.size();
\r
1768 realSize += valueMap.size();
\r
1769 realSize += directObjectsMap.size();
\r
1770 realSize += objectsMap.size();
\r
1771 realSize += orderedSetMap.size();
\r
1772 realSize += predicatesMap.size();
\r
1774 realSize += statementsMap.size();
\r
1775 realSize += assertedPredicatesMap.size();
\r
1776 realSize += assertedStatementsMap.size();
\r
1777 realSize += externalReadMap.size();
\r
1778 realSize += asyncReadMap.size();
\r
1780 realSize += readMap.size();
\r
1781 realSize += asyncMultiReadMap.size();
\r
1782 realSize += multiReadMap.size();
\r
// The body of getCurrentSize is not visible in this listing.
1791 public int getCurrentSize() {
\r
1796 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
\r
// Access to the caches of this processor for the collector.
1798 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
\r
// Collector used by gc(int, int); it works on this processor through collectorSupport.
1799 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
\r
// NOTE(review): the body of querySize is not visible in this listing; presumably it
// returns the number of cached queries - TODO confirm.
1801 public int querySize() {
\r
/**
 * Runs the query collector.
 *
 * @param youngTarget passed to collector.collect unchanged
 * @param allowedTimeInMs passed to collector.collect unchanged
 */
1805 public void gc(int youngTarget, int allowedTimeInMs) {
\r
1807 collector.collect(youngTarget, allowedTimeInMs);
\r
/**
 * Registers a listener for a cache entry through addListener.
 *
 * @param entry the entry to listen to; asserted non-null
 * @param base the listener; checked with isDisposed() before registration
 * @param procedure the procedure stored with the listener
 * @return the result of addListener; the value returned for a disposed listener is not
 *         visible in this listing (presumably null - TODO confirm)
 */
1811 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
\r
1813 assert (entry != null);
\r
1815 if (base.isDisposed())
\r
1818 return addListener(entry, base, procedure);
\r
/**
 * Stores the given result as the last known value of the listener entry.
 *
 * @param entry the listener entry to update
 * @param result the value passed to entry.setLastKnown
 */
1822 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
\r
1823 entry.setLastKnown(result);
\r
/**
 * Creates a ListenerEntry for the given cache entry and stores it in the listeners map.
 * <p>
 * The listener list of the entry is created on first use. When the list already holds
 * an entry equal to the new one, null is returned if the existing listener is not
 * disposed; a disposed one is replaced at the same index.
 *
 * @param entry the cache entry to listen to; asserted non-null
 * @param base the listener
 * @param procedure the procedure stored with the listener; asserted non-null
 * @return null when an equal, non-disposed listener already exists; the other return
 *         statements are not visible in this listing (presumably the new entry - TODO
 *         confirm)
 */
1826 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
\r
1828 assert (entry != null);
\r
1829 assert (procedure != null);
\r
1831 ArrayList<ListenerEntry> list = listeners.get(entry);
\r
1832 if (list == null) {
\r
// Most entries are expected to have a single listener, hence the initial capacity of 1.
1833 list = new ArrayList<ListenerEntry>(1);
\r
1834 listeners.put(entry, list);
\r
1837 ListenerEntry result = new ListenerEntry(entry, base, procedure);
\r
// indexOf relies on ListenerEntry.equals to find an existing registration.
1838 int currentIndex = list.indexOf(result);
\r
1839 // There was already a listener
\r
1840 if(currentIndex > -1) {
\r
1841 ListenerEntry current = list.get(currentIndex);
\r
1842 if(!current.base.isDisposed()) return null;
\r
1843 list.set(currentIndex, result);
\r
1848 if(DebugPolicy.LISTENER) {
\r
1849 new Exception().printStackTrace();
\r
1850 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
\r
/**
 * Adds the listener entry to scheduledListeners.
 *
 * @param entry the listener entry to schedule; asserted non-null
 */
1857 private void scheduleListener(ListenerEntry entry) {
\r
1858 assert (entry != null);
\r
1859 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
\r
1860 scheduledListeners.add(entry);
\r
/**
 * Removes the listener entry from the listener list of its cache entry and drops the
 * list from the listeners map once it is empty. Does nothing when the cache entry has
 * no listener list.
 *
 * @param entry the listener entry to remove; asserted non-null
 */
1863 private void removeListener(ListenerEntry entry) {
\r
1864 assert (entry != null);
\r
1865 ArrayList<ListenerEntry> list = listeners.get(entry.entry);
\r
1866 if(list == null) return;
\r
// NOTE(review): no use of 'success' is visible in this listing (line 1868 is not shown).
1867 boolean success = list.remove(entry);
\r
1869 if (list.isEmpty())
\r
1870 listeners.remove(entry.entry);
\r
/**
 * Tells whether the listeners map holds a listener list for the entry.
 *
 * @param entry the cache entry to check
 * @return true when a list is registered; the other return statement is not visible in
 *         this listing (presumably false - TODO confirm)
 */
1873 private boolean hasListener(CacheEntry entry) {
\r
1874 if(listeners.get(entry) != null) return true;
\r
/**
 * Prunes disposed listeners of the entry and reports whether listeners remain.
 * <p>
 * Listener entries whose base is disposed are collected first and removed afterwards,
 * so the list is not modified while it is iterated. The listener list is removed from
 * the listeners map when it becomes empty.
 *
 * @param entry the cache entry to check
 * @return the return statements are not visible in this listing; presumably true when
 *         at least one listener is left - TODO confirm
 */
1878 boolean hasListenerAfterDisposing(CacheEntry entry) {
\r
1879 if(listeners.get(entry) != null) {
\r
1880 ArrayList<ListenerEntry> entries = listeners.get(entry);
\r
// Allocated lazily, only when a disposed listener is found.
1881 ArrayList<ListenerEntry> list = null;
\r
1882 for (ListenerEntry e : entries) {
\r
1883 if (e.base.isDisposed()) {
\r
1884 if(list == null) list = new ArrayList<ListenerEntry>();
\r
1888 if(list != null) {
\r
1889 for (ListenerEntry e : list) {
\r
1890 entries.remove(e);
\r
1893 if (entries.isEmpty()) {
\r
1894 listeners.remove(entry);
\r
/**
 * Returns the listener entries of a cache entry after pruning disposed listeners.
 *
 * @param entry the cache entry to look up
 * @return the list stored in the listeners map (not a copy), or an empty list when
 *         there is none
 */
1902 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
\r
// Called for its pruning side effect; the boolean result is ignored.
1903 hasListenerAfterDisposing(entry);
\r
1904 if(listeners.get(entry) != null)
\r
1905 return listeners.get(entry);
\r
1907 return Collections.emptyList();
\r
/**
 * Computes the listener set of a cache entry for the listener report and stores it in
 * the work area. Entries already present in the work area are not processed again.
 * The parents are processed recursively and their listener sets are added to the set
 * of this entry.
 *
 * @param entry the cache entry to process
 * @param workarea maps processed entries to their listener sets; updated in place
 */
1910 void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
\r
1912 if(!workarea.containsKey(entry)) {
\r
1914 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
\r
// The loop body is not visible in this listing; presumably it adds e.base to ls - TODO confirm.
1915 for(ListenerEntry e : getListenerEntries(entry))
\r
// The set is stored before the recursion, which also ends recursion on revisits.
1918 workarea.put(entry, ls);
\r
1920 for(CacheEntry parent : entry.getParents(this)) {
\r
1921 processListenerReport(parent, workarea);
\r
1922 ls.addAll(workarea.get(parent));
\r
/**
 * Builds a listener report over all cache entries.
 * <p>
 * Disposed listeners are pruned from every entry first, then processListenerReport
 * fills the work area of the report for every entry. The method is synchronized.
 *
 * @return the report; its return statement is not visible in this listing
 * @throws IOException declared by the signature; no throwing call is visible here
 */
1929 public synchronized ListenerReport getListenerReport() throws IOException {
\r
1931 class ListenerReportImpl implements ListenerReport {
\r
1933 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
\r
// Prints one line per listener: the number of work area entries it occurs in, then the listener.
1936 public void print(PrintStream b) {
\r
1937 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
\r
1938 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
\r
1939 for(ListenerBase l : e.getValue()) {
\r
1940 Integer i = hist.get(l);
\r
// Counts are kept negative and negated again when printed; presumably so that an
// ascending value sort lists the largest counts first - TODO confirm.
1941 hist.put(l, i != null ? i-1 : -1);
\r
1945 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
\r
1946 b.print("" + -p.second + " " + p.first + "\n");
\r
1954 ListenerReportImpl result = new ListenerReportImpl();
\r
1956 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
\r
// First pass: prune disposed listeners.
1957 for(CacheEntryBase entry : all) {
\r
1958 hasListenerAfterDisposing(entry);
\r
// Second pass: collect the listener sets.
1960 for(CacheEntryBase entry : all) {
\r
1961 processListenerReport(entry, result.workarea);
\r
/**
 * Writes the listener report into the given file.
 *
 * @param file the file to write
 * @return "Disposed!" under a condition that is not visible in this listing,
 *         otherwise "Done reporting listeners."
 * @throws IOException declared by the signature; FileOutputStream may fail to open the file
 */
1968 public synchronized String reportListeners(File file) throws IOException {
\r
1971 return "Disposed!";
\r
// NOTE(review): no close of this stream is visible in this listing; confirm that it is
// closed even when the report fails.
1973 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
\r
1974 ListenerReport report = getListenerReport();
\r
1977 return "Done reporting listeners.";
\r
/**
 * Records the parents of a cache entry in the work area and processes the parents
 * recursively. Discarded entries and entries already present in the work area are
 * skipped, and discarded parents are ignored.
 *
 * @param entry the cache entry to process
 * @param workarea maps processed entries to their parent sets; updated in place
 */
1981 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
\r
1983 if(entry.isDiscarded()) return;
\r
1984 if(workarea.containsKey(entry)) return;
\r
1986 Iterable<CacheEntry> parents = entry.getParents(this);
\r
1987 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
\r
1988 for(CacheEntry e : parents) {
\r
1989 if(e.isDiscarded()) continue;
\r
// Line 1990 is not visible in this listing; presumably it adds e to ps - TODO confirm.
1991 processParentReport(e, workarea);
\r
// NOTE(review): the entry is stored only after the recursion; this terminates only if
// the parent relation has no cycles - TODO confirm.
1993 workarea.put(entry, ps);
\r
/**
 * Writes the contents of Development.histogram into the given file, one
 * "name: count" line per entry in reversed value order, and clears the histogram.
 *
 * @param file the file to write
 * @return "Disposed!" under a condition that is not visible in this listing; the
 *         other return statement is not visible either
 * @throws IOException declared by the signature; FileOutputStream may fail to open the file
 */
1997 public synchronized String reportQueryActivity(File file) throws IOException {
\r
// NOTE(review): the message says "reportQueries" although this is reportQueryActivity;
// looks copied from reportQueries.
1999 System.err.println("reportQueries " + file.getAbsolutePath());
\r
2002 return "Disposed!";
\r
// NOTE(review): no close of this stream is visible in this listing - TODO confirm.
2004 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
\r
2006 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
\r
2007 Collections.reverse(entries);
\r
2009 for(Pair<String,Integer> entry : entries) {
\r
2010 b.println(entry.first + ": " + entry.second);
\r
2015 Development.histogram.clear();
\r
/**
 * Dumps a report of the cached queries into the given file.
 * <p>
 * The visible steps are: (1) collect the parent sets of all cache entries into a work
 * area, (2) compute a bind status per entry (0=bound, 1=free, 2=unknown, as described
 * in the report header) in repeated passes, (3) count entries per query class,
 * (4) iterate "retained set" sizes, (5) print parent listings and (6) print child
 * listings built from the inverted parent relation.
 * <p>
 * NOTE(review): this listing does not show many short lines (loop openers, counters,
 * branch conditions, closing braces); the comments below describe the visible
 * statements only and the exact control flow must be confirmed against the complete
 * source.
 *
 * @param file the file to write
 * @return "Disposed!" under a condition that is not visible in this listing, otherwise
 *         a message with the number of dumped queries
 * @throws IOException declared by the signature; FileOutputStream may fail to open the file
 */
2021 public synchronized String reportQueries(File file) throws IOException {
\r
2023 System.err.println("reportQueries " + file.getAbsolutePath());
\r
2026 return "Disposed!";
\r
// NOTE(review): no close of this stream is visible in this listing - TODO confirm.
2028 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
\r
2030 long start = System.nanoTime();
\r
2032 // ArrayList<CacheEntry> all = ;
\r
// Step 1: parent sets of all non-discarded entries.
2034 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
\r
2035 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
\r
2036 for(CacheEntryBase entry : caches) {
\r
2037 processParentReport(entry, workarea);
\r
2040 // for(CacheEntry e : all) System.err.println("entry: " + e);
\r
2042 long duration = System.nanoTime() - start;
\r
2043 System.err.println("Query root set in " + 1e-9*duration + "s.");
\r
2045 start = System.nanoTime();
\r
// Step 2: initial bind status per entry. The conditions selecting 0 and 2 are not
// visible; status 1 is assigned when the entry has no parents.
2047 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
\r
2049 int listeners = 0;
\r
2051 for(CacheEntry entry : workarea.keySet()) {
\r
2052 boolean listener = hasListenerAfterDisposing(entry);
\r
2053 boolean hasParents = entry.getParents(this).iterator().hasNext();
\r
2056 flagMap.put(entry, 0);
\r
2057 } else if (!hasParents) {
\r
2059 flagMap.put(entry, 1);
\r
2062 flagMap.put(entry, 2);
\r
2064 // // Write leaf bit
\r
2065 // entry.flags |= 4;
\r
2068 boolean done = true;
\r
2075 long start2 = System.nanoTime();
\r
2077 int boundCounter = 0;
\r
2078 int unboundCounter = 0;
\r
2079 int unknownCounter = 0;
\r
// One analysis pass: entries with unknown status (2) are re-evaluated from the status
// of their parents.
2081 for(CacheEntry entry : workarea.keySet()) {
\r
2083 //System.err.println("process " + entry);
\r
2085 int flags = flagMap.get(entry);
\r
// Only the two lowest bits carry the bind status.
2086 int bindStatus = flags & 3;
\r
2088 if(bindStatus == 0) boundCounter++;
\r
2089 else if(bindStatus == 1) unboundCounter++;
\r
2090 else if(bindStatus == 2) unknownCounter++;
\r
// Already decided entries need no further work.
2092 if(bindStatus < 2) continue;
\r
2094 int newStatus = 1;
\r
2095 for(CacheEntry parent : entry.getParents(this)) {
\r
// A discarded parent is treated as free.
2097 if(parent.isDiscarded()) flagMap.put(parent, 1);
\r
2099 int flags2 = flagMap.get(parent);
\r
2100 int bindStatus2 = flags2 & 3;
\r
2101 // Parent is bound => child is bound
\r
2102 if(bindStatus2 == 0) {
\r
2106 // Parent is unknown => child is unknown
\r
2107 else if (bindStatus2 == 2) {
\r
2114 flagMap.put(entry, newStatus);
\r
2118 duration = System.nanoTime() - start2;
\r
2119 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
\r
2120 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
\r
// The passes are repeated while 'done' is false, bounded by the 'loops' counter.
2122 } while(!done && loops++ < 20);
\r
2126 for(CacheEntry entry : workarea.keySet()) {
\r
2128 int bindStatus = flagMap.get(entry);
\r
2129 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
\r
2135 duration = System.nanoTime() - start;
\r
2136 System.err.println("Query analysis in " + 1e-9*duration + "s.");
\r
// Step 3: entries per class. Request based entries are counted by the class of their
// request. Counts are kept negative and negated when printed; presumably so that an
// ascending value sort lists the largest counts first - TODO confirm.
2138 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
\r
2140 for(CacheEntry entry : workarea.keySet()) {
\r
2141 Class<?> clazz = entry.getClass();
\r
2142 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
\r
2143 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
\r
2144 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
\r
2145 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
\r
2146 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
\r
2147 Integer c = counts.get(clazz);
\r
2148 if(c == null) counts.put(clazz, -1);
\r
2149 else counts.put(clazz, c-1);
\r
2152 b.print("// Simantics DB client query report file\n");
\r
2153 b.print("// This file contains the following information\n");
\r
2154 b.print("// -The amount of cached query instances per query class\n");
\r
2155 b.print("// -The sizes of retained child sets\n");
\r
2156 b.print("// -List of parents for each query (search for 'P <query name>')\n");
\r
2157 b.print("// -Followed by status, where\n");
\r
2158 b.print("// -0=bound\n");
\r
2159 b.print("// -1=free\n");
\r
2160 b.print("// -2=unknown\n");
\r
2161 b.print("// -L=has listener\n");
\r
2162 b.print("// -List of children for each query (search for 'C <query name>')\n");
\r
2164 b.print("----------------------------------------\n");
\r
2166 b.print("// Queries by class\n");
\r
2167 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
\r
2168 b.print(-p.second + " " + p.first.getName() + "\n");
\r
// Step 4: retained set sizes. The initial values of hist are not visible in this
// listing. In every round each entry adds its current value to each of its parents
// in newHist; rounds are repeated while values change, at most 50 times.
2171 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
\r
2172 for(CacheEntry e : workarea.keySet())
\r
2175 boolean changed = true;
\r
2177 while(changed && iter++<50) {
\r
2181 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
\r
2182 for(CacheEntry e : workarea.keySet())
\r
2183 newHist.put(e, -1);
\r
2185 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
\r
2186 Integer c = hist.get(e.getKey());
\r
2187 for(CacheEntry p : e.getValue()) {
\r
2188 Integer i = newHist.get(p);
\r
2189 newHist.put(p, i+c);
\r
2192 for(CacheEntry e : workarea.keySet()) {
\r
2193 Integer value = newHist.get(e);
\r
2194 Integer old = hist.get(e);
\r
2195 if(!value.equals(old)) {
\r
2196 hist.put(e, value);
\r
2197 // System.err.println("hist " + e + ": " + old + " => " + value);
\r
2202 System.err.println("Retained set iteration " + iter);
\r
2206 b.print("// Queries by retained set\n");
\r
2207 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
\r
2208 b.print("" + -p.second + " " + p.first + "\n");
\r
// Step 5: parent listing. The inverse map (parent -> children) is filled on the way
// for the child listing below.
2211 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
\r
2213 b.print("// Entry parent listing\n");
\r
2214 for(CacheEntry entry : workarea.keySet()) {
\r
2215 int status = flagMap.get(entry);
\r
2216 boolean hasListener = hasListenerAfterDisposing(entry);
\r
2217 b.print("Q " + entry.toString());
\r
2219 b.print(" (L" + status + ")");
\r
2222 b.print(" (" + status + ")");
\r
2225 for(CacheEntry parent : workarea.get(entry)) {
\r
2226 Collection<CacheEntry> inv = inverse.get(parent);
\r
2228 inv = new ArrayList<CacheEntry>();
\r
2229 inverse.put(parent, inv);
\r
2232 b.print(" " + parent.toString());
\r
// Step 6: child listing from the inverted relation.
2237 b.print("// Entry child listing\n");
\r
2238 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
\r
2239 b.print("C " + entry.getKey().toString());
\r
2241 for(CacheEntry child : entry.getValue()) {
\r
2242 Integer h = hist.get(child);
\r
2246 b.print(" <no children>");
\r
2248 b.print(" " + child.toString());
\r
2253 b.print("#queries: " + workarea.keySet().size() + "\n");
\r
2254 b.print("#listeners: " + listeners + "\n");
\r
2258 return "Dumped " + workarea.keySet().size() + " queries.";
\r
/**
 * Work item of the query update traversal: a cache entry to update together with the
 * entry that caused the update and an indentation level used in debug output.
 */
2262 class UpdateEntry {
\r
// The entry that triggered this update (updateParents passes the child entry here).
2264 public CacheEntry caller;
\r
// The entry to be updated.
2266 public CacheEntry entry;
\r
// Number of spaces printed before the query in updateQuery debug output.
2268 public int indent;
\r
2270 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
\r
2271 this.caller = caller;
\r
2272 this.entry = entry;
\r
2273 this.indent = indent;
\r
/**
 * Removes the query of the given cache entry from this processor through
 * query.removeEntry(this).
 *
 * @param entry the cache entry to remove
 * @return false when the entry was already discarded; the other return statements are
 *         not visible in this listing
 */
2278 boolean removeQuery(CacheEntry entry) {
\r
2280 // This entry has been removed before. No need to do anything here.
\r
2281 if(entry.isDiscarded()) return false;
\r
// NOTE(review): this assertion can never fail after the early return above.
2283 assert (!entry.isDiscarded());
\r
2285 Query query = entry.getQuery();
\r
2287 query.removeEntry(this);
\r
// Tests the HAS_BEEN_BOUND bit of the GC status; the statement it guards is not
// visible in this listing.
2292 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
\r
2303 * @return true if this entry is being listened to
\r
// Processes one work item of the update traversal.
//
// Visible behaviour: entries that are discarded, refuted, excepted or pending get a
// one letter trace (D/R/E/P) when the QUERYPROCESSOR_UPDATE development property is
// set; other entries are traced with "U". Depending on the update type of the query
// the entry is put into 'immediates' (IMMEDIATE_UPDATE) or removed with removeQuery.
// Listeners of the entry are scheduled, and for queries of type INVALIDATE the parents
// are pushed to 'todo' through updateParents.
//
// NOTE(review): this listing does not show many short lines (returns, else openers,
// closing braces), so which of the status checks return early must be confirmed
// against the complete source.
//
// e          - the work item; asserted non-null
// todo       - work list that receives the parents to update
// immediates - receives entries whose query type is IMMEDIATE_UPDATE
// returns the value of hasListener(entry) on the last visible return; earlier returns
// are not visible in this listing
2305 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
\r
2307 assert (e != null);
\r
2309 CacheEntry entry = e.entry;
\r
2311 // System.err.println("updateQuery " + entry);
\r
2314 * If the dependency graph forms a DAG, some entries are inserted in the
\r
2315 * todo list many times. They only need to be processed once though.
\r
2317 if (entry.isDiscarded()) {
\r
2318 if (Development.DEVELOPMENT) {
\r
2319 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2320 System.out.print("D");
\r
2321 for (int i = 0; i < e.indent; i++)
\r
2322 System.out.print(" ");
\r
2323 System.out.println(entry.getQuery());
\r
2326 // System.err.println(" => DISCARDED");
\r
2330 if (entry.isRefuted()) {
\r
2331 if (Development.DEVELOPMENT) {
\r
2332 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2333 System.out.print("R");
\r
2334 for (int i = 0; i < e.indent; i++)
\r
2335 System.out.print(" ");
\r
2336 System.out.println(entry.getQuery());
\r
2342 if (entry.isExcepted()) {
\r
2343 if (Development.DEVELOPMENT) {
\r
2344 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2345 System.out.print("E");
\r
2350 if (entry.isPending()) {
\r
2351 if (Development.DEVELOPMENT) {
\r
2352 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2353 System.out.print("P");
\r
2360 if (Development.DEVELOPMENT) {
\r
2361 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2362 System.out.print("U ");
\r
2363 for (int i = 0; i < e.indent; i++)
\r
2364 System.out.print(" ");
\r
2365 System.out.print(entry.getQuery());
\r
2369 Query query = entry.getQuery();
\r
2370 int type = query.type();
\r
2372 boolean hasListener = hasListener(entry);
\r
2374 if (Development.DEVELOPMENT) {
\r
2375 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
// NOTE(review): hasListener(entry) is evaluated again although the local 'hasListener' holds the same answer.
2376 if(hasListener(entry)) {
\r
2377 System.out.println(" (L)");
\r
2379 System.out.println("");
\r
// Pending and excepted entries: IMMEDIATE_UPDATE queries are recorded in 'immediates';
// removeQuery is also visible here (presumably for the other update types - TODO confirm).
2384 if(entry.isPending() || entry.isExcepted()) {
\r
2387 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
\r
2389 immediates.put(entry, entry);
\r
2396 removeQuery(entry);
\r
// The same handling is visible a second time (presumably for the remaining entry states - TODO confirm).
2404 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
\r
2406 immediates.put(entry, entry);
\r
2413 removeQuery(entry);
\r
2420 // System.err.println(" => FOO " + type);
\r
// Every listener of the entry is queued for notification.
2422 if (hasListener) {
\r
2423 ArrayList<ListenerEntry> entries = listeners.get(entry);
\r
2424 if(entries != null) {
\r
2425 for (ListenerEntry le : entries) {
\r
2426 scheduleListener(le);
\r
2431 // If invalid, update parents
\r
2432 if (type == RequestFlags.INVALIDATE) {
\r
2433 updateParents(e.indent, entry, todo);
\r
2436 return hasListener;
\r
/**
 * Pushes an UpdateEntry for every non-discarded parent of the entry onto the work
 * list, with the entry as the caller and the indentation increased by two.
 *
 * @param indent the indentation level of the entry
 * @param entry the entry whose parents are to be updated
 * @param todo the work list to push to
 */
2440 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
\r
2442 Iterable<CacheEntry> oldParents = entry.getParents(this);
\r
2443 for (CacheEntry parent : oldParents) {
\r
2444 // System.err.println("updateParents " + entry + " => " + parent);
\r
2445 if(!parent.isDiscarded())
\r
// push() inserts at the head of the LinkedList.
2446 todo.push(new UpdateEntry(entry, parent, indent + 2));
\r
/**
 * Removes the listener entry when its listener is disposed.
 *
 * @param entry the listener entry to check
 * @return the return statements are not visible in this listing; callers treat a true
 *         result as "pruned" (performScheduledUpdates prints "Pruned" for it)
 */
2451 private boolean pruneListener(ListenerEntry entry) {
\r
2452 if (entry.base.isDisposed()) {
\r
2453 removeListener(entry);
\r
2461 * @param av1 an array (guaranteed)
\r
2462 * @param av2 any object
\r
2463 * @return <code>true</code> if the two arrays are equal
\r
// Compares two values of which the first is known to be an array. The component types
// must be equal; object arrays are compared with Arrays.equals(Object[], Object[]) and
// primitive arrays with the matching Arrays.equals overload.
// NOTE(review): lines 2466-2467, 2471 and 2474-2476 are not visible in this listing;
// presumably they cover a null av2 and the primitive/object distinction - TODO confirm.
2465 private final boolean arrayEquals(Object av1, Object av2) {
\r
2468 Class<?> c1 = av1.getClass().getComponentType();
\r
// getComponentType() returns null when av2 is not an array.
2469 Class<?> c2 = av2.getClass().getComponentType();
\r
2470 if (c2 == null || !c1.equals(c2))
\r
2472 boolean p1 = c1.isPrimitive();
\r
2473 boolean p2 = c2.isPrimitive();
\r
2477 return Arrays.equals((Object[]) av1, (Object[]) av2);
\r
2478 if (boolean.class.equals(c1))
\r
2479 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
\r
2480 else if (byte.class.equals(c1))
\r
2481 return Arrays.equals((byte[]) av1, (byte[]) av2);
\r
2482 else if (int.class.equals(c1))
\r
2483 return Arrays.equals((int[]) av1, (int[]) av2);
\r
2484 else if (long.class.equals(c1))
\r
2485 return Arrays.equals((long[]) av1, (long[]) av2);
\r
2486 else if (float.class.equals(c1))
\r
2487 return Arrays.equals((float[]) av1, (float[]) av2);
\r
2488 else if (double.class.equals(c1))
\r
2489 return Arrays.equals((double[]) av1, (double[]) av2);
\r
// NOTE(review): char[] and short[] have no branch above, so two such arrays end up in
// this exception instead of being compared.
2490 throw new RuntimeException("??? Contact application querySupport.");
\r
/**
 * Recomputes the query of a cache entry and compares the new result with the value a
 * listener has seen last.
 *
 * @param graph the graph used for the recomputation (through graph.withParent(entry))
 * @param entry the cache entry to recompute
 * @param oldValue the last known value, or ListenerEntry.NO_VALUE
 * @return ListenerEntry.NO_VALUE when the entry is excepted after the recomputation or
 *         a Throwable was caught (it is logged), the new value when it differs from
 *         oldValue, and ListenerEntry.NOT_CHANGED otherwise; the value returned when
 *         oldValue is NO_VALUE is not visible in this listing (presumably the new
 *         value - TODO confirm)
 */
2495 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
\r
2499 Query query = entry.getQuery();
\r
2501 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
\r
2503 entry.prepareRecompute(querySupport);
\r
2505 ReadGraphImpl parentGraph = graph.withParent(entry);
\r
2507 query.recompute(parentGraph, this, entry);
\r
2509 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
\r
2511 Object newValue = entry.getResult();
\r
// The identity comparison is intended: NO_VALUE is a marker object.
2513 if (ListenerEntry.NO_VALUE == oldValue) {
\r
2514 if(DebugPolicy.CHANGES) {
\r
2515 System.out.println("C " + query);
\r
2516 System.out.println("- " + oldValue);
\r
2517 System.out.println("- " + newValue);
\r
2522 boolean changed = false;
\r
2524 if (newValue != null) {
\r
// Arrays need element-wise comparison; equals() on arrays compares identity.
2525 if (newValue.getClass().isArray()) {
\r
2526 changed = !arrayEquals(newValue, oldValue);
\r
2528 changed = !newValue.equals(oldValue);
\r
// Presumably the branch for a null newValue: changed only if the old value was not null.
2531 changed = (oldValue != null);
\r
2533 if(DebugPolicy.CHANGES && changed) {
\r
2534 System.out.println("C " + query);
\r
2535 System.out.println("- " + oldValue);
\r
2536 System.out.println("- " + newValue);
\r
2539 return changed ? newValue : ListenerEntry.NOT_CHANGED;
\r
2541 } catch (Throwable t) {
\r
2543 Logger.defaultLogError(t);
\r
2545 return ListenerEntry.NO_VALUE;
\r
2551 public boolean hasScheduledUpdates() {
\r
2552 return !scheduledListeners.isEmpty();
\r
// Fires the listeners queued in scheduledListeners. Each round swaps the queue for a fresh
// set, prunes disposed listeners, recomputes every remaining entry through compareTo()
// against the listener's last known value, and finally replays the cached result to the
// listeners whose value changed. Rounds repeat until nothing new has been queued.
// firingListeners is raised for the duration and must not already be set on entry.
2555 public void performScheduledUpdates(WriteGraphImpl graph) {
\r
2557 assert (!updating);
\r
2558 assert (!collecting);
\r
2559 assert (!firingListeners);
\r
2561 firingListeners = true;
\r
2565 // Performing may cause further events to be scheduled.
\r
2566 while (!scheduledListeners.isEmpty()) {
\r
2568 // graph.restart();
\r
2569 // graph.state.barrier.inc();
\r
2571 // Clone current events to make new entries possible during
\r
// The current queue is detached, so listeners scheduled while firing land in the new set.
2573 THashSet<ListenerEntry> entries = scheduledListeners;
\r
2574 scheduledListeners = new THashSet<ListenerEntry>();
\r
2576 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
\r
// Phase 1: decide which listeners actually need to be notified.
2578 for (ListenerEntry listenerEntry : entries) {
\r
2580 if (pruneListener(listenerEntry)) {
\r
2581 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
\r
2585 final CacheEntry entry = listenerEntry.entry;
\r
2586 assert (entry != null);
\r
2588 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
\r
2590 if (newValue != ListenerEntry.NOT_CHANGED) {
\r
2591 if(DebugPolicy.LISTENER)
\r
2592 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
\r
2593 schedule.add(listenerEntry);
\r
2594 listenerEntry.setLastKnown(entry.getResult());
\r
// Phase 2: replay the cached result to every scheduled listener.
2599 for(ListenerEntry listenerEntry : schedule) {
\r
2600 final CacheEntry entry = listenerEntry.entry;
\r
2601 if(DebugPolicy.LISTENER)
\r
2602 System.out.println("Firing " + listenerEntry.procedure);
\r
2604 if(DebugPolicy.LISTENER)
\r
2605 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
\r
2606 entry.performFromCache(graph, this, listenerEntry.procedure);
\r
// NOTE(review): other handlers in this file report through Logger.defaultLogError; this one
// prints the stack trace instead.
2607 } catch (Throwable t) {
\r
2608 t.printStackTrace();
\r
2612 // graph.state.barrier.dec();
\r
2613 // graph.waitAsync(null);
\r
2614 // graph.state.barrier.assertReady();
\r
2619 firingListeners = false;
\r
2626 * @return true if this entry still has listeners
\r
// Propagates a change starting at `entry`. Work items are taken from `todo` and handed to
// updateQuery(), which may collect entries into `immediates`. Each immediate is re-evaluated
// with compareTo(); when its value changed, updateParents() is called for it with the same
// `todo` list. The cycle ends once a pass leaves `immediates` empty.
// The result is hadListeners | listenersUnknown.
// NOTE(review): the embedded numbering has gaps (e.g. 2632-2633, 2636-2638, 2678-2682), so
// loop and try/finally boundaries are not visible in this extract.
2628 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
\r
2630 assert (!collecting);
\r
2631 assert (!updating);
\r
2634 boolean hadListeners = false;
\r
2635 boolean listenersUnknown = false;
\r
2639 assert(entry != null);
\r
2640 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
\r
// Identity map: entries are tracked by reference, not by equals().
2641 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
\r
2642 todo.add(new UpdateEntry(null, entry, 0));
\r
2646 // Walk the tree and collect immediate updates
\r
2647 while (!todo.isEmpty()) {
\r
2648 UpdateEntry e = todo.pop();
\r
2649 hadListeners |= updateQuery(e, todo, immediates);
\r
2652 if(immediates.isEmpty()) break;
\r
2654 // Evaluate all immediate updates and collect parents to update
\r
2655 for(CacheEntry immediate : immediates.values()) {
\r
2657 if(immediate.isDiscarded()) {
\r
// An excepted entry has no usable old result, so it is compared against NO_VALUE.
2661 if(immediate.isExcepted()) {
\r
2663 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
\r
2664 if (newValue != ListenerEntry.NOT_CHANGED)
\r
2665 updateParents(0, immediate, todo);
\r
2669 Object oldValue = immediate.getResult();
\r
2670 Object newValue = compareTo(graph, immediate, oldValue);
\r
2672 if (newValue != ListenerEntry.NOT_CHANGED) {
\r
2673 updateParents(0, immediate, todo);
\r
2675 // If not changed, keep the old value
\r
2676 immediate.setResult(oldValue);
\r
2677 listenersUnknown = true;
\r
2683 immediates.clear();
\r
2687 } catch (Throwable t) {
\r
2688 Logger.defaultLogError(t);
\r
2691 assert (updating);
\r
2694 return hadListeners | listenersUnknown;
\r
2698 volatile public boolean dirty = false;
\r
// Pending statement changes; updateStatements() packs subject and predicate into one long.
2700 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
\r
// Pending value changes by resource id; filled by updateValue() and invalidateResource().
2701 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
\r
// Consumed and cleared by performDirtyUpdates().
// NOTE(review): no code adding to this set is visible in this part of the file.
2702 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
\r
2703 // Maybe use a mutex from util.concurrent?
\r
// Guards scheduledPrimitiveUpdates in updatePrimitive() and performDirtyUpdates().
2704 private Object primitiveUpdateLock = new Object();
\r
2705 private THashSet scheduledPrimitiveUpdates = new THashSet();
\r
// Applies all queued change notifications (statements, values, invalidates and external
// reads) to the affected cache entries by calling update() on each of them.
// Two fast paths handle the cases "exactly one statement" and "exactly one value"; the
// general path walks every queue and clears the three scheduled sets at the end.
// NOTE(review): the embedded numbering has gaps after both fast paths (2748-2751, 2761-2764);
// presumably they return early - confirm against the full source.
2707 public void performDirtyUpdates(final ReadGraphImpl graph) {
\r
// Resets the duplicate filter used by invalidateResource().
2710 lastInvalidate = 0;
\r
2712 if (Development.DEVELOPMENT) {
\r
2713 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2714 System.err.println("== Query update ==");
\r
2718 // Special case - one statement
\r
2719 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
\r
2721 long arg0 = scheduledObjectUpdates.getFirst();
\r
// Unpack the key: subject in the high 32 bits, predicate in the low 32 bits.
// 0xffffffff is an int literal (-1), so the mask is a no-op on the long; it is the (int)
// cast that keeps the low half.
2723 final int subject = (int)(arg0 >>> 32);
\r
2724 final int predicate = (int)(arg0 & 0xffffffff);
\r
2726 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
\r
2727 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
\r
2728 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
\r
// Type-related predicates additionally touch the type caches of the subject.
2730 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
\r
2731 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
\r
2732 if(principalTypes != null) update(graph, principalTypes);
\r
2733 Types types = Types.entry(QueryProcessor.this, subject);
\r
2734 if(types != null) update(graph, types);
\r
2737 if(predicate == subrelationOf) {
\r
2738 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
\r
2739 if(superRelations != null) update(graph, superRelations);
\r
2742 DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);
\r
2743 if(dp != null) update(graph, dp);
\r
// The OrderedSet cache is looked up with the predicate id, not the subject id.
2744 OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);
\r
2745 if(os != null) update(graph, os);
\r
2747 scheduledObjectUpdates.clear();
\r
2752 // Special case - one value
\r
2753 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
\r
2755 int arg0 = scheduledValueUpdates.getFirst();
\r
2757 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
\r
2758 if(valueQuery != null) update(graph, valueQuery);
\r
2760 scheduledValueUpdates.clear();
\r
// General path. `predicates` collects resource ids whose statement caches are refreshed
// below; `orderedSets` collects ids whose OrderedSet entry is refreshed.
2765 final TIntHashSet predicates = new TIntHashSet();
\r
2766 final TIntHashSet orderedSets = new TIntHashSet();
\r
// Swap the primitive queue under the lock so other threads can keep scheduling.
2768 THashSet primitiveUpdates;
\r
2769 synchronized (primitiveUpdateLock) {
\r
2770 primitiveUpdates = scheduledPrimitiveUpdates;
\r
2771 scheduledPrimitiveUpdates = new THashSet();
\r
2774 primitiveUpdates.forEach(new TObjectProcedure() {
\r
2777 public boolean execute(Object arg0) {
\r
2779 ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);
\r
2780 if (query != null) {
\r
2781 boolean listening = update(graph, query);
\r
// An external read with neither listeners nor parents is dropped from the cache.
2782 if (!listening && !query.hasParents()) {
\r
2783 externalReadMap.remove(arg0);
\r
2792 scheduledValueUpdates.forEach(new TIntProcedure() {
\r
2795 public boolean execute(int arg0) {
\r
2796 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
\r
2797 if(valueQuery != null) update(graph, valueQuery);
\r
// Invalidated resources refresh value, type and super-relation caches and are also queued
// for the statement-cache pass below.
2803 scheduledInvalidates.forEach(new TIntProcedure() {
\r
2806 public boolean execute(int resource) {
\r
2808 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);
\r
2809 if(valueQuery != null) update(graph, valueQuery);
\r
2811 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);
\r
2812 if(principalTypes != null) update(graph, principalTypes);
\r
2813 Types types = Types.entry(QueryProcessor.this, resource);
\r
2814 if(types != null) update(graph, types);
\r
2816 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
\r
2817 if(superRelations != null) update(graph, superRelations);
\r
2819 predicates.add(resource);
\r
2826 scheduledObjectUpdates.forEach(new TLongProcedure() {
\r
2829 public boolean execute(long arg0) {
\r
// Same key layout as in the single-statement fast path above.
2831 final int subject = (int)(arg0 >>> 32);
\r
2832 final int predicate = (int)(arg0 & 0xffffffff);
\r
2834 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
\r
2835 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
\r
2836 if(principalTypes != null) update(graph, principalTypes);
\r
2837 Types types = Types.entry(QueryProcessor.this, subject);
\r
2838 if(types != null) update(graph, types);
\r
2841 if(predicate == subrelationOf) {
\r
2842 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
\r
2843 if(superRelations != null) update(graph, superRelations);
\r
// Note the naming: the set called `predicates` receives the subject id here.
2846 predicates.add(subject);
\r
2847 orderedSets.add(predicate);
\r
2855 predicates.forEach(new TIntProcedure() {
\r
2858 public boolean execute(final int subject) {
\r
2860 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
\r
2861 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
\r
2862 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
\r
2864 DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
\r
2865 if(entry != null) update(graph, entry);
\r
2873 orderedSets.forEach(new TIntProcedure() {
\r
2876 public boolean execute(int orderedSet) {
\r
2878 OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);
\r
2879 if(entry != null) update(graph, entry);
\r
2887 // for (Integer subject : predicates) {
\r
2888 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
\r
2889 // if(entry != null) update(graph, entry);
\r
2893 if (Development.DEVELOPMENT) {
\r
2894 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
\r
2895 System.err.println("== Query update ends ==");
\r
// Everything queued has been processed (primitive updates were swapped out above).
2899 scheduledValueUpdates.clear();
\r
2900 scheduledObjectUpdates.clear();
\r
2901 scheduledInvalidates.clear();
\r
2905 public void updateValue(final int resource) {
\r
2906 scheduledValueUpdates.add(resource);
\r
2910 public void updateStatements(final int resource, final int predicate) {
\r
2911 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
\r
2915 private int lastInvalidate = 0;
\r
2917 public void invalidateResource(final int resource) {
\r
2918 if(lastInvalidate == resource) return;
\r
2919 scheduledValueUpdates.add(resource);
\r
2920 lastInvalidate = resource;
\r
2924 public void updatePrimitive(final ExternalRead primitive) {
\r
2926 // External reads may be updated from arbitrary threads.
\r
2927 // Synchronize to prevent race-conditions.
\r
2928 synchronized (primitiveUpdateLock) {
\r
2929 scheduledPrimitiveUpdates.add(primitive);
\r
2931 querySupport.dirtyPrimitives();
\r
2936 public synchronized String toString() {
\r
2937 return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";
\r
// Shuts down the THREADS executors in three stages: (1) ask each one to dispose and poll
// up to 100 times until none is alive, (2) poll up to 100 more times while interrupting
// all of them, (3) join each one for at most 5 seconds and clear its slot.
// The method returns as soon as a poll finds no executor alive, in which case stage 3 is
// skipped and the slots are not cleared.
// NOTE(review): the embedded numbering has gaps inside both polling loops (2955-2956,
// 2973-2975); presumably they contain the waits between polls - confirm.
2941 protected void doDispose() {
\r
2943 for(int index = 0; index < THREADS; index++) {
\r
2944 executors[index].dispose();
\r
2947 // First just wait
\r
2948 for(int i=0;i<100;i++) {
\r
2950 boolean alive = false;
\r
2951 for(int index = 0; index < THREADS; index++) {
\r
2952 alive |= executors[index].isAlive();
\r
2954 if(!alive) return;
\r
// The interruption is only logged; the interrupt flag is not restored.
2957 } catch (InterruptedException e) {
\r
2958 Logger.defaultLogError(e);
\r
2963 // Then start interrupting
\r
2964 for(int i=0;i<100;i++) {
\r
2966 boolean alive = false;
\r
2967 for(int index = 0; index < THREADS; index++) {
\r
2968 alive |= executors[index].isAlive();
\r
2970 if(!alive) return;
\r
2971 for(int index = 0; index < THREADS; index++) {
\r
2972 executors[index].interrupt();
\r
2976 // // Then just destroy
\r
2977 // for(int index = 0; index < THREADS; index++) {
\r
2978 // executors[index].destroy();
\r
// Last resort: bounded join, then drop the reference whether or not the thread died.
2981 for(int index = 0; index < THREADS; index++) {
\r
2983 executors[index].join(5000);
\r
2984 } catch (InterruptedException e) {
\r
2985 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
\r
2987 executors[index] = null;
\r
// NOTE(review): body not visible in this extract; presumably returns the `hits` counter that
// toString() reports - confirm.
2992 public int getHits() {
\r
// NOTE(review): body not visible in this extract; presumably returns the `misses` counter that
// toString() reports - confirm.
2996 public int getMisses() {
\r
// NOTE(review): body not visible in this extract; presumably returns the `size` counter that
// toString() reports - confirm.
3000 public int getSize() {
\r
3004 public Set<Long> getReferencedClusters() {
\r
3005 HashSet<Long> result = new HashSet<Long>();
\r
3006 for (CacheEntry entry : objectsMap.values()) {
\r
3007 Objects query = (Objects) entry.getQuery();
\r
3008 result.add(querySupport.getClusterId(query.r1()));
\r
3010 for (CacheEntry entry : directPredicatesMap.values()) {
\r
3011 DirectPredicates query = (DirectPredicates) entry.getQuery();
\r
3012 result.add(querySupport.getClusterId(query.id));
\r
3014 for (CacheEntry entry : valueMap.values()) {
\r
3015 ValueQuery query = (ValueQuery) entry.getQuery();
\r
3016 result.add(querySupport.getClusterId(query.id));
\r
// NOTE(review): no body statements are visible in this extract; the method appears to be empty.
3021 public void assertDone() {
\r
// Gathers the entries of the cache maps listed below into `result`, using
// Integer.MAX_VALUE as the level limit passed to each map.
// NOTE(review): the embedded numbering has gaps (3031, 3034, 3036, 3042, 3048, 3054,
// 3058-3062); the bodies of the two explicit loops and the return statement are not visible
// here.
3024 CacheCollectionResult allCaches(CacheCollectionResult result) {
\r
3026 int level = Integer.MAX_VALUE;
\r
3027 directPredicatesMap.values(level, result);
\r
3028 principalTypesMap.values(level, result);
\r
// These two maps are iterated by hand and filtered on the entry level instead of using
// values(level, result).
3029 for(CacheEntryBase e : uriToResourceMap.values())
\r
3030 if(e.getLevel() <= level)
\r
3032 for(CacheEntryBase e : namespaceIndexMap22.values())
\r
3033 if(e.getLevel() <= level)
\r
3035 projectsMap.values(level, result);
\r
3037 relationInfoMap.values(level, result);
\r
3038 superTypesMap.values(level, result);
\r
3039 typeHierarchyMap.values(level, result);
\r
3040 superRelationsMap.values(level, result);
\r
3041 typesMap.values(level, result);
\r
3043 valueMap.values(level, result);
\r
3044 directObjectsMap.values(level, result);
\r
3045 objectsMap.values(level, result);
\r
3046 orderedSetMap.values(level, result);
\r
3047 predicatesMap.values(level, result);
\r
3049 statementsMap.values(level, result);
\r
3050 assertedPredicatesMap.values(level, result);
\r
3051 assertedStatementsMap.values(level, result);
\r
3052 externalReadMap.values(level, result);
\r
3053 asyncReadMap.values(level, result);
\r
3055 readMap.values(level, result);
\r
3056 asyncMultiReadMap.values(level, result);
\r
3057 multiReadMap.values(level, result);
\r
// NOTE(review): no body statements are visible in this extract; the method appears to be empty.
3063 public void printDiagnostics() {
\r
3066 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
\r
3067 querySupport.requestCluster(graph, clusterId, runnable);
\r
// Runs the collector with the arguments (0, Integer.MAX_VALUE).
// NOTE(review): the return statement is not visible in this extract.
3070 public int clean() {
\r
3071 collector.collect(0, Integer.MAX_VALUE);
\r
// Runs a one-off collection restricted to the cache entries of the given external reads.
// A private QueryCollectorSupport exposes only those entries to a fresh QueryCollectorImpl2:
// iterate() walks `requests` and returns the cached ExternalReadEntry of each request,
// getRootList() returns the same entries as a list, and calculateCurrentSize() reports
// Integer.MAX_VALUE. allCaches(), remove() and setLevel() are unsupported.
// NOTE(review): the embedded numbering has gaps (e.g. 3089-3094, 3114-3116, 3123-3125), so
// some return statements are not visible in this extract.
3075 public void clean(final Collection<ExternalRead<?>> requests) {
\r
3076 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
\r
3077 Iterator<ExternalRead<?>> iterator = requests.iterator();
\r
3079 public CacheCollectionResult allCaches() {
\r
3080 throw new UnsupportedOperationException();
\r
3083 public CacheEntryBase iterate(int level) {
\r
3084 if(iterator.hasNext()) {
\r
3085 ExternalRead<?> request = iterator.next();
\r
3086 ExternalReadEntry entry = externalReadMap.get(request);
\r
// Requests without a cache entry are skipped by recursing to the next one.
3087 if (entry != null) return entry;
\r
3088 else return iterate(level);
\r
// Exhausted: rewind so that a later pass starts from the first request again.
3090 iterator = requests.iterator();
\r
3095 public void remove() {
\r
3096 throw new UnsupportedOperationException();
\r
3099 public void setLevel(CacheEntryBase entry, int level) {
\r
3100 throw new UnsupportedOperationException();
\r
3103 public Collection<CacheEntry> getRootList() {
\r
3104 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
\r
3105 for (ExternalRead<?> request : requests) {
\r
3106 ExternalReadEntry entry = externalReadMap.get(request);
\r
3107 if (entry != null)
\r
3108 result.add(entry);
\r
3113 public int getCurrentSize() {
\r
3117 public int calculateCurrentSize() {
\r
3118 // This tells the collector to attempt collecting everything.
\r
3119 return Integer.MAX_VALUE;
\r
3122 public boolean start(boolean flush) {
\r
3126 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
\r
3129 public void scanPending() {
\r
3131 ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();
\r
3133 entries.addAll(directPredicatesMap.values());
\r
3134 entries.addAll(principalTypesMap.values());
\r
3135 entries.addAll(uriToResourceMap.values());
\r
3136 entries.addAll(namespaceIndexMap22.values());
\r
3137 entries.addAll(projectsMap.values());
\r
3138 entries.addAll(relationInfoMap.values());
\r
3139 entries.addAll(superTypesMap.values());
\r
3140 entries.addAll(superRelationsMap.values());
\r
3141 entries.addAll(typesMap.values());
\r
3142 entries.addAll(valueMap.values());
\r
3143 entries.addAll(directObjectsMap.values());
\r
3144 entries.addAll(objectsMap.values());
\r
3145 entries.addAll(orderedSetMap.values());
\r
3146 entries.addAll(predicatesMap.values());
\r
3147 entries.addAll(orderedSetMap.values());
\r
3148 entries.addAll(statementsMap.values());
\r
3149 // entries.addAll(assertedObjectsMap.values());
\r
3150 entries.addAll(assertedPredicatesMap.values());
\r
3151 entries.addAll(assertedStatementsMap.values());
\r
3152 entries.addAll(externalReadMap.values());
\r
3153 entries.addAll(asyncReadMap.values());
\r
3154 entries.addAll(externalReadMap.values());
\r
3155 entries.addAll(readMap.values());
\r
3156 entries.addAll(asyncMultiReadMap.values());
\r
3157 entries.addAll(multiReadMap.values());
\r
3158 entries.addAll(readMap.values());
\r
3159 System.out.println(entries.size() + " entries.");
\r
3160 for(Object e : entries) {
\r
3161 if(e instanceof CacheEntry) {
\r
3162 CacheEntry en = (CacheEntry)e;
\r
3163 if(en.isPending()) System.out.println("pending " + e);
\r
3164 if(en.isExcepted()) System.out.println("excepted " + e);
\r
3165 if(en.isDiscarded()) System.out.println("discarded " + e);
\r
3166 if(en.isRefuted()) System.out.println("refuted " + e);
\r
3167 if(en.isFresh()) System.out.println("fresh " + e);
\r
3169 //System.out.println("Unknown object " + e);
\r
3175 public ReadGraphImpl graphForVirtualRequest() {
\r
3176 return ReadGraphImpl.createAsync(this);
\r
3180 private HashMap<Resource, Class<?>> builtinValues;
\r
3182 public Class<?> getBuiltinValue(Resource r) {
\r
3183 if(builtinValues == null) initBuiltinValues();
\r
3184 return builtinValues.get(r);
\r
3187 Exception callerException = null;
\r
/**
 * Barrier contract with one operation to increment and one to decrement.
 */
public interface AsyncBarrier {
    void inc();
    void dec();
    // public void inc(String debug);
    // public void dec(String debug);
}
\r
3196 // final public QueryProcessor processor;
\r
3197 // final public QuerySupport support;
\r
3199 // boolean disposed = false;
\r
3201 private void initBuiltinValues() {
\r
3203 Layer0 b = getSession().peekService(Layer0.class);
\r
3204 if(b == null) return;
\r
3206 builtinValues = new HashMap<Resource, Class<?>>();
\r
3208 builtinValues.put(b.String, String.class);
\r
3209 builtinValues.put(b.Double, Double.class);
\r
3210 builtinValues.put(b.Float, Float.class);
\r
3211 builtinValues.put(b.Long, Long.class);
\r
3212 builtinValues.put(b.Integer, Integer.class);
\r
3213 builtinValues.put(b.Byte, Byte.class);
\r
3214 builtinValues.put(b.Boolean, Boolean.class);
\r
3216 builtinValues.put(b.StringArray, String[].class);
\r
3217 builtinValues.put(b.DoubleArray, double[].class);
\r
3218 builtinValues.put(b.FloatArray, float[].class);
\r
3219 builtinValues.put(b.LongArray, long[].class);
\r
3220 builtinValues.put(b.IntegerArray, int[].class);
\r
3221 builtinValues.put(b.ByteArray, byte[].class);
\r
3222 builtinValues.put(b.BooleanArray, boolean[].class);
\r
3226 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
\r
3228 // if (null == provider2) {
\r
3229 // this.processor = null;
\r
3230 // support = null;
\r
3233 // this.processor = provider2;
\r
3234 // support = provider2.getCore();
\r
3235 // initBuiltinValues();
\r
3239 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
\r
3240 // return new ReadGraphSupportImpl(impl.processor);
\r
// NOTE(review): body not visible in this extract; presumably returns the session this
// processor belongs to - confirm.
3244 final public Session getSession() {
\r
3248 final public ResourceSupport getResourceSupport() {
\r
3249 return resourceSupport;
\r
// Reports the predicates of `subject` to an asynchronous multi-procedure. Predicate ids
// delivered by Predicates.queryEach are converted to Resources with querySupport.getResource.
// Anything thrown by the procedure is caught and logged.
// The `first` flag separates the first finished/exception callback, which receives the
// delivered graph, from the other call variant, which receives impl.newRestart(graph).
// NOTE(review): the embedded numbering has gaps (e.g. 3266-3267, 3269, 3283), so the
// conditions choosing between the paired calls are only partly visible in this extract.
3253 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
\r
3255 assert(subject != null);
\r
3256 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3258 final ListenerBase listener = getListenerBase(procedure);
\r
3260 IntProcedure ip = new IntProcedure() {
\r
3262 AtomicBoolean first = new AtomicBoolean(true);
\r
3265 public void execute(ReadGraphImpl graph, int i) {
\r
3268 procedure.execute(graph, querySupport.getResource(i));
\r
3270 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
\r
3272 } catch (Throwable t2) {
\r
3273 Logger.defaultLogError(t2);
\r
3278 public void finished(ReadGraphImpl graph) {
\r
3280 if(first.compareAndSet(true, false)) {
\r
3281 procedure.finished(graph);
\r
3282 // impl.state.barrier.dec(this);
\r
3284 procedure.finished(impl.newRestart(graph));
\r
3287 } catch (Throwable t2) {
\r
3288 Logger.defaultLogError(t2);
\r
3293 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3295 if(first.compareAndSet(true, false)) {
\r
3296 procedure.exception(graph, t);
\r
3297 // impl.state.barrier.dec(this);
\r
3299 procedure.exception(impl.newRestart(graph), t);
\r
3301 } catch (Throwable t2) {
\r
3302 Logger.defaultLogError(t2);
\r
3308 int sId = querySupport.getId(subject);
\r
3310 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);
\r
3311 // else impl.state.barrier.inc(null, null);
\r
3313 Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);
\r
// Reports the predicates of `subject` to a synchronous multi-procedure. Predicate ids
// delivered by Predicates.queryEach are converted to Resources with querySupport.getResource;
// the graph argument of the callbacks is not forwarded. Anything thrown by the procedure is
// caught and logged.
3318 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
\r
3320 assert(subject != null);
\r
3321 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3323 final ListenerBase listener = getListenerBase(procedure);
\r
3325 // impl.state.barrier.inc();
\r
3327 Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
\r
3330 public void execute(ReadGraphImpl graph, int i) {
\r
3332 procedure.execute(querySupport.getResource(i));
\r
3333 } catch (Throwable t2) {
\r
3334 Logger.defaultLogError(t2);
\r
3339 public void finished(ReadGraphImpl graph) {
\r
3341 procedure.finished();
\r
3342 } catch (Throwable t2) {
\r
3343 Logger.defaultLogError(t2);
\r
3345 // impl.state.barrier.dec();
\r
3349 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3351 procedure.exception(t);
\r
3352 } catch (Throwable t2) {
\r
3353 Logger.defaultLogError(t2);
\r
3355 // impl.state.barrier.dec();
\r
3363 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
\r
3365 assert(subject != null);
\r
3367 return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
\r
// Reports the statements of `subject` with `predicate` to a synchronous multi-procedure.
// Each (s, p, o) id triple delivered by Statements.queryEach is converted with
// querySupport.getStatement; the graph argument of the callbacks is not forwarded.
// Anything thrown by the procedure is caught and logged.
3373 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
\r
3374 final Resource predicate, final MultiProcedure<Statement> procedure) {
\r
3376 assert(subject != null);
\r
3377 assert(predicate != null);
\r
3378 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3380 final ListenerBase listener = getListenerBase(procedure);
\r
3382 // impl.state.barrier.inc();
\r
3384 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
\r
3387 public void execute(ReadGraphImpl graph, int s, int p, int o) {
\r
3389 procedure.execute(querySupport.getStatement(s, p, o));
\r
3390 } catch (Throwable t2) {
\r
3391 Logger.defaultLogError(t2);
\r
3396 public void finished(ReadGraphImpl graph) {
\r
3398 procedure.finished();
\r
3399 } catch (Throwable t2) {
\r
3400 Logger.defaultLogError(t2);
\r
3402 // impl.state.barrier.dec();
\r
3406 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3408 procedure.exception(t);
\r
3409 } catch (Throwable t2) {
\r
3410 Logger.defaultLogError(t2);
\r
3412 // impl.state.barrier.dec();
\r
// Reports the statements of `subject` with `predicate` to an asynchronous multi-procedure.
// Each (s, p, o) id triple delivered by Statements.queryEach is converted with
// querySupport.getStatement. Anything thrown by the procedure is caught and logged.
// Every callback has two call variants: one passes the delivered graph, the other passes
// impl.newRestart(graph).
// NOTE(review): the embedded numbering has gaps (e.g. 3435-3436, 3448-3451, 3465-3468);
// the conditions on the `first` flag that choose between the variants are not visible here.
3420 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
\r
3421 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
\r
3423 assert(subject != null);
\r
3424 assert(predicate != null);
\r
3425 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3427 final ListenerBase listener = getListenerBase(procedure);
\r
3429 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
\r
3431 boolean first = true;
\r
3434 public void execute(ReadGraphImpl graph, int s, int p, int o) {
\r
3437 procedure.execute(graph, querySupport.getStatement(s, p, o));
\r
3439 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
\r
3441 } catch (Throwable t2) {
\r
3442 Logger.defaultLogError(t2);
\r
3447 public void finished(ReadGraphImpl graph) {
\r
3452 procedure.finished(graph);
\r
3453 // impl.state.barrier.dec(this);
\r
3455 procedure.finished(impl.newRestart(graph));
\r
3457 } catch (Throwable t2) {
\r
3458 Logger.defaultLogError(t2);
\r
3464 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3469 procedure.exception(graph, t);
\r
3470 // impl.state.barrier.dec(this);
\r
3472 procedure.exception(impl.newRestart(graph), t);
\r
3474 } catch (Throwable t2) {
\r
3475 Logger.defaultLogError(t2);
\r
3482 int sId = querySupport.getId(subject);
\r
3483 int pId = querySupport.getId(predicate);
\r
3485 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
\r
3486 // else impl.state.barrier.inc(null, null);
\r
3488 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
\r
// Reports the statements of `subject` with `predicate` to a StatementProcedure. Unlike the
// overloads that build Statement objects, the raw (s, p, o) ids delivered by
// Statements.queryEach are passed on unchanged. Anything thrown by the procedure is caught
// and logged.
// Every callback has two call variants: one passes the delivered graph, the other passes
// impl.newRestart(graph).
// NOTE(review): the embedded numbering has gaps (e.g. 3508-3509, 3521-3524, 3538-3541);
// the conditions on the `first` flag that choose between the variants are not visible here.
3493 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
\r
3494 final Resource predicate, final StatementProcedure procedure) {
\r
3496 assert(subject != null);
\r
3497 assert(predicate != null);
\r
3498 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3500 final ListenerBase listener = getListenerBase(procedure);
\r
3502 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
\r
3504 boolean first = true;
\r
3507 public void execute(ReadGraphImpl graph, int s, int p, int o) {
\r
3510 procedure.execute(graph, s, p, o);
\r
3512 procedure.execute(impl.newRestart(graph), s, p, o);
\r
3514 } catch (Throwable t2) {
\r
3515 Logger.defaultLogError(t2);
\r
3520 public void finished(ReadGraphImpl graph) {
\r
3525 procedure.finished(graph);
\r
3526 // impl.state.barrier.dec(this);
\r
3528 procedure.finished(impl.newRestart(graph));
\r
3530 } catch (Throwable t2) {
\r
3531 Logger.defaultLogError(t2);
\r
3537 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3542 procedure.exception(graph, t);
\r
3543 // impl.state.barrier.dec(this);
\r
3545 procedure.exception(impl.newRestart(graph), t);
\r
3547 } catch (Throwable t2) {
\r
3548 Logger.defaultLogError(t2);
\r
3555 int sId = querySupport.getId(subject);
\r
3556 int pId = querySupport.getId(predicate);
\r
3558 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
\r
3559 // else impl.state.barrier.inc(null, null);
\r
3561 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
\r
// Adapts an AsyncSetListener to the multi-procedure form of forEachStatement: statements
// of a run that are not found in `current` are reported through procedure.add(), and what
// is still left in `current` when the run finishes is reported through procedure.remove().
// Exceptions and the disposed state are forwarded to the set listener unchanged.
// NOTE(review): the embedded numbering has gaps (e.g. 3585-3587, 3589-3592, 3599-3602), so
// the bookkeeping that fills `run` and replaces `current` is not visible in this extract.
3566 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
\r
3568 assert(subject != null);
\r
3569 assert(predicate != null);
\r
3570 assert(procedure != null);
\r
3572 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
\r
3574 private Set<Statement> current = null;
\r
3575 private Set<Statement> run = new HashSet<Statement>();
\r
3578 public void execute(AsyncReadGraph graph, Statement result) {
\r
3580 boolean found = false;
\r
3582 if(current != null) {
\r
// remove() doubles as the membership test: a statement seen again is taken out of `current`.
3584 found = current.remove(result);
\r
3588 if(!found) procedure.add(graph, result);
\r
3595 public void finished(AsyncReadGraph graph) {
\r
3597 if(current != null) {
\r
3598 for(Statement r : current) procedure.remove(graph, r);
\r
3603 run = new HashSet<Statement>();
\r
3608 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3609 procedure.exception(graph, t);
\r
3613 public boolean isDisposed() {
\r
3614 return procedure.isDisposed();
\r
// Reports the asserted statements of `subject` with `predicate` to an asynchronous
// multi-procedure. Each (s, p, o) id triple delivered by AssertedStatements.queryEach is
// converted with querySupport.getStatement and passed on together with the delivered graph.
// Anything thrown by the procedure is caught and logged.
3622 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
\r
3623 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
\r
3625 assert(subject != null);
\r
3626 assert(predicate != null);
\r
3627 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3629 final ListenerBase listener = getListenerBase(procedure);
\r
3631 // impl.state.barrier.inc();
\r
3633 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
\r
3636 public void execute(ReadGraphImpl graph, int s, int p, int o) {
\r
3638 procedure.execute(graph, querySupport.getStatement(s, p, o));
\r
3639 } catch (Throwable t2) {
\r
3640 Logger.defaultLogError(t2);
\r
3645 public void finished(ReadGraphImpl graph) {
\r
3647 procedure.finished(graph);
\r
3648 } catch (Throwable t2) {
\r
3649 Logger.defaultLogError(t2);
\r
3651 // impl.state.barrier.dec();
\r
3655 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3657 procedure.exception(graph, t);
\r
3658 } catch (Throwable t2) {
\r
3659 Logger.defaultLogError(t2);
\r
3661 // impl.state.barrier.dec();
\r
3668 private static ListenerBase getListenerBase(Object procedure) {
\r
3669 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
\r
// Reports the objects of `subject` with `predicate` to a synchronous multi-procedure.
// Object ids delivered by Objects.runner are converted to Resources with
// querySupport.getResource; the graph argument of the callbacks is not forwarded.
// Anything thrown by the procedure is caught and logged.
3674 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
\r
3676 assert(subject != null);
\r
3677 assert(predicate != null);
\r
3678 assert(procedure != null);
\r
// Non-null only if the procedure itself is a ListenerBase.
3680 final ListenerBase listener = getListenerBase(procedure);
\r
3682 // impl.state.barrier.inc();
\r
3684 Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
\r
3687 public void execute(ReadGraphImpl graph, int i) {
\r
3689 procedure.execute(querySupport.getResource(i));
\r
3690 } catch (Throwable t2) {
\r
3691 Logger.defaultLogError(t2);
\r
3696 public void finished(ReadGraphImpl graph) {
\r
3698 procedure.finished();
\r
3699 } catch (Throwable t2) {
\r
3700 Logger.defaultLogError(t2);
\r
3702 // impl.state.barrier.dec();
\r
3706 public void exception(ReadGraphImpl graph, Throwable t) {
// Unlike the sibling adapters, this one also prints the exception to standard output.
3707 System.out.println("forEachObject exception " + t);
\r
3709 procedure.exception(t);
\r
3710 } catch (Throwable t2) {
\r
3711 Logger.defaultLogError(t2);
\r
3713 // impl.state.barrier.dec();
\r
3722 // final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
\r
3724 // assert(subject != null);
\r
3725 // assert(predicate != null);
\r
3726 // assert(procedure != null);
\r
3728 // final ListenerBase listener = getListenerBase(procedure);
\r
3730 // int sId = querySupport.getId(subject);
\r
3731 // int pId = querySupport.getId(predicate);
\r
3733 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);
\r
3735 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);
\r
3736 // else impl.state.barrier.inc(null, null);
\r
3738 // // final Exception caller = new Exception();
\r
3740 // // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());
\r
3742 // DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);
\r
/**
 * Streams the direct predicates of {@code subject} to an asynchronous multi-procedure.
 * <p>
 * The procedure is wrapped in a {@code MultiIntProcedure} (built from the procedure,
 * {@code impl} and {@code querySupport}) and handed to {@code DirectPredicates.queryEach}
 * together with the subject id, this object, {@code impl.parent} and the listener
 * extracted from {@code procedure}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the results, asserted non-null
 */
3747 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
\r
3749 assert(subject != null);
\r
3750 assert(procedure != null);
\r
3752 final ListenerBase listener = getListenerBase(procedure);
\r
3754 MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
\r
3756 int sId = querySupport.getId(subject);
\r
3758 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);
\r
3759 // else impl.state.barrier.inc(null, null);
\r
3761 DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);
\r
/**
 * Requests the direct statements of {@code subject} for a synchronous {@code Procedure}.
 * <p>
 * Delegates to the query class {@code org.simantics.db.impl.query.DirectStatements}; the
 * name is fully qualified here because the file also imports the result type
 * {@code org.simantics.db.DirectStatements}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the result, asserted non-null
 */
3766 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
\r
3768 assert(subject != null);
\r
3769 assert(procedure != null);
\r
3771 final ListenerBase listener = getListenerBase(procedure);
\r
3773 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
\r
/**
 * Requests the direct statements of {@code subject} for an {@code AsyncProcedure}.
 * <p>
 * Delegates to {@code org.simantics.db.impl.query.DirectStatements.queryEach}, passing
 * {@code ignoreVirtual} through unchanged.
 *
 * @param impl          graph whose {@code parent} entry is passed on to the query
 * @param subject       resource to inspect, asserted non-null
 * @param procedure     receiver of the result, asserted non-null
 * @param ignoreVirtual forwarded to the query; presumably excludes virtual-graph
 *                      statements when true -- TODO confirm against the query class
 */
3778 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
\r
3780 assert(subject != null);
\r
3781 assert(procedure != null);
\r
3783 final ListenerBase listener = getListenerBase(procedure);
\r
3785 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
\r
// Marker instance compared by identity in forPossibleObject: when its result slot holds
// this value, finished() reports null to the caller instead of a resource.
3789 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
\r
/**
 * Reports the object of (subject, predicate) as a single, possibly absent, value.
 * <p>
 * Delegates to {@code forEachObject} with a collecting procedure whose callbacks are
 * all {@code synchronized}. On completion the caller receives {@code null} when the
 * slot is still empty or holds {@code INVALID_RESOURCE}, and the stored resource
 * otherwise. Exceptions are forwarded unchanged.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject
 * @param predicate statement predicate
 * @param procedure receiver of the single result, {@code null}, or an exception
 */
3792 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
\r
3794 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
\r
3796 private Resource single = null;
\r
3799 public synchronized void execute(AsyncReadGraph graph, Resource result) {
\r
// NOTE(review): presumably the first result is stored in `single` and any further
// result replaces it with the INVALID_RESOURCE marker -- confirm the branch structure.
3800 if(single == null) {
\r
3803 single = INVALID_RESOURCE;
\r
3808 public synchronized void finished(AsyncReadGraph graph) {
\r
3809 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
\r
3810 else procedure.execute(graph, single);
\r
3814 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
\r
3815 procedure.exception(graph, throwable);
\r
/**
 * Id-level variant of forEachObject: resolves both resources to ids through
 * {@code querySupport} and runs {@code Objects.runner} with the supplied listener and
 * {@code IntProcedure}, so results are delivered as raw ids.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   statement subject
 * @param predicate statement predicate
 * @param listener  listener passed through to the query as given
 * @param procedure receiver of the object ids
 */
3822 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
\r
3824 final int sId = querySupport.getId(subject);
\r
3825 final int pId = querySupport.getId(predicate);
\r
3827 Objects.runner(impl, sId, pId, impl.parent, listener, procedure);
\r
/**
 * Returns the value produced by {@code Objects.runner2} for (subject, predicate).
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   statement subject
 * @param predicate statement predicate
 * @return the int returned by {@code Objects.runner2}; presumably the id of the single
 *         object -- TODO confirm what is returned for zero or several objects
 * @throws DatabaseException declared by this method; presumably raised by the
 *         underlying query -- TODO confirm
 */
3831 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
\r
3833 final int sId = querySupport.getId(subject);
\r
3834 final int pId = querySupport.getId(predicate);
\r
3836 return Objects.runner2(impl, sId, pId, impl.parent);
\r
/**
 * Enumerates the objects of (subject, predicate) for an {@code AsyncMultiProcedure}.
 * <p>
 * Two adapters to {@code IntProcedure} are built depending on the call context, and
 * both end in the id-level {@code forEachObject} overload:
 * <ul>
 * <li>when {@code impl.parent} is set or the procedure is a listener, a guarded adapter
 *     that catches and logs Throwables from the procedure and uses an
 *     {@code AtomicBoolean first} flag to choose between {@code impl} and
 *     {@code impl.newRestart(graph)} as the graph handed to the procedure;</li>
 * <li>in the remaining case, a plain adapter that forwards the callbacks with the graph
 *     it receives and without any exception guard.</li>
 * </ul>
 *
 * @param impl      graph whose {@code parent} entry decides which adapter is used
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure receiver of the resolved object resources
 */
3840 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
\r
3842 assert(subject != null);
\r
3843 assert(predicate != null);
\r
3845 final ListenerBase listener = getListenerBase(procedure);
\r
3847 if(impl.parent != null || listener != null) {
\r
3849 IntProcedure ip = new IntProcedure() {
\r
3851 AtomicBoolean first = new AtomicBoolean(true);
\r
3854 public void execute(ReadGraphImpl graph, int i) {
\r
// NOTE(review): presumably the `first` flag selects between the two calls below
// (original graph vs. restarted graph) -- confirm the condition.
3857 procedure.execute(impl, querySupport.getResource(i));
\r
3859 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
\r
3861 } catch (Throwable t2) {
\r
3862 Logger.defaultLogError(t2);
\r
3868 public void finished(ReadGraphImpl graph) {
\r
// compareAndSet flips the flag atomically, so only one finished() call can take this branch.
3870 if(first.compareAndSet(true, false)) {
\r
3871 procedure.finished(impl);
\r
3872 // impl.state.barrier.dec(this);
\r
3874 procedure.finished(impl.newRestart(graph));
\r
3876 } catch (Throwable t2) {
\r
3877 Logger.defaultLogError(t2);
\r
3882 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3884 procedure.exception(graph, t);
\r
3885 } catch (Throwable t2) {
\r
3886 Logger.defaultLogError(t2);
\r
3888 // impl.state.barrier.dec(this);
\r
3892 public String toString() {
\r
3893 return "forEachObject with " + procedure;
\r
3898 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
\r
3899 // else impl.state.barrier.inc(null, null);
\r
3901 forEachObject(impl, subject, predicate, listener, ip);
\r
// Plain adapter: callbacks are forwarded with the graph supplied by the query.
3905 IntProcedure ip = new IntProcedure() {
\r
3908 public void execute(ReadGraphImpl graph, int i) {
\r
3909 procedure.execute(graph, querySupport.getResource(i));
\r
3913 public void finished(ReadGraphImpl graph) {
\r
3914 procedure.finished(graph);
\r
3918 public void exception(ReadGraphImpl graph, Throwable t) {
\r
3919 procedure.exception(graph, t);
\r
3923 public String toString() {
\r
3924 return "forEachObject with " + procedure;
\r
3929 forEachObject(impl, subject, predicate, listener, ip);
\r
/**
 * Tracks the object set of (subject, predicate) and reports it to a set listener as
 * add/remove events.
 * <p>
 * Registers an {@code AsyncMultiListener} with {@code forEachObject}. For every reported
 * object that cannot be removed from {@code current}, {@code procedure.add} is called;
 * when a round finishes, every element still left in {@code current} is reported through
 * {@code procedure.remove} and {@code run} is replaced by a fresh set. Disposal state and
 * exceptions are delegated to {@code procedure}.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving add/remove/exception events, asserted non-null
 */
3936 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
\r
3938 assert(subject != null);
\r
3939 assert(predicate != null);
\r
3940 assert(procedure != null);
\r
3942 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
\r
// NOTE(review): presumably `current` holds the previous round's results and `run`
// collects the round in progress -- confirm how the two are swapped.
3944 private Set<Resource> current = null;
\r
3945 private Set<Resource> run = new HashSet<Resource>();
\r
3948 public void execute(AsyncReadGraph graph, Resource result) {
\r
3950 boolean found = false;
\r
3952 if(current != null) {
\r
// A successful remove means the element was already known, so no add event is needed.
3954 found = current.remove(result);
\r
3958 if(!found) procedure.add(graph, result);
\r
3965 public void finished(AsyncReadGraph graph) {
\r
3967 if(current != null) {
\r
// Whatever is left in `current` was not reported again in this round.
3968 for(Resource r : current) procedure.remove(graph, r);
\r
3973 run = new HashSet<Resource>();
\r
3978 public boolean isDisposed() {
\r
3979 return procedure.isDisposed();
\r
3983 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3984 procedure.exception(graph, t);
\r
3988 public String toString() {
\r
3989 return "forObjectSet " + procedure;
\r
/**
 * Tracks the predicate set of {@code subject} and reports it to a set listener as
 * add/remove events.
 * <p>
 * Registers an {@code AsyncMultiListener} with {@code forEachPredicate}. For every
 * reported predicate that cannot be removed from {@code current}, {@code procedure.add}
 * is called; when a round finishes, every element still left in {@code current} is
 * reported through {@code procedure.remove} and {@code run} is replaced by a fresh set.
 *
 * @param impl      graph used for the underlying query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure set listener receiving add/remove/exception events, asserted non-null
 */
3997 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
\r
3999 assert(subject != null);
\r
4000 assert(procedure != null);
\r
4002 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
\r
// NOTE(review): presumably `current` is the previous round and `run` the round in
// progress -- confirm how the two are swapped.
4004 private Set<Resource> current = null;
\r
4005 private Set<Resource> run = new HashSet<Resource>();
\r
4008 public void execute(AsyncReadGraph graph, Resource result) {
\r
4010 boolean found = false;
\r
4012 if(current != null) {
\r
// A successful remove means the element was already known, so no add event is needed.
4014 found = current.remove(result);
\r
4018 if(!found) procedure.add(graph, result);
\r
4025 public void finished(AsyncReadGraph graph) {
\r
4027 if(current != null) {
\r
// Whatever is left in `current` was not reported again in this round.
4028 for(Resource r : current) procedure.remove(graph, r);
\r
4033 run = new HashSet<Resource>();
\r
4038 public boolean isDisposed() {
\r
4039 return procedure.isDisposed();
\r
4043 public void exception(AsyncReadGraph graph, Throwable t) {
\r
4044 procedure.exception(graph, t);
\r
4048 public String toString() {
\r
4049 return "forPredicateSet " + procedure;
\r
/**
 * Tracks the principal-type set of {@code subject} and reports it to a set listener as
 * add/remove events.
 * <p>
 * Registers an {@code AsyncMultiListener} with {@code forEachPrincipalType}. For every
 * reported type that cannot be removed from {@code current}, {@code procedure.add} is
 * called; when a round finishes, every element still left in {@code current} is reported
 * through {@code procedure.remove} and {@code run} is replaced by a fresh set.
 *
 * @param impl      graph used for the underlying query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure set listener receiving add/remove/exception events, asserted non-null
 */
4057 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
\r
4059 assert(subject != null);
\r
4060 assert(procedure != null);
\r
4062 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
\r
// NOTE(review): presumably `current` is the previous round and `run` the round in
// progress -- confirm how the two are swapped.
4064 private Set<Resource> current = null;
\r
4065 private Set<Resource> run = new HashSet<Resource>();
\r
4068 public void execute(AsyncReadGraph graph, Resource result) {
\r
4070 boolean found = false;
\r
4072 if(current != null) {
\r
// A successful remove means the element was already known, so no add event is needed.
4074 found = current.remove(result);
\r
4078 if(!found) procedure.add(graph, result);
\r
4085 public void finished(AsyncReadGraph graph) {
\r
4087 if(current != null) {
\r
// Whatever is left in `current` was not reported again in this round.
4088 for(Resource r : current) procedure.remove(graph, r);
\r
4093 run = new HashSet<Resource>();
\r
4098 public boolean isDisposed() {
\r
4099 return procedure.isDisposed();
\r
4103 public void exception(AsyncReadGraph graph, Throwable t) {
\r
4104 procedure.exception(graph, t);
\r
4108 public String toString() {
\r
4109 return "forPrincipalTypeSet " + procedure;
\r
/**
 * Tracks the asserted-object set of (subject, predicate) and reports it to a set
 * listener as add/remove events.
 * <p>
 * Registers an {@code AsyncMultiListener} with {@code forEachAssertedObject}. For every
 * reported object that cannot be removed from {@code current}, {@code procedure.add} is
 * called; when a round finishes, every element still left in {@code current} is reported
 * through {@code procedure.remove} and {@code run} is replaced by a fresh set.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving add/remove/exception events, asserted non-null
 */
4117 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
\r
4119 assert(subject != null);
\r
4120 assert(predicate != null);
\r
4121 assert(procedure != null);
\r
4123 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
\r
// NOTE(review): presumably `current` is the previous round and `run` the round in
// progress -- confirm how the two are swapped.
4125 private Set<Resource> current = null;
\r
4126 private Set<Resource> run = new HashSet<Resource>();
\r
4129 public void execute(AsyncReadGraph graph, Resource result) {
\r
4131 boolean found = false;
\r
4133 if(current != null) {
\r
// A successful remove means the element was already known, so no add event is needed.
4135 found = current.remove(result);
\r
4139 if(!found) procedure.add(graph, result);
\r
4146 public void finished(AsyncReadGraph graph) {
\r
4148 if(current != null) {
\r
// Whatever is left in `current` was not reported again in this round.
4149 for(Resource r : current) procedure.remove(graph, r);
\r
4154 run = new HashSet<Resource>();
\r
4159 public boolean isDisposed() {
\r
4160 return procedure.isDisposed();
\r
4164 public void exception(AsyncReadGraph graph, Throwable t) {
\r
4165 procedure.exception(graph, t);
\r
4169 public String toString() {
\r
// NOTE(review): the label reads "forObjectSet", the same text used by forObjectSet's
// listener; it looks copy-pasted and makes the two indistinguishable in diagnostics.
4170 return "forObjectSet " + procedure;
\r
/**
 * Tracks the asserted-statement set of (subject, predicate) and reports it to a set
 * listener as add/remove events.
 * <p>
 * Registers an {@code AsyncMultiListener} with {@code forEachAssertedStatement}. For
 * every reported statement that cannot be removed from {@code current},
 * {@code procedure.add} is called; when a round finishes, every element still left in
 * {@code current} is reported through {@code procedure.remove} and {@code run} is
 * replaced by a fresh set.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure set listener receiving add/remove/exception events, asserted non-null
 */
4178 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
\r
4180 assert(subject != null);
\r
4181 assert(predicate != null);
\r
4182 assert(procedure != null);
\r
4184 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
\r
// NOTE(review): presumably `current` is the previous round and `run` the round in
// progress -- confirm how the two are swapped.
4186 private Set<Statement> current = null;
\r
4187 private Set<Statement> run = new HashSet<Statement>();
\r
4190 public void execute(AsyncReadGraph graph, Statement result) {
\r
4192 boolean found = false;
\r
4194 if(current != null) {
\r
// A successful remove means the element was already known, so no add event is needed.
4196 found = current.remove(result);
\r
4200 if(!found) procedure.add(graph, result);
\r
4207 public void finished(AsyncReadGraph graph) {
\r
4209 if(current != null) {
\r
// Whatever is left in `current` was not reported again in this round.
4210 for(Statement s : current) procedure.remove(graph, s);
\r
4215 run = new HashSet<Statement>();
\r
4220 public boolean isDisposed() {
\r
4221 return procedure.isDisposed();
\r
4225 public void exception(AsyncReadGraph graph, Throwable t) {
\r
4226 procedure.exception(graph, t);
\r
4230 public String toString() {
\r
// NOTE(review): the label says "forStatementSet" while the method is
// forAssertedStatementSet -- confirm whether the shorter label is intended.
4231 return "forStatementSet " + procedure;
\r
/**
 * Enumerates the objects of the asserted statements of (subject, predicate).
 * <p>
 * Runs {@code AssertedStatements.queryEach} and, for each reported triple, forwards only
 * the object id (converted to a {@code Resource}) to the procedure. Throwables raised by
 * the procedure callbacks are caught and reported through {@code Logger.defaultLogError}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure receiver of the object resources, asserted non-null
 */
4239 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
\r
4240 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
\r
4242 assert(subject != null);
\r
4243 assert(predicate != null);
\r
4244 assert(procedure != null);
\r
4246 final ListenerBase listener = getListenerBase(procedure);
\r
4248 // impl.state.barrier.inc();
\r
4250 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {
\r
4253 public void execute(ReadGraphImpl graph, int s, int p, int o) {
\r
// Subject and predicate ids of the triple are ignored; only the object is reported.
4255 procedure.execute(graph, querySupport.getResource(o));
\r
4256 } catch (Throwable t2) {
\r
4257 Logger.defaultLogError(t2);
\r
4262 public void finished(ReadGraphImpl graph) {
\r
4264 procedure.finished(graph);
\r
4265 } catch (Throwable t2) {
\r
4266 Logger.defaultLogError(t2);
\r
4268 // impl.state.barrier.dec();
\r
4272 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4274 procedure.exception(graph, t);
\r
4275 } catch (Throwable t2) {
\r
4276 Logger.defaultLogError(t2);
\r
4278 // impl.state.barrier.dec();
\r
/**
 * Enumerates the principal types of {@code subject} for an {@code AsyncMultiProcedure}.
 * <p>
 * Builds an {@code IntProcedure} that converts each reported id to a {@code Resource},
 * forwards all three callbacks with the graph supplied by the query, and logs any
 * Throwable raised by the procedure. The adapter is then passed to
 * {@code PrincipalTypes.queryEach}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the type resources, asserted non-null
 */
4286 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
\r
4288 assert(subject != null);
\r
4289 assert(procedure != null);
\r
4291 final ListenerBase listener = getListenerBase(procedure);
\r
4293 IntProcedure ip = new IntProcedure() {
\r
4296 public void execute(ReadGraphImpl graph, int i) {
\r
4298 procedure.execute(graph, querySupport.getResource(i));
\r
4299 } catch (Throwable t2) {
\r
4300 Logger.defaultLogError(t2);
\r
4305 public void finished(ReadGraphImpl graph) {
\r
4307 procedure.finished(graph);
\r
4308 } catch (Throwable t2) {
\r
4309 Logger.defaultLogError(t2);
\r
4311 // impl.state.barrier.dec(this);
\r
4315 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4317 procedure.exception(graph, t);
\r
4318 } catch (Throwable t2) {
\r
4319 Logger.defaultLogError(t2);
\r
4321 // impl.state.barrier.dec(this);
\r
4326 int sId = querySupport.getId(subject);
\r
4328 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
\r
4329 // else impl.state.barrier.inc(null, null);
\r
4331 PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);
\r
/**
 * Enumerates the principal types of {@code subject} for a synchronous
 * {@code MultiProcedure}.
 * <p>
 * Runs {@code PrincipalTypes.queryEach} with an adapter that converts each reported id to
 * a {@code Resource} and calls the graph-less procedure callbacks. Throwables raised by
 * the procedure are caught and reported through {@code Logger.defaultLogError}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the type resources, asserted non-null
 */
4336 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
\r
4338 assert(subject != null);
\r
4339 assert(procedure != null);
\r
4341 final ListenerBase listener = getListenerBase(procedure);
\r
4343 // impl.state.barrier.inc();
\r
4345 PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
\r
4348 public void execute(ReadGraphImpl graph, int i) {
\r
4350 procedure.execute(querySupport.getResource(i));
\r
4351 } catch (Throwable t2) {
\r
4352 Logger.defaultLogError(t2);
\r
4357 public void finished(ReadGraphImpl graph) {
\r
4359 procedure.finished();
\r
4360 } catch (Throwable t2) {
\r
4361 Logger.defaultLogError(t2);
\r
4363 // impl.state.barrier.dec();
\r
4367 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4369 procedure.exception(t);
\r
4370 } catch (Throwable t2) {
\r
4371 Logger.defaultLogError(t2);
\r
4373 // impl.state.barrier.dec();
\r
/**
 * Requests the type set of {@code subject}.
 * <p>
 * Runs {@code Types.queryEach} with an adapter that hands the resulting {@code IntSet}
 * straight to the procedure. An {@code AtomicBoolean first} flag distinguishes the first
 * delivery, which uses the graph supplied by the query, from the alternative path, which
 * uses {@code impl.newRestart(graph)}. Throwables raised by the procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the type set, asserted non-null; the {@code IntSet} is
 *                  passed where a {@code Set<Resource>} is expected, so presumably
 *                  IntSet implements that interface -- TODO confirm
 */
4380 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
\r
4382 assert(subject != null);
\r
4383 assert(procedure != null);
\r
4385 final ListenerBase listener = getListenerBase(procedure);
\r
4387 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
\r
4389 AtomicBoolean first = new AtomicBoolean(true);
\r
4392 public void execute(final ReadGraphImpl graph, IntSet set) {
\r
// compareAndSet flips the flag atomically, so only one callback can take this branch.
4394 if(first.compareAndSet(true, false)) {
\r
4395 procedure.execute(graph, set);
\r
4396 // impl.state.barrier.dec(this);
\r
4398 procedure.execute(impl.newRestart(graph), set);
\r
4400 } catch (Throwable t2) {
\r
4401 Logger.defaultLogError(t2);
\r
4406 public void exception(ReadGraphImpl graph, Throwable t) {
\r
// The same flag is shared with execute(), so whichever callback fires first consumes it.
4408 if(first.compareAndSet(true, false)) {
\r
4409 procedure.exception(graph, t);
\r
4410 // impl.state.barrier.dec(this);
\r
4412 procedure.exception(impl.newRestart(graph), t);
\r
4414 } catch (Throwable t2) {
\r
4415 Logger.defaultLogError(t2);
\r
4421 int sId = querySupport.getId(subject);
\r
4423 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);
\r
4424 // else impl.state.barrier.inc(null, null);
\r
4426 Types.queryEach(impl, sId, this, impl.parent, listener, ip);
\r
/**
 * Returns the result of {@code Types.queryEach2} for {@code subject}.
 *
 * @param impl    graph whose {@code parent} entry is passed on to the query
 * @param subject resource to inspect, asserted non-null
 * @return the {@code IntSet} produced by the query
 * @throws Throwable declared by this method; presumably propagated from the query --
 *         TODO confirm
 */
4431 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
\r
4433 assert(subject != null);
\r
4435 return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
\r
/**
 * Requests the {@code RelationInfo} of {@code subject}.
 * <p>
 * Runs {@code RelationInfoQuery.queryEach} with an adapter that forwards the result or
 * exception to the procedure. An {@code AtomicBoolean first} flag distinguishes the first
 * delivery, which uses the graph supplied by the query, from the alternative path, which
 * uses {@code impl.newRestart(graph)}. Throwables raised by the procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   relation resource to inspect, asserted non-null
 * @param procedure receiver of the relation info, asserted non-null
 */
4440 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
\r
4442 assert(subject != null);
\r
4443 assert(procedure != null);
\r
4445 final ListenerBase listener = getListenerBase(procedure);
\r
4447 // impl.state.barrier.inc();
\r
4449 RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {
\r
4451 AtomicBoolean first = new AtomicBoolean(true);
\r
4454 public void execute(final ReadGraphImpl graph, RelationInfo set) {
\r
// compareAndSet flips the flag atomically, so only one callback can take this branch.
4456 if(first.compareAndSet(true, false)) {
\r
4457 procedure.execute(graph, set);
\r
4458 // impl.state.barrier.dec();
\r
4460 procedure.execute(impl.newRestart(graph), set);
\r
4462 } catch (Throwable t2) {
\r
4463 Logger.defaultLogError(t2);
\r
4468 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4470 if(first.compareAndSet(true, false)) {
\r
4471 procedure.exception(graph, t);
\r
4472 // impl.state.barrier.dec("ReadGraphSupportImpl.1353");
\r
4474 procedure.exception(impl.newRestart(graph), t);
\r
4476 } catch (Throwable t2) {
\r
4477 Logger.defaultLogError(t2);
\r
/**
 * Requests the supertype set of {@code subject}.
 * <p>
 * Runs {@code SuperTypes.queryEach} with an adapter that hands the resulting
 * {@code IntSet} straight to the procedure (an older conversion into a
 * {@code HashSet<Resource>} is left commented out). An {@code AtomicBoolean first} flag
 * distinguishes the first delivery, which uses the graph supplied by the query, from the
 * alternative path, which uses {@code impl.newRestart(graph)}. Throwables raised by the
 * procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   type resource to inspect, asserted non-null
 * @param procedure receiver of the supertype set, asserted non-null
 */
4486 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
\r
4488 assert(subject != null);
\r
4489 assert(procedure != null);
\r
4491 final ListenerBase listener = getListenerBase(procedure);
\r
4493 // impl.state.barrier.inc();
\r
4495 SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {
\r
4497 AtomicBoolean first = new AtomicBoolean(true);
\r
4500 public void execute(final ReadGraphImpl graph, IntSet set) {
\r
4501 // final HashSet<Resource> result = new HashSet<Resource>();
\r
4502 // set.forEach(new TIntProcedure() {
\r
4505 // public boolean execute(int type) {
\r
4506 // result.add(querySupport.getResource(type));
\r
// compareAndSet flips the flag atomically, so only one callback can take this branch.
4512 if(first.compareAndSet(true, false)) {
\r
4513 procedure.execute(graph, set);
\r
4514 // impl.state.barrier.dec();
\r
4516 procedure.execute(impl.newRestart(graph), set);
\r
4518 } catch (Throwable t2) {
\r
4519 Logger.defaultLogError(t2);
\r
4524 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4526 if(first.compareAndSet(true, false)) {
\r
4527 procedure.exception(graph, t);
\r
4528 // impl.state.barrier.dec();
\r
4530 procedure.exception(impl.newRestart(graph), t);
\r
4532 } catch (Throwable t2) {
\r
4533 Logger.defaultLogError(t2);
\r
/**
 * Enumerates the direct super relations of {@code subject}.
 * <p>
 * Builds an {@code IntProcedureAdapter} that converts each reported id to a
 * {@code Resource}, forwards all three callbacks with the graph supplied by the query,
 * and logs any Throwable raised by the procedure. The adapter is then passed to
 * {@code DirectSuperRelations.queryEach}.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   relation resource to inspect, asserted non-null
 * @param procedure receiver of the super relation resources, asserted non-null
 */
4542 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
\r
4544 assert(subject != null);
\r
4545 assert(procedure != null);
\r
4547 final ListenerBase listener = getListenerBase(procedure);
\r
4549 IntProcedure ip = new IntProcedureAdapter() {
\r
4552 public void execute(final ReadGraphImpl graph, int superRelation) {
\r
4554 procedure.execute(graph, querySupport.getResource(superRelation));
\r
4555 } catch (Throwable t2) {
\r
4556 Logger.defaultLogError(t2);
\r
4561 public void finished(final ReadGraphImpl graph) {
\r
4563 procedure.finished(graph);
\r
4564 } catch (Throwable t2) {
\r
4565 Logger.defaultLogError(t2);
\r
4567 // impl.state.barrier.dec(this);
\r
4572 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4574 procedure.exception(graph, t);
\r
4575 } catch (Throwable t2) {
\r
4576 Logger.defaultLogError(t2);
\r
4578 // impl.state.barrier.dec(this);
\r
4583 int sId = querySupport.getId(subject);
\r
4585 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
\r
4586 // else impl.state.barrier.inc(null, null);
\r
4588 DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
\r
/**
 * Requests the possible super relation of {@code subject}.
 * <p>
 * Thin delegate: the procedure is handed to {@code PossibleSuperRelation.queryEach}
 * unchanged, so no exception guard or id conversion is added at this level.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   relation resource to inspect, asserted non-null
 * @param procedure receiver of the result, asserted non-null
 */
4593 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
\r
4595 assert(subject != null);
\r
4596 assert(procedure != null);
\r
4598 final ListenerBase listener = getListenerBase(procedure);
\r
4600 // impl.state.barrier.inc();
\r
4602 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
\r
/**
 * Requests the super relation set of {@code subject}.
 * <p>
 * Runs {@code SuperRelations.queryEach} with an adapter that hands the resulting
 * {@code IntSet} straight to the procedure (an older conversion into a
 * {@code HashSet<Resource>} is left commented out) and logs any Throwable raised by the
 * procedure. Unlike forSupertypes, no first-delivery flag is used here.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   relation resource to inspect, asserted non-null
 * @param procedure receiver of the super relation set, asserted non-null
 */
4607 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
\r
4609 assert(subject != null);
\r
4610 assert(procedure != null);
\r
4612 final ListenerBase listener = getListenerBase(procedure);
\r
4614 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
\r
4617 public void execute(final ReadGraphImpl graph, IntSet set) {
\r
4618 // final HashSet<Resource> result = new HashSet<Resource>();
\r
4619 // set.forEach(new TIntProcedure() {
\r
4622 // public boolean execute(int type) {
\r
4623 // result.add(querySupport.getResource(type));
\r
4629 procedure.execute(graph, set);
\r
4630 } catch (Throwable t2) {
\r
4631 Logger.defaultLogError(t2);
\r
4633 // impl.state.barrier.dec(this);
\r
4637 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4639 procedure.exception(graph, t);
\r
4640 } catch (Throwable t2) {
\r
4641 Logger.defaultLogError(t2);
\r
4643 // impl.state.barrier.dec(this);
\r
4648 int sId = querySupport.getId(subject);
\r
4650 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
\r
4651 // else impl.state.barrier.inc(null, null);
\r
4653 SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
\r
/**
 * Returns the raw value bytes of {@code subject} as produced by
 * {@code ValueQuery.queryEach}.
 *
 * @param impl    graph whose {@code parent} entry is passed on to the query
 * @param subject resource whose value is read; translated to an id via querySupport
 * @return the byte array returned by the query
 * @throws DatabaseException declared by this method; presumably propagated from the
 *         query -- TODO confirm
 */
4657 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
\r
4659 int sId = querySupport.getId(subject);
\r
4660 return ValueQuery.queryEach(impl, sId, impl.parent);
\r
/**
 * Id-level variant of getValue: passes {@code subject} to {@code ValueQuery.queryEach}
 * as is, without going through querySupport.
 *
 * @param impl    graph whose {@code parent} entry is passed on to the query
 * @param subject id of the resource whose value is read
 * @return the byte array returned by the query
 * @throws DatabaseException declared by this method; presumably propagated from the
 *         query -- TODO confirm
 */
4664 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
\r
4666 return ValueQuery.queryEach(impl, subject, impl.parent);
\r
/**
 * Requests the value bytes of {@code subject}, optionally notifying a procedure.
 * <p>
 * With a non-null procedure, {@code ValueQuery.queryEach} is run with an adapter that
 * forwards the result or exception and logs Throwables raised by the procedure; an
 * {@code AtomicBoolean first} flag distinguishes the first delivery (graph supplied by
 * the query) from the alternative path ({@code impl.newRestart(graph)}). With a null
 * procedure the query is run with neither listener nor procedure. In both cases the
 * query's return value is returned to the caller.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource whose value is read, asserted non-null
 * @param procedure optional receiver of the value; may be {@code null}
 * @return the byte array returned by {@code ValueQuery.queryEach}
 */
4671 final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
\r
4673 assert(subject != null);
\r
4675 int sId = querySupport.getId(subject);
\r
4677 if(procedure != null) {
\r
4679 final ListenerBase listener = getListenerBase(procedure);
\r
4681 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
\r
4683 AtomicBoolean first = new AtomicBoolean(true);
\r
4686 public void execute(ReadGraphImpl graph, byte[] result) {
\r
// compareAndSet flips the flag atomically, so only one callback can take this branch.
4688 if(first.compareAndSet(true, false)) {
\r
4689 procedure.execute(graph, result);
\r
4690 // impl.state.barrier.dec(this);
\r
4692 procedure.execute(impl.newRestart(graph), result);
\r
4694 } catch (Throwable t2) {
\r
4695 Logger.defaultLogError(t2);
\r
4700 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4702 if(first.compareAndSet(true, false)) {
\r
4703 procedure.exception(graph, t);
\r
4704 // impl.state.barrier.dec(this);
\r
4706 procedure.exception(impl.newRestart(graph), t);
\r
4708 } catch (Throwable t2) {
\r
4709 Logger.defaultLogError(t2);
\r
4715 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
\r
4716 // else impl.state.barrier.inc(null, null);
\r
4718 return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
\r
// No procedure supplied: run the query without listener or callback.
4722 return ValueQuery.queryEach(impl, sId, impl.parent, null, null);
\r
/**
 * Requests the value bytes of {@code subject} for an {@code AsyncProcedure}.
 * <p>
 * Two adapters are built depending on the call context, and both are passed to
 * {@code ValueQuery.queryEach}:
 * <ul>
 * <li>when {@code impl.parent} is set or the procedure is a listener, a guarded adapter
 *     that logs Throwables from the procedure and uses an {@code AtomicBoolean first}
 *     flag to choose between the supplied graph and {@code impl.newRestart(graph)};</li>
 * <li>in the remaining case, an adapter that forwards the callbacks with the graph it
 *     receives.</li>
 * </ul>
 *
 * @param impl      graph whose {@code parent} entry decides which adapter is used
 * @param subject   resource whose value is read, asserted non-null
 * @param procedure receiver of the value bytes, asserted non-null
 */
4729 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
\r
4731 assert(subject != null);
\r
4732 assert(procedure != null);
\r
4734 final ListenerBase listener = getListenerBase(procedure);
\r
4736 if(impl.parent != null || listener != null) {
\r
4738 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
\r
4740 AtomicBoolean first = new AtomicBoolean(true);
\r
4743 public void execute(ReadGraphImpl graph, byte[] result) {
\r
// compareAndSet flips the flag atomically, so only one callback can take this branch.
4745 if(first.compareAndSet(true, false)) {
\r
4746 procedure.execute(graph, result);
\r
4747 // impl.state.barrier.dec(this);
\r
4749 procedure.execute(impl.newRestart(graph), result);
\r
4751 } catch (Throwable t2) {
\r
4752 Logger.defaultLogError(t2);
\r
4757 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4759 if(first.compareAndSet(true, false)) {
\r
4760 procedure.exception(graph, t);
\r
4761 // impl.state.barrier.dec(this);
\r
4763 procedure.exception(impl.newRestart(graph), t);
\r
4765 } catch (Throwable t2) {
\r
4766 Logger.defaultLogError(t2);
\r
4772 int sId = querySupport.getId(subject);
\r
4774 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
\r
4775 // else impl.state.barrier.inc(null, null);
\r
4777 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
\r
// Second adapter: callbacks are forwarded with the graph supplied by the query.
4781 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
\r
4784 public void execute(ReadGraphImpl graph, byte[] result) {
\r
4786 procedure.execute(graph, result);
\r
4791 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4793 procedure.exception(graph, t);
\r
// NOTE(review): the id lookup and the query call are the same in both branches and
// could be hoisted out of the conditional.
4799 int sId = querySupport.getId(subject);
\r
4801 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
\r
/**
 * Resolves the inverse of {@code relation} and reports it exactly once.
 * <p>
 * Runs {@code Objects.runner} with {@code getInverseOf()} as the predicate. The adapter
 * keeps two atomic flags: {@code found} is claimed by the first reported object and
 * {@code done} guards the single notification of the procedure. The procedure receives
 * either the resolved resource, a {@code NoInverseException}, a
 * {@code ManyObjectsForFunctionalRelationException}, or the exception reported by the
 * query. Throwables raised by the procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param relation  relation whose inverse is requested, asserted non-null
 * @param procedure receiver of the inverse or of the failure, asserted non-null
 */
4808 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
\r
4810 assert(relation != null);
\r
4811 assert(procedure != null);
\r
4813 final ListenerBase listener = getListenerBase(procedure);
\r
4815 IntProcedure ip = new IntProcedure() {
\r
4817 private int result = 0;
\r
4819 final AtomicBoolean found = new AtomicBoolean(false);
\r
4820 final AtomicBoolean done = new AtomicBoolean(false);
\r
4823 public void finished(ReadGraphImpl graph) {
\r
4825 // Shall fire exactly once!
\r
4826 if(done.compareAndSet(false, true)) {
\r
// NOTE(review): presumably the exception branch is taken when no object was found and
// the execute branch otherwise -- confirm the condition.
4829 procedure.exception(graph, new NoInverseException(""));
\r
4830 // impl.state.barrier.dec(this);
\r
4832 procedure.execute(graph, querySupport.getResource(result));
\r
4833 // impl.state.barrier.dec(this);
\r
4835 } catch (Throwable t) {
\r
4836 Logger.defaultLogError(t);
\r
4843 public void execute(ReadGraphImpl graph, int i) {
\r
4845 if(found.compareAndSet(false, true)) {
\r
4848 // Shall fire exactly once!
\r
4849 if(done.compareAndSet(false, true)) {
\r
// NOTE(review): `this.result` and `result` appear to name the same field, so the
// message prints one id twice; the second operand was probably meant to be `i`.
4851 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
\r
4852 // impl.state.barrier.dec(this);
\r
4853 } catch (Throwable t) {
\r
4854 Logger.defaultLogError(t);
\r
4862 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4863 // Shall fire exactly once!
\r
4864 if(done.compareAndSet(false, true)) {
\r
4866 procedure.exception(graph, t);
\r
4867 // impl.state.barrier.dec(this);
\r
4868 } catch (Throwable t2) {
\r
4869 Logger.defaultLogError(t2);
\r
4876 int sId = querySupport.getId(relation);
\r
4878 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
\r
4879 // else impl.state.barrier.inc(null, null);
\r
4881 Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);
\r
/**
 * Resolves a resource from the string {@code id} and reports it to the procedure.
 * <p>
 * Delegates to the {@code forResource(impl, id, impl.parent, ip)} overload with an
 * adapter that converts the resulting integer id to a {@code Resource} through
 * {@code querySupport} and logs any Throwable raised by the procedure.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param id        identifier to resolve, asserted non-null; presumably a URI --
 *                  TODO confirm against the delegate overload
 * @param procedure receiver of the resolved resource, asserted non-null
 */
4886 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
\r
4888 assert(id != null);
\r
4889 assert(procedure != null);
\r
4891 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
\r
4894 public void execute(ReadGraphImpl graph, Integer result) {
\r
// `result` is unboxed when passed to getResource.
4896 procedure.execute(graph, querySupport.getResource(result));
\r
4897 } catch (Throwable t2) {
\r
4898 Logger.defaultLogError(t2);
\r
4900 // impl.state.barrier.dec(this);
\r
4904 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4907 procedure.exception(graph, t);
\r
4908 } catch (Throwable t2) {
\r
4909 Logger.defaultLogError(t2);
\r
4911 // impl.state.barrier.dec(this);
\r
4916 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
\r
4917 // else impl.state.barrier.inc(null, null);
\r
4919 forResource(impl, id, impl.parent, ip);
\r
/**
 * Resolves a builtin resource from the string {@code id} and reports it to the
 * procedure.
 * <p>
 * Delegates to the {@code forBuiltin(impl, id, impl.parent, ...)} overload with an
 * adapter that converts the resulting integer id to a {@code Resource} through
 * {@code querySupport} and logs any Throwable raised by the procedure.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param id        identifier of the builtin, asserted non-null
 * @param procedure receiver of the resolved resource, asserted non-null
 */
4924 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
\r
4926 assert(id != null);
\r
4927 assert(procedure != null);
\r
4929 // impl.state.barrier.inc();
\r
4931 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
\r
4934 public void execute(ReadGraphImpl graph, Integer result) {
\r
// `result` is unboxed when passed to getResource.
4936 procedure.execute(graph, querySupport.getResource(result));
\r
4937 } catch (Throwable t2) {
\r
4938 Logger.defaultLogError(t2);
\r
4940 // impl.state.barrier.dec();
\r
4944 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4946 procedure.exception(graph, t);
\r
4947 } catch (Throwable t2) {
\r
4948 Logger.defaultLogError(t2);
\r
4950 // impl.state.barrier.dec();
\r
/**
 * Reports whether {@code subject} has any statement.
 * <p>
 * Runs {@code DirectPredicates.queryEach} with an adapter that keeps a {@code found}
 * flag and reports its value to the procedure when the query finishes. Throwables raised
 * by the procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the boolean answer, asserted non-null
 */
4958 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
\r
4960 assert(subject != null);
\r
4961 assert(procedure != null);
\r
4963 final ListenerBase listener = getListenerBase(procedure);
\r
4965 // impl.state.barrier.inc();
\r
4967 DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
\r
// NOTE(review): plain field without synchronization, whereas the (subject, predicate)
// overloads declare their callbacks synchronized -- confirm callbacks cannot race here.
4969 boolean found = false;
\r
4972 public void execute(ReadGraphImpl graph, int object) {
\r
// NOTE(review): presumably sets `found` for any reported predicate -- confirm.
4977 public void finished(ReadGraphImpl graph) {
\r
4979 procedure.execute(graph, found);
\r
4980 } catch (Throwable t2) {
\r
4981 Logger.defaultLogError(t2);
\r
4983 // impl.state.barrier.dec();
\r
4987 public void exception(ReadGraphImpl graph, Throwable t) {
\r
4989 procedure.exception(graph, t);
\r
4990 } catch (Throwable t2) {
\r
4991 Logger.defaultLogError(t2);
\r
4993 // impl.state.barrier.dec();
\r
/**
 * Reports whether {@code subject} has any statement with {@code predicate}.
 * <p>
 * Delegates to {@code forEachObject} with an {@code AsyncMultiProcedureAdapter} that
 * keeps a {@code found} flag and reports its value to the procedure when the enumeration
 * finishes. {@code execute} and {@code finished} are synchronized on the adapter;
 * Throwables raised by the procedure are logged.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param procedure receiver of the boolean answer, asserted non-null
 */
5001 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
\r
5003 assert(subject != null);
\r
5004 assert(predicate != null);
\r
5005 assert(procedure != null);
\r
5007 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
\r
5009 boolean found = false;
\r
5012 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
\r
// NOTE(review): presumably sets `found` for any reported object -- confirm.
5017 synchronized public void finished(AsyncReadGraph graph) {
\r
5019 procedure.execute(graph, found);
\r
5020 } catch (Throwable t2) {
\r
5021 Logger.defaultLogError(t2);
\r
5023 // impl.state.barrier.dec(this);
\r
5027 public void exception(AsyncReadGraph graph, Throwable t) {
\r
5029 procedure.exception(graph, t);
\r
5030 } catch (Throwable t2) {
\r
5031 Logger.defaultLogError(t2);
\r
5033 // impl.state.barrier.dec(this);
\r
5038 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
\r
5039 // else impl.state.barrier.inc(null, null);
\r
5041 forEachObject(impl, subject, predicate, ip);
\r
/**
 * Reports whether the statement (subject, predicate, object) exists.
 * <p>
 * Delegates to {@code forEachObject} with an {@code AsyncMultiProcedureAdapter} that sets
 * a {@code found} flag when a reported resource equals {@code object} and reports the
 * flag to the procedure when the enumeration finishes. {@code execute} and
 * {@code finished} are synchronized on the adapter; Throwables raised by the procedure
 * are logged.
 *
 * @param impl      graph used for the underlying query
 * @param subject   statement subject, asserted non-null
 * @param predicate statement predicate, asserted non-null
 * @param object    statement object to look for; NOTE(review): not asserted non-null,
 *                  unlike the other arguments
 * @param procedure receiver of the boolean answer, asserted non-null
 */
5046 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
\r
5048 assert(subject != null);
\r
5049 assert(predicate != null);
\r
5050 assert(procedure != null);
\r
5052 // impl.state.barrier.inc();
\r
5054 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
\r
5056 boolean found = false;
\r
5059 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
\r
// The flag is only ever raised, so one matching object is enough.
5060 if(resource.equals(object)) found = true;
\r
5064 synchronized public void finished(AsyncReadGraph graph) {
\r
5066 procedure.execute(graph, found);
\r
5067 } catch (Throwable t2) {
\r
5068 Logger.defaultLogError(t2);
\r
5070 // impl.state.barrier.dec();
\r
5074 public void exception(AsyncReadGraph graph, Throwable t) {
\r
5076 procedure.exception(graph, t);
\r
5077 } catch (Throwable t2) {
\r
5078 Logger.defaultLogError(t2);
\r
5080 // impl.state.barrier.dec();
\r
/**
 * Reports whether {@code subject} has a value.
 * <p>
 * Runs {@code ValueQuery.queryEach} and reports {@code true} to the procedure when the
 * returned byte array is non-null, {@code false} otherwise. Throwables raised by the
 * procedure are logged.
 *
 * @param impl      graph whose {@code parent} entry is passed on to the query
 * @param subject   resource to inspect, asserted non-null
 * @param procedure receiver of the boolean answer, asserted non-null
 */
5088 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
\r
5090 assert(subject != null);
\r
5091 assert(procedure != null);
\r
5093 final ListenerBase listener = getListenerBase(procedure);
\r
5095 // impl.state.barrier.inc();
\r
5097 ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
\r
5100 public void execute(ReadGraphImpl graph, byte[] object) {
\r
// Only presence is tested; the content and length of the array are not inspected.
5101 boolean result = object != null;
\r
5103 procedure.execute(graph, result);
\r
5104 } catch (Throwable t2) {
\r
5105 Logger.defaultLogError(t2);
\r
5107 // impl.state.barrier.dec();
\r
5111 public void exception(ReadGraphImpl graph, Throwable t) {
\r
5113 procedure.exception(graph, t);
\r
5114 } catch (Throwable t2) {
\r
5115 Logger.defaultLogError(t2);
\r
5117 // impl.state.barrier.dec();
\r
/**
 * Runs {@code OrderedSet.queryEach} for {@code subject} and relays its callbacks
 * to {@code procedure}, converting each delivered int id to a {@code Resource}
 * with {@code querySupport.getResource}.
 *
 * @param impl graph used for the query; its {@code parent} is passed to the query as well
 * @param subject resource whose id (via {@code querySupport.getId}) is queried; asserted non-null
 * @param procedure receives the resources, the finished signal, or the forwarded failure;
 *        asserted non-null
 */
5125 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
\r
5127 assert(subject != null);
\r
5128 assert(procedure != null);
\r
// The listener handed to the query is derived from the caller's procedure.
5130 final ListenerBase listener = getListenerBase(procedure);
\r
5132 // impl.state.barrier.inc();
\r
5134 OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
\r
// Forwards a query failure to the caller's procedure.
5137 public void exception(ReadGraphImpl graph, Throwable t) {
\r
5139 procedure.exception(graph, t);
\r
// A Throwable raised by the caller's exception handler is logged.
5140 } catch (Throwable t2) {
\r
5141 Logger.defaultLogError(t2);
\r
5143 // impl.state.barrier.dec();
\r
// Translates the int id into a Resource before handing it to the caller.
5147 public void execute(ReadGraphImpl graph, int i) {
\r
5149 procedure.execute(graph, querySupport.getResource(i));
\r
// A Throwable raised by the caller's procedure is logged.
5150 } catch (Throwable t2) {
\r
5151 Logger.defaultLogError(t2);
\r
// Relays the end-of-results signal to the caller.
5156 public void finished(ReadGraphImpl graph) {
\r
5158 procedure.finished(graph);
\r
// A Throwable raised by the caller's finished handler is logged.
5159 } catch (Throwable t2) {
\r
5160 Logger.defaultLogError(t2);
\r
5162 // impl.state.barrier.dec();
\r
/**
 * Entry point for an {@code AsyncRead} request: checks the arguments and
 * delegates to {@code runAsyncRead}.
 *
 * @param impl graph passed to {@code runAsyncRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through as the parent
 * @param procedure procedure passed through to receive the outcome; asserted non-null
 * @param listener listener passed through unchanged
 */
5170 final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {
\r
5172 assert(request != null);
\r
5173 assert(procedure != null);
\r
5175 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");
\r
5176 // else impl.state.barrier.inc(null, null);
\r
// Note the argument order: runAsyncRead takes the listener before the procedure.
5178 runAsyncRead(impl, request, parent, listener, procedure);
\r
/**
 * Answers a {@code Read} request from {@code readMap} when a ready entry exists
 * for it, and otherwise performs the request directly on {@code graph}.
 *
 * @param graph graph used for the lookup result and for performing the request; asserted non-null
 * @param request request to answer; asserted non-null
 * @return the value obtained from the ready entry, or the result of {@code request.perform(graph)}
 * @throws DatabaseException declared by this method; see {@code entry.get} and {@code request.perform}
 */
5183 final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
\r
5185 assert(graph != null);
\r
5186 assert(request != null);
\r
5188 final ReadEntry entry = readMap.get(request);
\r
5189 if(entry != null && entry.isReady()) {
\r
// Unchecked cast: the entry is held through the raw ReadEntry type.
5190 return (T)entry.get(graph, this, null);
\r
// Reached only when no ready entry was returned above.
5192 return request.perform(graph);
\r
/**
 * Answers an {@code ExternalRead} request from {@code externalReadMap} when a
 * ready entry exists for it; otherwise registers a {@code Listener} with the
 * request and answers from the {@code result} / {@code exception} containers.
 *
 * @param graph graph passed to {@code request.register}; asserted non-null
 * @param request request to answer; asserted non-null
 * @return the ready entry's result, or the value held by {@code result}
 * @throws DatabaseException a stored {@code DatabaseException} is thrown as-is; any other
 *         stored {@code Throwable} is wrapped in a new {@code DatabaseException}
 */
5197 final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
\r
5199 assert(graph != null);
\r
5200 assert(request != null);
\r
5202 final ExternalReadEntry<T> entry = externalReadMap.get(request);
\r
5203 if(entry != null && entry.isReady()) {
\r
// For an excepted entry, getResult() holds the Throwable rather than a value.
5204 if(entry.isExcepted()) {
\r
5205 Throwable t = (Throwable)entry.getResult();
\r
5206 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
5207 else throw new DatabaseException(t);
\r
5209 return (T)entry.getResult();
\r
// Containers read back after the request has been registered with the listener below.
5213 final DataContainer<T> result = new DataContainer<T>();
\r
5214 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
5216 request.register(graph, new Listener<T>() {
\r
5219 public void exception(Throwable t) {
\r
5224 public void execute(T t) {
\r
5229 public boolean isDisposed() {
\r
5235 Throwable t = exception.get();
\r
// Same rethrow convention as for the cached entry above.
5237 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
5238 else throw new DatabaseException(t);
\r
5241 return result.get();
\r
/**
 * Answers an {@code AsyncRead} request through {@code procedure}, consulting
 * {@code asyncReadMap} for a ready entry: an excepted entry's result is passed
 * to {@code procedure.exception}, a result value to {@code procedure.execute},
 * and {@code request.perform(graph, procedure)} runs the request directly.
 *
 * @param graph graph passed to the procedure callbacks and to {@code request.perform}; asserted non-null
 * @param request request to answer; asserted non-null
 * @param procedure receives the outcome; note that it is not asserted non-null here
 */
5248 final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
\r
5250 assert(graph != null);
\r
5251 assert(request != null);
\r
5253 final AsyncReadEntry entry = asyncReadMap.get(request);
\r
5254 if(entry != null && entry.isReady()) {
\r
// For an excepted entry, getResult() holds the Throwable rather than a value.
5255 if(entry.isExcepted()) {
\r
5256 procedure.exception(graph, (Throwable)entry.getResult());
\r
// Unchecked cast: the entry is held through the raw AsyncReadEntry type.
5258 procedure.execute(graph, (T)entry.getResult());
\r
5261 request.perform(graph, procedure);
\r
/**
 * Entry point for a {@code MultiRead} request: checks the arguments and
 * delegates to {@code queryMultiRead}.
 *
 * @param impl graph passed to {@code queryMultiRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through as the parent
 * @param procedure procedure passed through to receive the results; asserted non-null
 * @param listener listener passed through unchanged
 */
5267 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
\r
5269 assert(request != null);
\r
5270 assert(procedure != null);
\r
5272 // impl.state.barrier.inc(null, null);
\r
// Note the argument order: queryMultiRead takes the listener before the procedure.
5274 queryMultiRead(impl, request, parent, listener, procedure);
\r
/**
 * Entry point for an {@code AsyncMultiRead} request: checks the arguments and
 * calls {@code runAsyncMultiRead} with a wrapper that relays every callback to
 * {@code procedure} and logs Throwables raised by it.
 *
 * @param impl graph passed to {@code runAsyncMultiRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through as the parent
 * @param procedure receives results, the finished signal, or the forwarded failure;
 *        asserted non-null
 * @param listener listener passed through unchanged
 */
5279 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
\r
5281 assert(request != null);
\r
5282 assert(procedure != null);
\r
5284 // impl.state.barrier.inc();
\r
5286 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
\r
// Relays one result to the caller's procedure.
5288 public void execute(AsyncReadGraph graph, T result) {
\r
5291 procedure.execute(graph, result);
\r
// A Throwable raised by the caller's procedure is logged.
5292 } catch (Throwable t2) {
\r
5293 Logger.defaultLogError(t2);
\r
// Relays the end-of-results signal to the caller's procedure.
5298 public void finished(AsyncReadGraph graph) {
\r
5301 procedure.finished(graph);
\r
// A Throwable raised by the caller's finished handler is logged.
5302 } catch (Throwable t2) {
\r
5303 Logger.defaultLogError(t2);
\r
5306 // impl.state.barrier.dec();
\r
// The wrapper prints as the wrapped procedure does.
5311 public String toString() {
\r
5312 return procedure.toString();
\r
// Forwards a failure to the caller's procedure.
5316 public void exception(AsyncReadGraph graph, Throwable t) {
\r
5319 procedure.exception(graph, t);
\r
// A Throwable raised by the caller's exception handler is logged.
5320 } catch (Throwable t2) {
\r
5321 Logger.defaultLogError(t2);
\r
5324 // impl.state.barrier.dec();
\r
/**
 * Entry point for an {@code ExternalRead} request: checks the arguments and
 * calls {@code queryPrimitiveRead} with a wrapper that relays every callback to
 * {@code procedure} and logs Throwables raised by it.
 *
 * @param impl graph passed to {@code queryPrimitiveRead}
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through as the parent
 * @param procedure receives the result or the forwarded failure; asserted non-null
 * @param listener listener passed through unchanged
 */
5333 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
\r
5335 assert(request != null);
\r
5336 assert(procedure != null);
\r
5338 queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {
\r
// Relays the result to the caller's procedure.
5341 public void execute(T result) {
\r
5343 procedure.execute(result);
\r
// A Throwable raised by the caller's procedure is logged.
5344 } catch (Throwable t2) {
\r
5345 Logger.defaultLogError(t2);
\r
// The wrapper prints as the wrapped procedure does.
5350 public String toString() {
\r
5351 return procedure.toString();
\r
// Forwards a failure to the caller's procedure.
5355 public void exception(Throwable t) {
\r
5357 procedure.exception(t);
\r
// A Throwable raised by the caller's exception handler is logged.
5358 } catch (Throwable t2) {
\r
5359 Logger.defaultLogError(t2);
\r
/**
 * Looks up the {@code VirtualGraph} that {@code querySupport.getProvider} returns
 * for the given statement.
 *
 * @param subject statement subject; converted to an id with {@code querySupport.getId}
 * @param predicate statement predicate; converted to an id with {@code querySupport.getId}
 * @param object statement object; converted to an id with {@code querySupport.getId}
 * @return whatever {@code querySupport.getProvider} returns for the three ids
 */
5368 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
\r
5370 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
\r
/**
 * Looks up the {@code VirtualGraph} that {@code querySupport.getProvider} returns
 * for the given subject/predicate pair.
 *
 * @param subject subject resource; converted to an id with {@code querySupport.getId}
 * @param predicate predicate resource; converted to an id with {@code querySupport.getId}
 * @return whatever {@code querySupport.getProvider} returns for the two ids
 */
5375 public VirtualGraph getProvider(Resource subject, Resource predicate) {
\r
5377 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
\r
/**
 * Looks up the {@code VirtualGraph} that {@code querySupport.getValueProvider}
 * returns for the given subject.
 *
 * @param subject subject resource; converted to an id with {@code querySupport.getId}
 * @return whatever {@code querySupport.getValueProvider} returns for that id
 */
5382 public VirtualGraph getValueProvider(Resource subject) {
\r
5384 return querySupport.getValueProvider(querySupport.getId(subject));
\r
/**
 * Delegates to {@code querySupport.resume(graph)}.
 *
 * @param graph graph passed to {@code querySupport.resume}
 * @return the boolean returned by {@code querySupport.resume}
 */
5388 public boolean resumeTasks(ReadGraphImpl graph) {
\r
5390 return querySupport.resume(graph);
\r
/**
 * Delegates to {@code querySupport.isImmutable(resourceId)}.
 *
 * @param resourceId int id of the resource to check
 * @return the boolean returned by {@code querySupport.isImmutable}
 */
5394 public boolean isImmutable(int resourceId) {
\r
5395 return querySupport.isImmutable(resourceId);
\r
/**
 * Resource-typed variant of {@code isImmutable(int)}: takes the int id from the
 * resource's {@code ResourceImpl.id} field and delegates.
 *
 * @param resource resource to check; cast to {@code ResourceImpl} without a prior type check
 * @return the result of {@code isImmutable(int)} for the resource's id
 */
5398 public boolean isImmutable(Resource resource) {
\r
// Unguarded downcast: any other Resource implementation fails here with a ClassCastException.
5399 ResourceImpl impl = (ResourceImpl)resource;
\r
5400 return isImmutable(impl.id);
\r
5403 private Layer0 L0;
\r
5405 public Layer0 getL0(ReadGraph graph) {
\r
5407 L0 = Layer0.getInstance(graph);
\r