1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.query;
14 import java.io.BufferedOutputStream;
16 import java.io.FileOutputStream;
17 import java.io.IOException;
18 import java.io.PrintStream;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.IdentityHashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.locks.Condition;
35 import java.util.concurrent.locks.ReentrantLock;
37 import org.simantics.databoard.Bindings;
38 import org.simantics.db.AsyncReadGraph;
39 import org.simantics.db.DevelopmentKeys;
40 import org.simantics.db.DirectStatements;
41 import org.simantics.db.ReadGraph;
42 import org.simantics.db.RelationInfo;
43 import org.simantics.db.Resource;
44 import org.simantics.db.Session;
45 import org.simantics.db.Statement;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
48 import org.simantics.db.common.utils.Logger;
49 import org.simantics.db.debug.ListenerReport;
50 import org.simantics.db.exception.DatabaseException;
51 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
52 import org.simantics.db.exception.NoInverseException;
53 import org.simantics.db.exception.ResourceNotFoundException;
54 import org.simantics.db.impl.DebugPolicy;
55 import org.simantics.db.impl.ResourceImpl;
56 import org.simantics.db.impl.graph.MultiIntProcedure;
57 import org.simantics.db.impl.graph.ReadGraphImpl;
58 import org.simantics.db.impl.graph.ReadGraphSupport;
59 import org.simantics.db.impl.graph.WriteGraphImpl;
60 import org.simantics.db.impl.procedure.IntProcedureAdapter;
61 import org.simantics.db.impl.procedure.InternalProcedure;
62 import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
63 import org.simantics.db.impl.support.ResourceSupport;
64 import org.simantics.db.procedure.AsyncMultiListener;
65 import org.simantics.db.procedure.AsyncMultiProcedure;
66 import org.simantics.db.procedure.AsyncProcedure;
67 import org.simantics.db.procedure.AsyncSetListener;
68 import org.simantics.db.procedure.Listener;
69 import org.simantics.db.procedure.ListenerBase;
70 import org.simantics.db.procedure.MultiProcedure;
71 import org.simantics.db.procedure.Procedure;
72 import org.simantics.db.procedure.StatementProcedure;
73 import org.simantics.db.request.AsyncMultiRead;
74 import org.simantics.db.request.AsyncRead;
75 import org.simantics.db.request.ExternalRead;
76 import org.simantics.db.request.MultiRead;
77 import org.simantics.db.request.Read;
78 import org.simantics.db.request.RequestFlags;
79 import org.simantics.db.request.WriteTraits;
80 import org.simantics.layer0.Layer0;
81 import org.simantics.utils.DataContainer;
82 import org.simantics.utils.Development;
83 import org.simantics.utils.datastructures.Pair;
84 import org.simantics.utils.datastructures.collections.CollectionUtils;
85 import org.simantics.utils.datastructures.disposable.AbstractDisposable;
87 import gnu.trove.map.hash.THashMap;
88 import gnu.trove.procedure.TIntProcedure;
89 import gnu.trove.procedure.TLongProcedure;
90 import gnu.trove.procedure.TObjectProcedure;
91 import gnu.trove.set.hash.THashSet;
92 import gnu.trove.set.hash.TIntHashSet;
94 @SuppressWarnings({"rawtypes", "unchecked"})
95 final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
// Query cache; the runAsyncRead/runMultiRead/queryRead family of methods in
// this class delegate to it through cache.runQuery(...).
97 final public QueryCache cache = new QueryCache();
// Static and publicly mutable, hence shared by every QueryProcessor instance.
// No use of it is visible in this part of the file.
99 public static int indent = 0;
104 // Garbage collection
106 public int boundQueries = 0;
// Counters. Nothing in the visible part of the file reads or updates them;
// presumably cache statistics - TODO confirm against the rest of the class.
109 private int hits = 0;
111 private int misses = 0;
113 private int updates = 0;
// Integer ids of built-in resources. Each one is assigned exactly once in the
// constructor from QuerySupport.getBuiltin(...) using the matching
// Layer0.URIs constant, and only when the root library id is non-zero.
// functionalRelation and hasPredicateInverse are visibly set to 0 otherwise;
// NOTE(review): the fallback assignments of the remaining ids are on lines
// that this listing omits - presumably also 0, confirm.
115 final private int functionalRelation;
117 final private int superrelationOf;
119 final private int instanceOf;
121 final private int inverseOf;
123 final private int asserts;
125 final private int hasPredicate;
127 final private int hasPredicateInverse;
129 final private int hasObject;
131 final private int inherits;
133 final private int subrelationOf;
// Id returned by getBuiltin("http:/"); a value of 0 is what the constructor
// treats as "builtins not installed".
135 final private int rootLibrary;
138 * A cache for the root library resource. Initialized in
139 * {@link #getRootLibraryResource()}.
// volatile: written lazily, without locking, by getRootLibraryResource().
141 private volatile ResourceImpl rootLibraryResource;
143 final private int library;
145 final private int consistsOf;
147 final private int hasName;
// Counter that the constructor polls until it equals THREADS, and that
// schedule(...) decrements when it signals a thread whose state is SLEEP.
149 AtomicInteger sleepers = new AtomicInteger(0);
151 private boolean updating = false;
// Static flag; the performForEach(...) methods assert that it is false.
153 static public boolean collecting = false;
155 private boolean firingListeners = false;
// session and resourceSupport are both obtained from querySupport in the
// constructor (getSession() / getSupport()).
157 final public QuerySupport querySupport;
158 final public Session session;
159 final public ResourceSupport resourceSupport;
161 private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
// One QueryThread per index; created and started in the constructor.
163 QueryThread[] executors;
// Per-thread task queues; schedule(...) appends to queues[performer].
165 public ArrayList<SessionTask>[] queues;
// Constants of the ThreadState enum (its header line is not part of this listing).
169 INIT, RUN, SLEEP, DISPOSED
// Arrays indexed by thread number. The constructor initialises every state to
// INIT and creates threadConditions[i] from threadLocks[i].
173 public ThreadState[] threadStates;
174 public ReentrantLock[] threadLocks;
175 public Condition[] threadConditions;
// Tasks a thread has queued for itself; filled by scheduleOwn(...) and by
// scheduleAlways(...) when caller and performer are the same thread.
177 public ArrayList<SessionTask>[] ownTasks;
179 public ArrayList<SessionTask>[] ownSyncTasks;
// Allocated with THREADS * THREADS entries in the constructor.
181 ArrayList<SessionTask>[] delayQueues;
183 public boolean synch = true;
// Lock object handed out by the QuerySupport passed to the constructor (core.getLock()).
185 final Object querySupportLock;
// Incremented by releaseWrite(...). Declared as a boxed Long, so each
// increment unboxes, adds one and re-boxes; it is not an atomic update.
187 public Long modificationCounter = 0L;
189 public void close() {
/**
 * Appends {@code request} to the calling thread's own task list
 * ({@code ownTasks[caller]}). No lock is taken in the visible code.
 *
 * @param caller index of the calling thread, used as the index into {@code ownTasks}
 * @param request the task to queue
 */
192 final public void scheduleOwn(int caller, SessionTask request) {
193 ownTasks[caller].add(request);
/**
 * Queues {@code request} without running it inline: when the caller is also
 * the performing thread ({@code request.thread}) the task is appended to
 * {@code ownTasks[caller]}; the call to {@link #schedule(int, SessionTask)}
 * handles the remaining case.
 *
 * @param caller index of the calling thread
 * @param request the task to queue; its {@code thread} field selects the performer
 */
196 final public void scheduleAlways(int caller, SessionTask request) {
198 int performer = request.thread;
199 if(caller == performer) {
200 ownTasks[caller].add(request);
// NOTE(review): original line 201 is missing from this listing; presumably the
// "else" that limits the schedule(...) call to cross-thread requests - confirm.
202 schedule(caller, request);
/**
 * Hands {@code request} over to the thread named by {@code request.thread}.
 * The visible code adds the task to {@code queues[performer]} and, when the
 * queue size becomes 1, wakes the performer through its condition, first
 * decrementing {@code sleepers} if the performer's state is SLEEP.
 *
 * @param caller index of the calling thread
 * @param request the task to hand over
 */
207 final public void schedule(int caller, SessionTask request) {
209 int performer = request.thread;
211 if(DebugPolicy.SCHEDULE)
212 System.out.println("schedule " + request + " " + caller + " -> " + performer);
214 assert(performer >= 0);
// request.thread has already been read above, so a null request fails with a
// NullPointerException before this assertion can ever be evaluated.
216 assert(request != null);
218 if(caller == performer) {
// NOTE(review): original lines 219-220 (the same-thread branch and the
// transition to the cross-thread branch) are missing from this listing.
221 ReentrantLock queueLock = threadLocks[performer];
// NOTE(review): Condition.signalAll() must be called with the owning lock held;
// the lock()/unlock() calls on queueLock are presumably on omitted lines
// (222 and after 227) - confirm, including that unlock happens on every path.
223 queues[performer].add(request);
224 // This thread could have been sleeping
225 if(queues[performer].size() == 1) {
226 if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
227 threadConditions[performer].signalAll();
// Set to (threads - 1) in the constructor and AND-ed with query hash codes in
// the debug output of performForEach(...).
// NOTE(review): only behaves as a proper mask if the thread count is a power of two - confirm.
237 final public int THREAD_MASK;
// Thread group named "Query Thread Group"; no use is visible in this part of the file.
238 final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
/**
 * Unit of work that the scheduling methods of QueryProcessor move between
 * threads. Carries the index of the thread that should perform it, the index
 * of a synchronously waiting caller, and an arbitrary payload object that is
 * also used by {@link #toString()}.
 */
240 public static abstract class SessionTask {
// Index of the performing thread; read by schedule(...) and scheduleAlways(...).
242 final public int thread;
// -1 when the task was created through the two-argument constructor.
243 final public int syncCaller;
244 final public Object object;
/**
 * Creates a task with no synchronous caller ({@code syncCaller == -1}).
 *
 * @param object payload, stored as a plain Object
 * @param thread index of the performing thread
 */
246 public SessionTask(WriteTraits object, int thread) {
247 this.thread = thread;
248 this.syncCaller = -1;
249 this.object = object;
/**
 * Creates a task with an explicit synchronous caller.
 *
 * @param object payload
 * @param thread index of the performing thread
 * @param syncCaller value stored in {@link #syncCaller}
 */
252 public SessionTask(Object object, int thread, int syncCaller) {
253 this.thread = thread;
254 this.syncCaller = syncCaller;
255 this.object = object;
/**
 * Performs the task.
 *
 * @param thread thread index supplied by whoever runs the task
 */
258 public abstract void run(int thread);
// Renders as "SessionTask[" + payload + "]"; used in the DebugPolicy.SCHEDULE trace.
261 public String toString() {
262 return "SessionTask[" + object + "]";
/**
 * A {@link SessionTask} that additionally carries a semaphore and a container
 * for a Throwable. NOTE(review): presumably used to signal completion and
 * transport a failure back to a waiting caller - the code that uses these
 * fields is not visible here.
 */
267 public static abstract class SessionRead extends SessionTask {
269 final public Semaphore notify;
270 final public DataContainer<Throwable> throwable;
/**
 * Creates a read task whose synchronous caller is the performing thread
 * itself ({@code thread} is passed for both super-constructor arguments).
 */
272 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
273 super(object, thread, thread);
274 this.throwable = throwable;
275 this.notify = notify;
/**
 * Creates a read task with a separately specified synchronous caller
 * ({@code syncThread} becomes {@code syncCaller}).
 */
278 public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
279 super(object, thread, syncThread);
280 this.throwable = throwable;
281 this.notify = notify;
// Neither field is read or written in the visible part of the file.
// NOTE(review): koss2 looks like a leftover debug counter - confirm before relying on it.
286 long waitingTime = 0;
289 static int koss2 = 0;
/**
 * Delegates to {@code runSynchronized()} of the first query thread
 * ({@code executors[0]}) and returns its result.
 *
 * @param graph not used by the visible code
 * @return whatever {@code executors[0].runSynchronized()} returns
 */
291 public boolean resume(ReadGraphImpl graph) {
292 return executors[0].runSynchronized();
/**
 * Allocates the per-thread scheduling structures, creates and starts the
 * query threads, waits until all of them are counted in {@code sleepers},
 * and finally resolves the ids of the built-in resources from {@code core}.
 *
 * NOTE(review): this listing omits several lines of the constructor, among
 * them the assignments of {@code THREADS} and {@code querySupport} and the
 * declaration of {@code index}.
 *
 * @param threads thread count; {@code THREAD_MASK} is set to {@code threads - 1}
 * @param core source of the lock object and of the built-in resource ids
 * @param threadSet every created QueryThread is added to this set
 * @throws DatabaseException declared; the call that can throw it is not
 *         identifiable from the visible lines
 */
295 public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
296 throws DatabaseException {
299 THREAD_MASK = threads - 1;
302 session = querySupport.getSession();
303 resourceSupport = querySupport.getSupport();
304 querySupportLock = core.getLock();
// Per-thread arrays; delayQueues has one slot per (thread, thread) pair.
306 executors = new QueryThread[THREADS];
307 queues = new ArrayList[THREADS];
308 threadLocks = new ReentrantLock[THREADS];
309 threadConditions = new Condition[THREADS];
310 threadStates = new ThreadState[THREADS];
311 ownTasks = new ArrayList[THREADS];
312 ownSyncTasks = new ArrayList[THREADS];
313 delayQueues = new ArrayList[THREADS * THREADS];
315 // freeSchedule = new AtomicInteger(0);
317 for (int i = 0; i < THREADS * THREADS; i++) {
318 delayQueues[i] = new ArrayList<SessionTask>();
321 for (int i = 0; i < THREADS; i++) {
323 // tasks[i] = new ArrayList<Runnable>();
324 ownTasks[i] = new ArrayList<SessionTask>();
325 ownSyncTasks[i] = new ArrayList<SessionTask>();
326 queues[i] = new ArrayList<SessionTask>();
327 threadLocks[i] = new ReentrantLock();
// Each condition belongs to the lock with the same index.
328 threadConditions[i] = threadLocks[i].newCondition();
329 // limits[i] = false;
330 threadStates[i] = ThreadState.INIT;
// Threads are all created (and published through threadSet) before any of
// them is started in the following loop.
334 for (int i = 0; i < THREADS; i++) {
338 executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
340 threadSet.add(executors[i]);
345 for (int i = 0; i < THREADS; i++) {
346 executors[i].start();
349 // Make sure that query threads are up and running
// NOTE(review): the loop body and the InterruptedException handler are on
// omitted lines; check that an interrupt is not silently swallowed here.
350 while(sleepers.get() != THREADS) {
353 } catch (InterruptedException e) {
// "http:/" is looked up as the root library; an id of 0 is treated as
// "builtins not installed" for every lookup below.
358 rootLibrary = core.getBuiltin("http:/");
359 boolean builtinsInstalled = rootLibrary != 0;
361 if (builtinsInstalled) {
362 functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
363 assert (functionalRelation != 0);
365 functionalRelation = 0;
367 if (builtinsInstalled) {
368 instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
369 assert (instanceOf != 0);
373 if (builtinsInstalled) {
374 inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
375 assert (inverseOf != 0);
380 if (builtinsInstalled) {
381 inherits = core.getBuiltin(Layer0.URIs.Inherits);
382 assert (inherits != 0);
386 if (builtinsInstalled) {
387 asserts = core.getBuiltin(Layer0.URIs.Asserts);
388 assert (asserts != 0);
392 if (builtinsInstalled) {
393 hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
394 assert (hasPredicate != 0);
398 if (builtinsInstalled) {
399 hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
400 assert (hasPredicateInverse != 0);
402 hasPredicateInverse = 0;
404 if (builtinsInstalled) {
405 hasObject = core.getBuiltin(Layer0.URIs.HasObject);
406 assert (hasObject != 0);
410 if (builtinsInstalled) {
411 subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
412 assert (subrelationOf != 0);
416 if (builtinsInstalled) {
417 superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
418 assert (superrelationOf != 0);
422 if (builtinsInstalled) {
423 library = core.getBuiltin(Layer0.URIs.Library);
424 assert (library != 0);
428 if (builtinsInstalled) {
429 consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
430 assert (consistsOf != 0);
434 if (builtinsInstalled) {
435 hasName = core.getBuiltin(Layer0.URIs.HasName);
436 assert (hasName != 0);
/**
 * Runs {@code performDirtyUpdates(graph)} and then increments
 * {@code modificationCounter}.
 *
 * @param graph graph passed on to {@code performDirtyUpdates}
 */
442 final public void releaseWrite(ReadGraphImpl graph) {
443 performDirtyUpdates(graph);
// modificationCounter is a boxed Long: this unboxes, adds one and re-boxes,
// and is not atomic. NOTE(review): safe only if callers serialise writes - confirm.
444 modificationCounter++;
/**
 * Returns the integer id of {@code r} as reported by
 * {@code querySupport.getId(r)}.
 */
447 final public int getId(final Resource r) {
448 return querySupport.getId(r);
451 public QuerySupport getCore() {
/**
 * Returns the built-in id resolved for Layer0.URIs.FunctionalRelation in the
 * constructor, or 0 when the builtins were not installed.
 */
455 public int getFunctionalRelation() {
456 return functionalRelation;
459 public int getInherits() {
463 public int getInstanceOf() {
467 public int getInverseOf() {
/** Returns the built-in id the constructor resolved for Layer0.URIs.SubrelationOf. */
471 public int getSubrelationOf() {
472 return subrelationOf;
/** Returns the built-in id the constructor resolved for Layer0.URIs.SuperrelationOf. */
475 public int getSuperrelationOf() {
476 return superrelationOf;
479 public int getAsserts() {
483 public int getHasPredicate() {
/**
 * Returns the built-in id resolved for Layer0.URIs.HasPredicateInverse in the
 * constructor, or 0 when the builtins were not installed.
 */
487 public int getHasPredicateInverse() {
488 return hasPredicateInverse;
491 public int getHasObject() {
495 public int getRootLibrary() {
/**
 * Returns the root library as a Resource, creating and caching a
 * {@code ResourceImpl} for {@code getRootLibrary()} on first use.
 *
 * @return the cached root library resource
 * @throws UnsupportedOperationException with the message "database is not
 *         initialized, cannot get root library resource"
 */
499 public Resource getRootLibraryResource() {
500 if (rootLibraryResource == null) {
501 // Synchronization is not needed here, it doesn't matter if multiple
502 // threads simultaneously set rootLibraryResource once.
503 int root = getRootLibrary();
// NOTE(review): original line 504, the condition guarding this throw, is
// missing from the listing; presumably "if (root == 0)" - confirm.
505 throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
506 this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
508 return rootLibraryResource;
511 public int getLibrary() {
515 public int getConsistsOf() {
519 public int getHasName() {
/**
 * Resolves the resource id for {@code id} and reports it to {@code procedure}.
 * The visible code tries, in this order: the {@code URIToResource} query,
 * {@code querySupport.getBuiltin(id)}, and
 * {@code querySupport.getRandomAccessReference(id)}. Failures are reported
 * through {@code procedure.exception(...)}, either with the caught
 * ResourceNotFoundException or with a new one created for {@code id}.
 *
 * NOTE(review): the conditions and early returns between the individual
 * steps are on lines omitted from this listing.
 *
 * @param graph graph used for the query and passed on to the callbacks
 * @param id identifier to resolve
 * @param parent cache entry passed to {@code URIToResource.queryEach}
 * @param procedure receives the resolved id or the failure
 */
523 public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
525 URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {
528 public void execute(ReadGraphImpl graph, Integer result) {
// A null or zero result from the query counts as "not found".
530 if (result != null && result != 0) {
531 procedure.execute(graph, result);
535 // Fall back to using the fixed builtins.
536 result = querySupport.getBuiltin(id);
538 procedure.execute(graph, result);
543 result = querySupport.getRandomAccessReference(id);
544 } catch (ResourceNotFoundException e) {
545 procedure.exception(graph, e);
550 procedure.execute(graph, result);
552 procedure.exception(graph, new ResourceNotFoundException(id));
// Failures of the URIToResource query itself are forwarded unchanged.
558 public void exception(ReadGraphImpl graph, Throwable t) {
559 procedure.exception(graph, t);
/**
 * Looks {@code id} up with {@code querySupport.getBuiltin(id)} and reports
 * either the id via {@code procedure.execute} or a new
 * ResourceNotFoundException via {@code procedure.exception}.
 *
 * NOTE(review): the condition choosing between the two outcomes is on an
 * omitted line; presumably a zero check on the result - confirm.
 *
 * @param parent not used by the visible code
 */
566 public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
568 Integer result = querySupport.getBuiltin(id);
570 procedure.execute(graph, result);
572 procedure.exception(graph, new ResourceNotFoundException(id));
/**
 * Runs an asynchronous read by delegating to {@code cache.runQuery(...)}.
 * A DatabaseException from the cache is rethrown wrapped in an
 * IllegalStateException (with the original as its cause), which keeps this
 * method free of checked exceptions.
 */
577 public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {
580 cache.runQuery(graph, query, parent, listener, procedure);
581 } catch (DatabaseException e) {
582 throw new IllegalStateException(e);
585 // int hash = requestHash(query);
587 // AsyncReadEntry<T> entry = asyncReadMap.get(query, hash);
589 // if(parent == null && listener == null) {
590 // if(entry != null && (entry.isReady() || entry.isExcepted())) {
591 // System.out.println("ready " + query);
592 // entry.performFromCache(graph, this, procedure);
593 //// graph.state.barrier.dec(query);
596 // query.perform(graph, procedure);
597 //// graph.state.barrier.dec(query);
602 // if(entry == null) {
604 // entry = new AsyncReadEntry<T>(query);
605 // entry.setPending();
606 // entry.clearResult(querySupport);
607 // asyncReadMap.put(query, entry, hash);
609 // performForEach(graph, query, entry, parent, listener, procedure, false);
613 // if(entry.isPending()) {
614 // synchronized(entry) {
615 // if(entry.isPending()) {
616 // throw new IllegalStateException();
617 // // final AsyncBarrierImpl parentBarrier = graph.state.barrier;
618 // // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();
619 // // entry.procs.add(new AsyncProcedure<T>() {
622 // // public void execute(AsyncReadGraph graph, T result) {
623 // // procedure.execute(graph, result);
624 // // parentBarrier.dec(query);
628 // // public void exception(AsyncReadGraph graph, Throwable throwable) {
629 // // procedure.exception(graph, throwable);
630 // // parentBarrier.dec(query);
634 //// if(graph.parent != null || listener != null) {
635 //// registerDependencies(graph, entry, parent, listener, procedure, false);
638 //// query.perform(graph, procedure);
646 // if(entry.isReady()) {
647 // entry.performFromCache(graph, this, procedure);
648 // registerDependencies(graph, entry, parent, listener, procedure, false);
650 // performForEach(graph, query, entry, parent, listener, procedure, false);
/**
 * Runs a multi read by delegating to {@code cache.runQuery(...)}; a
 * DatabaseException is rethrown wrapped in an IllegalStateException.
 * The {@code cached} and {@code provider} parameters are not used by the
 * live code visible here (only by the commented-out legacy implementation).
 */
658 final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
661 cache.runQuery(graph, query, parent, listener, procedure);
662 } catch (DatabaseException e) {
663 throw new IllegalStateException(e);
666 // MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query);
667 // if(entry == null) {
669 // entry = new MultiReadEntry(query);
670 // entry.setPending();
671 // entry.clearResult(provider.querySupport);
673 // provider.multiReadMap.put(query, entry);
675 // provider.performForEach(graph, query, entry, parent, listener, procedure, false);
679 // if(entry.isPending()) {
681 // synchronized(entry) {
683 // if(entry.isPending()) {
684 // throw new IllegalStateException();
686 //// if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();
687 //// entry.procs.add(new Pair(procedure, parentBarrier));
688 //// if(graph.parent != null || listener != null) {
689 //// provider.registerDependencies(graph, entry, parent, listener, procedure, false);
692 // // If this was synchronized we must wait here until completion
693 // // if(graph.state.synchronizedExecution) {
694 // // while(entry.isPending()) {
695 // // graph.resumeTasks(graph.callerThread, null, null);
704 // entry.performFromCache(graph, provider, procedure);
705 //// graph.state.barrier.dec(query);
710 // provider.performForEach(graph, query, entry, parent, listener, procedure, false);
/**
 * Runs an asynchronous multi read by delegating to
 * {@code cache.runQuery(...)}; a DatabaseException is rethrown wrapped in an
 * IllegalStateException.
 */
718 public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
722 cache.runQuery(graph, query, parent, listener, procedure);
723 } catch (DatabaseException e) {
724 throw new IllegalStateException(e);
728 // int hash = requestHash(query);
730 // AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);
732 // if(parent == null && listener == null) {
733 // if(entry != null && (entry.isReady() || entry.isExcepted())) {
734 // System.out.println("ready " + query);
735 // entry.performFromCache(graph, this, procedure);
738 // query.perform(graph, procedure);
743 // if(entry == null) {
745 // entry = new AsyncMultiReadEntry<T>(query);
746 // entry.setPending();
747 // entry.clearResult(querySupport);
749 // asyncMultiReadMap.put(query, entry, hash);
751 // performForEach(graph, query, entry, parent, listener, procedure, false);
755 // if(entry.isPending()) {
757 // synchronized(entry) {
758 // if(entry.isPending()) {
759 // throw new IllegalStateException();
760 //// if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();
761 //// entry.procs.add(procedure);
762 //// if(graph.parent != null || listener != null) {
763 //// registerDependencies(graph, entry, parent, listener, procedure, false);
770 // performForEach(graph, query, entry, parent, listener, procedure, false);
/**
 * Runs an external read by delegating to {@code cache.runQuery(...)}.
 * Unlike the other runXxx methods in this class, the DatabaseException is
 * declared and propagated rather than wrapped. The {@code cached} and
 * {@code provider} parameters are not used by the live code visible here.
 */
776 final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) throws DatabaseException {
778 cache.runQuery(graph, query, parent, listener, procedure);
780 // final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query);
781 // if(entry == null) {
782 // provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);
784 // if(entry.isPending()) {
785 // synchronized(entry) {
786 // if(entry.isPending()) {
787 // throw new IllegalStateException();
788 //// if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();
789 //// entry.procs.add(procedure);
794 // provider.performForEach(graph, query, entry, parent, listener, procedure, false);
/**
 * Returns {@code object.hashCode()}. Any Throwable raised while computing
 * the hash (including a NullPointerException for a null argument) is caught
 * and logged through {@code Logger.defaultLogError}.
 *
 * NOTE(review): the value returned after logging is on an omitted line -
 * confirm what callers receive in the failure case.
 */
799 public int requestHash(Object object) {
801 return object.hashCode();
802 } catch (Throwable t) {
803 Logger.defaultLogError(t);
/**
 * Performs a synchronous read through {@code cache.runQuery(...)} and
 * returns its result cast (unchecked) to {@code T}.
 *
 * Note the argument order: this method takes {@code procedure} before
 * {@code listener}, while {@code cache.runQuery} is called with
 * {@code listener} before {@code procedure}.
 *
 * @throws Throwable declared broadly; callers must handle any failure type
 */
809 public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {
811 return (T)cache.runQuery(graph, query, parent, listener, procedure);
813 // assert(query != null);
815 // ReadEntry entry = readMap.get(query);
817 // if(entry != null) {
818 // if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {
819 // return (T)entry.get(graph, this, procedure);
820 // } else if (entry.isPending()) {
821 // throw new IllegalStateException();
825 // if(entry == null) {
827 // entry = new ReadEntry(query);
828 // entry.setPending();
829 // entry.clearResult(querySupport);
831 // readMap.put(query, entry);
833 // return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
837 // if(entry.isPending()) {
838 // throw new IllegalStateException();
840 // return (T)performForEach(graph, query, entry, parent, listener, procedure, false);
/**
 * Performs a multi read by delegating to {@code cache.runQuery(...)};
 * results are delivered through {@code procedure}.
 *
 * @throws DatabaseException propagated from the cache
 */
847 public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
849 cache.runQuery(graph, query, parent, listener, procedure);
851 // assert(query != null);
852 // assert(procedure != null);
854 // final MultiReadEntry entry = multiReadMap.get(query);
856 // if(parent == null && !(listener != null)) {
857 // if(entry != null && entry.isReady()) {
858 // entry.performFromCache(graph, this, procedure);
863 // runMultiRead(graph, entry, query, parent, this, listener, procedure);
/**
 * Performs an external read by delegating to {@code cache.runQuery(...)};
 * the result is delivered through {@code procedure}.
 *
 * @throws DatabaseException propagated from the cache
 */
867 public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) throws DatabaseException {
869 cache.runQuery(graph, query, parent, listener, procedure);
871 // assert(query != null);
872 // assert(procedure != null);
874 // final ExternalReadEntry entry = externalReadMap.get(query);
876 // if(parent == null && !(listener != null)) {
877 // if(entry != null && entry.isReady()) {
878 // entry.performFromCache(procedure);
883 // runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);
887 // public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,
888 // boolean inferredDependency) {
890 // if (DebugPolicy.PERFORM)
891 // System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
894 // assert (!collecting);
896 // assert(!entry.isDiscarded());
898 // final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);
900 // // FRESH, REFUTED, EXCEPTED go here
901 // if (!entry.isReady()) {
903 // entry.setPending();
909 // final ReadGraphImpl finalParentGraph = parentGraph;
911 // query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {
914 // public void execute(AsyncReadGraph returnGraph, T result) {
915 // ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
916 // //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
917 // entry.addOrSet(finalParentGraph, result);
918 // if(listenerEntry != null) {
919 // primeListenerEntry(listenerEntry, result);
922 // procedure.execute(finalParentGraph, result);
923 // } catch (Throwable t) {
924 // t.printStackTrace();
926 //// parentBarrier.dec(query);
930 // public void exception(AsyncReadGraph returnGraph, Throwable t) {
931 // ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
932 //// AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
933 // entry.except(finalParentGraph, t);
935 // procedure.exception(finalParentGraph, t);
936 // } catch (Throwable t2) {
937 // t2.printStackTrace();
939 //// parentBarrier.dec(query);
943 // public String toString() {
944 // return procedure.toString();
949 // } catch (Throwable t) {
953 // procedure.exception(parentGraph, t);
954 // } catch (Throwable t2) {
955 // t2.printStackTrace();
957 //// parentBarrier.dec(query);
965 // entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {
968 // public void exception(AsyncReadGraph graph, Throwable throwable) {
969 // procedure.exception(graph, throwable);
973 // public void execute(AsyncReadGraph graph, T result) {
974 // procedure.execute(graph, result);
975 // if(listenerEntry != null) {
976 // primeListenerEntry(listenerEntry, result);
982 //// parentBarrier.dec(query);
988 // assert (!entry.isDiscarded());
992 // public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,
993 // boolean inferredDependency) throws Throwable {
995 // if (DebugPolicy.PERFORM)
996 // System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
999 // assert (!collecting);
1001 // entry.assertNotDiscarded();
1003 // if(entry.isReady()) {
1005 // // EXCEPTED goes here
1007 //// if(procedure != null) entry.performFromCache(graph, this, procedure);
1008 //// parentBarrier.dec(query);
1011 // ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1013 // T result = (T)entry.get(graph, this, procedure);
1015 // if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1021 // // FRESH, REFUTED, PENDING go here
1023 // entry.setPending();
1028 // ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1030 // final ReadGraphImpl performGraph = graph.newSync(entry);
1034 // if(Development.DEVELOPMENT)
1035 // Development.recordHistogram("run " + query);
1037 // T result = query.perform(performGraph);
1038 // entry.addOrSet(performGraph, result);
1040 // if(listenerEntry != null) primeListenerEntry(listenerEntry, result);
1042 // return (T)entry.get(graph, this, procedure);
1044 // } catch (Throwable t) {
1047 // return (T)entry.get(graph, this, procedure);
1055 // public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1056 // boolean inferredDependency) {
1058 // if (DebugPolicy.PERFORM)
1059 // System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1062 // assert (!collecting);
1064 // assert(!entry.isPending());
1065 // assert(!entry.isDiscarded());
1067 // // FRESH, REFUTED, EXCEPTED go here
1068 // if (!entry.isReady()) {
1070 // entry.setPending();
1071 // entry.clearResult();
1073 // multiReadMap.put(query, entry);
1076 // final ReadGraphImpl newGraph = graph.newSync(entry);
1077 //// newGraph.state.barrier.inc();
1081 // query.perform(newGraph, new AsyncMultiProcedure<T>() {
1084 // public void execute(AsyncReadGraph graph, T result) {
1085 // entry.addOrSet(result);
1087 // procedure.execute(graph, result);
1088 // } catch (Throwable t) {
1089 // t.printStackTrace();
1094 // public void finished(AsyncReadGraph graph) {
1095 // entry.finish(graph);
1097 // procedure.finished(graph);
1098 // } catch (Throwable t) {
1099 // t.printStackTrace();
1101 //// newGraph.state.barrier.dec();
1102 //// parentBarrier.dec();
1106 // public void exception(AsyncReadGraph graph, Throwable t) {
1109 // procedure.exception(graph, t);
1110 // } catch (Throwable t2) {
1111 // t2.printStackTrace();
1113 //// newGraph.state.barrier.dec();
1114 //// parentBarrier.dec();
1119 // } catch (DatabaseException e) {
1123 // procedure.exception(graph, e);
1124 // } catch (Throwable t2) {
1125 // t2.printStackTrace();
1127 //// newGraph.state.barrier.dec();
1128 //// parentBarrier.dec();
1130 // } catch (Throwable t) {
1132 // DatabaseException e = new DatabaseException(t);
1136 // procedure.exception(graph, e);
1137 // } catch (Throwable t2) {
1138 // t2.printStackTrace();
1140 //// newGraph.state.barrier.dec();
1141 //// parentBarrier.dec();
1149 // entry.performFromCache(graph, this, procedure);
1155 // assert (!entry.isDiscarded());
1157 // registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);
1162 // public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,
1163 // boolean inferredDependency) {
1165 // if (DebugPolicy.PERFORM)
1166 // System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1169 // assert (!collecting);
1173 // assert(!entry.isDiscarded());
1175 // // FRESH, REFUTED, EXCEPTED go here
1176 // if (!entry.isReady()) {
1182 // ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);
1184 // query.perform(performGraph, new AsyncMultiProcedure<T>() {
1187 // public void execute(AsyncReadGraph graph, T result) {
1188 // ReadGraphImpl impl = (ReadGraphImpl)graph;
1189 //// ReadGraphImpl executeGraph = callerGraph.newAsync();
1190 // entry.addOrSet(result);
1192 // procedure.execute(callerGraph, result);
1193 // } catch (Throwable t) {
1194 // t.printStackTrace();
1199 // public void finished(AsyncReadGraph graph) {
1200 // ReadGraphImpl impl = (ReadGraphImpl)graph;
1201 //// ReadGraphImpl executeGraph = callerGraph.newAsync();
1202 // entry.finish(callerGraph);
1204 // procedure.finished(callerGraph);
1205 // } catch (Throwable t) {
1206 // t.printStackTrace();
1211 // public void exception(AsyncReadGraph graph, Throwable t) {
1212 // ReadGraphImpl impl = (ReadGraphImpl)graph;
1213 //// ReadGraphImpl executeGraph = callerGraph.newAsync();
1214 // entry.except(callerGraph, t);
1216 // procedure.exception(callerGraph, t);
1217 // } catch (Throwable t2) {
1218 // t2.printStackTrace();
1224 // } catch (Throwable t) {
1228 // procedure.exception(callerGraph, t);
1229 // } catch (Throwable t2) {
1230 // t2.printStackTrace();
1240 // entry.performFromCache(callerGraph, this, procedure);
1246 // assert (!entry.isDiscarded());
1248 // registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);
1250 // } catch (Throwable t) {
1252 // Logger.defaultLogError(t);
1260 // public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,
1261 // boolean inferredDependency) {
1263 // if (DebugPolicy.PERFORM)
1264 // System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1267 // assert (!collecting);
1269 // assert(!entry.isPending());
1270 // assert(!entry.isDiscarded());
1272 // registerDependencies(graph, entry, parent, base, procedure, inferredDependency);
1274 // // FRESH, REFUTED, EXCEPTED go here
1275 // if (!entry.isReady()) {
1277 // entry.setPending();
1278 // entry.clearResult(querySupport);
1280 // externalReadMap.put(query, entry);
1285 // query.register(graph, new Listener<T>() {
1287 // AtomicBoolean used = new AtomicBoolean(false);
1290 // public void execute(T result) {
1292 // // Just for safety
1293 // if(entry.isDiscarded()) return;
1294 // if(entry.isExcepted()) entry.setPending();
1296 // if(used.compareAndSet(false, true)) {
1297 // entry.addOrSet(QueryProcessor.this, result);
1298 // procedure.execute(result);
1300 // entry.queue(result);
1301 // updatePrimitive(query);
1307 // public void exception(Throwable t) {
1311 // if(used.compareAndSet(false, true)) {
1312 // procedure.exception(t);
1314 //// entry.queue(result);
1315 // updatePrimitive(query);
1321 // public String toString() {
1322 // return procedure.toString();
1326 // public boolean isDisposed() {
1327 // return entry.isDiscarded() || !isBound(entry);
1332 // } catch (Throwable t) {
1335 // procedure.exception(t);
1343 // entry.performFromCache(procedure);
1349 // assert (!entry.isDiscarded());
1353 boolean isBound(ExternalReadEntry<?> entry) {
1354 if(entry.hasParents()) return true;
1355 else if(hasListener(entry)) return true;
// Registers the bookkeeping for a request on 'child': the parent link and, if given, the listener.
// The method is synchronized on this instance.
// When a listener is given, the result of registerListener(child, listener, procedure) is returned.
// NOTE(review): the value returned when listener == null is not among the visible lines - presumably null; confirm.
1359 synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
// A parent link is only considered for non-inferred dependencies that actually have a parent.
1361 if (parent != null && !inferred) {
// Children that report themselves immutable for this graph do not get the parent added.
1363 if(!child.isImmutable(graph))
1364 child.addParent(parent);
1365 } catch (DatabaseException e) {
// The failure is logged here; the visible handler does not rethrow.
1366 Logger.defaultLogError(e);
1368 if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
1371 if (listener != null) {
1372 return registerListener(child, listener, procedure);
// Performs a binary query on behalf of 'parent' / 'listener', reporting through 'procedure'.
// Dependencies are registered before the state of the query is inspected.
1379 public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1381 if (DebugPolicy.PERFORM)
1382 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
// Asserts that the 'collecting' flag is not set while a query is performed.
1385 assert (!collecting);
1389 registerDependencies(graph, query, parent, listener, procedure, false);
1391 // FRESH, REFUTED, EXCEPTED go here
1392 if (!query.isReady()) {
1394 boolean fresh = query.isFresh();
// Not ready: the query is computed.
1400 query.computeForEach(graph, this, procedure, true);
// NOTE(review): presumably the branch for ready queries - the result is replayed from the cache; confirm.
1406 query.performFromCache(graph, this, procedure);
1412 } catch (Throwable t) {
// Failures are logged through the default logger.
1414 Logger.defaultLogError(t);
// Performs a unary query on behalf of 'parent' / 'listener', reporting through 'procedure'.
// Returns the value produced by computeForEach (query not ready) or by performFromCache.
// NOTE(review): the value returned after a caught Throwable is not among the visible lines; confirm.
1419 public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {
1421 if (DebugPolicy.PERFORM)
1422 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
// Asserts that the 'collecting' flag is not set while a query is performed.
1425 assert (!collecting);
1429 assert(query.assertNotDiscarded());
// Dependencies are registered before the state of the query is inspected.
1431 registerDependencies(graph, query, parent, listener, procedure, false);
1433 // FRESH, REFUTED, EXCEPTED go here
1434 if (!query.isReady()) {
1439 return query.computeForEach(graph, this, procedure, true);
// NOTE(review): presumably the branch for ready queries; confirm.
1446 return query.performFromCache(graph, this, procedure);
1450 } catch (Throwable t) {
// Failures are logged through the default logger.
1452 Logger.defaultLogError(t);
// Placeholder procedure implementing both InternalProcedure<Object> and IntProcedure.
// performForEach2 passes the shared instance (cast to the query's procedure type) to computeForEach.
// NOTE(review): the method bodies are not among the visible lines - presumably all no-ops; confirm.
1459 static class Dummy implements InternalProcedure<Object>, IntProcedure {
1462 public void execute(ReadGraphImpl graph, int i) {
1466 public void finished(ReadGraphImpl graph) {
1470 public void execute(ReadGraphImpl graph, Object result) {
1474 public void exception(ReadGraphImpl graph, Throwable throwable) {
// Single shared Dummy instance.
1479 private static final Dummy dummy = new Dummy();
// Variant of the unary performForEach that is declared to throw Throwable and returns query.get(...).
// When the query is not ready it is computed with the shared 'dummy' procedure and the result is then
// fetched with a null procedure; otherwise the result is fetched with the caller's procedure.
1481 public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
1483 if (DebugPolicy.PERFORM)
1484 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1487 assert (!collecting);
1489 assert(query.assertNotDiscarded());
// Dependencies are registered before the state of the query is inspected.
1491 registerDependencies(graph, query, parent, listener, procedure, false);
1493 // FRESH, REFUTED, EXCEPTED go here
1494 if (!query.isReady()) {
// The unchecked cast lets the placeholder stand in for the caller's procedure type.
1499 query.computeForEach(graph, this, (Procedure)dummy, true);
1500 return query.get(graph, this, null);
// NOTE(review): presumably the branch for ready queries; confirm.
1506 return query.get(graph, this, procedure);
// Performs a string query on behalf of 'parent' / 'listener', reporting through 'procedure'.
// Unlike the binary/unary variants, registerDependencies is called after the query has been
// computed or replayed from the cache.
1512 public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {
1514 if (DebugPolicy.PERFORM)
1515 System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
1518 assert (!collecting);
// NOTE(review): "aff" looks like a leftover debug marker printed for discarded queries; consider a real log message.
1522 if(query.isDiscarded()) {
1523 System.err.println("aff");
1525 assert(!query.isDiscarded());
1527 // FRESH, REFUTED, EXCEPTED go here
1528 if (!query.isReady()) {
// Not ready: compute with a graph whose async parent is the query itself.
1530 query.computeForEach(graph.withAsyncParent(query), this, procedure);
// NOTE(review): presumably the branch for ready queries; confirm.
1537 query.performFromCache(graph, this, procedure);
1543 assert (!query.isDiscarded());
1545 registerDependencies(graph, query, parent, listener, procedure, false);
1547 } catch (Throwable t) {
// The failure is both printed to stderr and logged through the default logger.
1549 t.printStackTrace();
1550 Logger.defaultLogError(t);
// Services a QueryCollector is given to inspect and walk the query cache
// (QueryCollectorImpl is constructed with an instance of this interface).
1556 interface QueryCollectorSupport {
// Snapshot of the caches as a CacheCollectionResult.
1557 public CacheCollectionResult allCaches();
1558 public Collection<CacheEntry> getRootList();
1559 public int getCurrentSize();
1560 public int calculateCurrentSize();
// Next entry of the given level in the current iteration.
1561 public CacheEntryBase iterate(int level);
1562 public void remove();
1563 public void setLevel(CacheEntryBase entry, int level);
// Begins an iteration round; 'flush' requests fresh data.
1564 public boolean start(boolean flush);
// Strategy interface for collecting the query cache; invoked from gc(int, int).
1567 interface QueryCollector {
// NOTE(review): parameter meanings are inferred from their names (a target for "young" entries and a time budget in milliseconds); confirm against the implementation.
1569 public void collect(int youngTarget, int allowedTimeInMs);
// QueryCollectorSupport implementation backed by the enclosing QueryProcessor and its 'cache'.
// It holds a CacheCollectionResult ('iteration') that is walked level by level through iterate().
1573 class QueryCollectorSupportImpl implements QueryCollectorSupport {
1575 private static final boolean DEBUG = false;
// Target ratio handed to restart() whenever no flush is requested.
1576 private static final double ITERATION_RATIO = 0.2;
1578 private CacheCollectionResult iteration = new CacheCollectionResult();
1579 private boolean fresh = true;
1580 private boolean needDataInStart = true;
1582 QueryCollectorSupportImpl() {
1583 iteration.restart();
// Fills a new CacheCollectionResult from the enclosing processor's caches.
1586 public CacheCollectionResult allCaches() {
1587 CacheCollectionResult result = new CacheCollectionResult();
1588 QueryProcessor.this.allCaches(result);
// Prepares an iteration round and reports whether the iteration is at its start.
1593 public boolean start(boolean flush) {
1594 // We need new data from query maps
1596 if(needDataInStart || flush) {
1597 // Last run ended after processing all queries => refresh data
// A flush uses a target ratio of 0.0, so the 'dirty' test in restart() is always satisfied.
1598 restart(flush ? 0.0 : ITERATION_RATIO);
1600 // continue with previous big data
1602 // Notify caller about iteration situation
1603 return iteration.isAtStart();
// Compares the current cache size with the size of the held snapshot; 'dirty' is set when the
// relative difference reaches targetRatio.
// NOTE(review): the conditions guarding the statements below are not all visible - presumably the
// snapshot is rebuilt from allCaches() only when dirty; confirm.
1606 private void restart(double targetRatio) {
1608 needDataInStart = true;
1610 long start = System.nanoTime();
1613 // We need new data from query maps
// +1 keeps the divisor used for 'ratio' non-zero when the snapshot is empty.
1615 int iterationSize = iteration.size()+1;
1616 int diff = calculateCurrentSize()-iterationSize;
1618 double ratio = (double)diff / (double)iterationSize;
1619 boolean dirty = Math.abs(ratio) >= targetRatio;
1622 iteration = allCaches();
// Diagnostic output: elapsed time, total size and per-level sizes.
1624 System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
1625 for(int i=0;i<CacheCollectionResult.LEVELS;i++)
1626 System.err.print(" " + iteration.levels[i].size());
1627 System.err.println("");
1630 iteration.restart();
1634 needDataInStart = false;
1636 // We are returning here within the same GC round - reuse the cache table
1637 iteration.restart();
// Returns the next entry of the given level, skipping entries that are already discarded.
1645 public CacheEntryBase iterate(int level) {
1647 CacheEntryBase entry = iteration.next(level);
1649 restart(ITERATION_RATIO);
1653 while(entry != null && entry.isDiscarded()) {
1654 entry = iteration.next(level);
1662 public void remove() {
1667 public void setLevel(CacheEntryBase entry, int level) {
1668 iteration.setLevel(entry, level);
// The remaining methods delegate to the processor's cache.
1671 public Collection<CacheEntry> getRootList() {
1672 return cache.getRootList();
1676 public int calculateCurrentSize() {
1677 return cache.calculateCurrentSize();
1681 public int getCurrentSize() {
1686 // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
// Support object handed to the collector below.
1688 private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
// Collector used by gc(int, int); it is given this processor and the support object above.
1689 private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
// NOTE(review): the body is not among the visible lines - presumably reports the number of cached queries; confirm.
1691 public int querySize() {
// Runs a collection of the query cache by delegating to collector.collect with the same arguments.
1695 public void gc(int youngTarget, int allowedTimeInMs) {
1697 collector.collect(youngTarget, allowedTimeInMs);
// Registers 'base' as a listener of 'entry' through addListener and returns its result.
// NOTE(review): the statement guarded by base.isDisposed() is not among the visible lines - presumably an
// early return without registering; confirm.
1701 public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
1703 assert (entry != null);
1705 if (base.isDisposed())
1708 return addListener(entry, base, procedure);
// Stores 'result' as the last known value of the listener entry.
1712 private void primeListenerEntry(final ListenerEntry entry, final Object result) {
1713 entry.setLastKnown(result);
// Adds a ListenerEntry for (entry, base, procedure) to the per-entry listener list in cache.listeners,
// creating the list on demand.
// Returns null when an equal listener entry already exists and its listener is not disposed.
// NOTE(review): the final return statement is not among the visible lines - presumably the new ListenerEntry; confirm.
1716 private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
1718 assert (entry != null);
1719 assert (procedure != null);
1721 ArrayList<ListenerEntry> list = cache.listeners.get(entry);
// Listener lists start with capacity 1.
1723 list = new ArrayList<ListenerEntry>(1);
1724 cache.listeners.put(entry, list);
1727 ListenerEntry result = new ListenerEntry(entry, base, procedure);
// indexOf relies on ListenerEntry.equals to detect an already registered listener.
1728 int currentIndex = list.indexOf(result);
1729 // There was already a listener
1730 if(currentIndex > -1) {
1731 ListenerEntry current = list.get(currentIndex);
// A live duplicate wins; a disposed duplicate is replaced in place.
1732 if(!current.base.isDisposed()) return null;
1733 list.set(currentIndex, result);
1738 if(DebugPolicy.LISTENER) {
1739 new Exception().printStackTrace();
1740 System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
// Adds the listener entry to the scheduledListeners set, which performScheduledUpdates later drains.
1747 private void scheduleListener(ListenerEntry entry) {
1748 assert (entry != null);
1749 if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
1750 scheduledListeners.add(entry);
// Removes the listener entry from the listener list of its cache entry; does nothing when the cache
// entry has no listener list.
// NOTE(review): the condition guarding cache.listeners.remove is not among the visible lines - presumably
// "list is now empty"; confirm.
1753 private void removeListener(ListenerEntry entry) {
1754 assert (entry != null);
1755 ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
1756 if(list == null) return;
1757 boolean success = list.remove(entry);
1760 cache.listeners.remove(entry.entry);
1763 private boolean hasListener(CacheEntry entry) {
1764 if(cache.listeners.get(entry) != null) return true;
// Listener check that first deals with disposed listeners of the entry: disposed ListenerEntry
// instances are gathered into a temporary list, and the entry's mapping is removed from
// cache.listeners when its listener list is empty.
// NOTE(review): the removal of the gathered entries and the return statements are not among the visible
// lines - presumably returns true only when live listeners remain; confirm.
1768 boolean hasListenerAfterDisposing(CacheEntry entry) {
1769 if(cache.listeners.get(entry) != null) {
1770 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
// Allocated lazily, only when a disposed listener is found.
1771 ArrayList<ListenerEntry> list = null;
1772 for (ListenerEntry e : entries) {
1773 if (e.base.isDisposed()) {
1774 if(list == null) list = new ArrayList<ListenerEntry>();
// Second pass over the gathered entries, so 'entries' is not modified while it is being iterated.
1779 for (ListenerEntry e : list) {
1783 if (entries.isEmpty()) {
1784 cache.listeners.remove(entry);
1792 List<ListenerEntry> getListenerEntries(CacheEntry entry) {
1793 hasListenerAfterDisposing(entry);
1794 if(cache.listeners.get(entry) != null)
1795 return cache.listeners.get(entry);
1797 return Collections.emptyList();
// Recursively computes, for 'entry', a set of listeners stored in 'workarea': listeners taken from
// the entry's own listener entries plus the sets computed for each of its parents.
// Entries already present in 'workarea' are not processed again.
1800 void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
1802 if(!workarea.containsKey(entry)) {
1804 HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
1805 for(ListenerEntry e : getListenerEntries(entry))
// The set is published before recursing into the parents.
1808 workarea.put(entry, ls);
1810 for(CacheEntry parent : entry.getParents(this)) {
1811 processListenerReport(parent, workarea);
1812 ls.addAll(workarea.get(parent));
// Builds a ListenerReport over all cache entries. The method is synchronized on this instance.
// NOTE(review): the return statement is not among the visible lines - presumably the populated report; confirm.
1819 public synchronized ListenerReport getListenerReport() throws IOException {
1821 class ListenerReportImpl implements ListenerReport {
// Filled by processListenerReport: cache entry -> set of listeners.
1823 Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
// Prints one line per listener: the number of workarea entries whose set contains it, then the listener.
1826 public void print(PrintStream b) {
1827 Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
1828 for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
1829 for(ListenerBase l : e.getValue()) {
1830 Integer i = hist.get(l);
// Counts are accumulated as negative numbers and negated again when printed -
// presumably so that the value-sorted order lists the largest counts first; confirm.
1831 hist.put(l, i != null ? i-1 : -1);
1835 for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
1836 b.print("" + -p.second + " " + p.first + "\n");
1844 ListenerReportImpl result = new ListenerReportImpl();
1846 Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
// First pass: run the disposed-listener handling for every entry.
1847 for(CacheEntryBase entry : all) {
1848 hasListenerAfterDisposing(entry);
// Second pass: compute the listener sets.
1850 for(CacheEntryBase entry : all) {
1851 processListenerReport(entry, result.workarea);
// Writes a listener report to 'file' and returns a completion message.
// The method is synchronized on this instance.
// NOTE(review): no close()/flush of the buffered PrintStream is visible here; if none exists, buffered
// output may never reach the file and the file handle leaks - confirm and consider try/finally.
1858 public synchronized String reportListeners(File file) throws IOException {
1863 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1864 ListenerReport report = getListenerReport();
1867 return "Done reporting listeners.";
// Recursively records in 'workarea' a parent set for 'entry' and for its non-discarded ancestors.
// Discarded entries and entries already present in 'workarea' are skipped.
1871 void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
1873 if(entry.isDiscarded()) return;
1874 if(workarea.containsKey(entry)) return;
1876 Iterable<CacheEntry> parents = entry.getParents(this);
1877 HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
1878 for(CacheEntry e : parents) {
// Discarded parents are skipped.
1879 if(e.isDiscarded()) continue;
1881 processParentReport(e, workarea);
// The entry is published only after its parents have been processed.
1883 workarea.put(entry, ps);
// Writes the entries of Development.histogram to 'file', one "name: count" line each, in the reverse
// of the value-sorted order, and then clears the histogram.
// The method is synchronized on this instance.
// NOTE(review): the stderr message below says "reportQueries" although this is reportQueryActivity.
1887 public synchronized String reportQueryActivity(File file) throws IOException {
1889 System.err.println("reportQueries " + file.getAbsolutePath());
1894 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1896 List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
1897 Collections.reverse(entries);
1899 for(Pair<String,Integer> entry : entries) {
1900 b.println(entry.first + ": " + entry.second);
// The histogram is reset after each report.
1905 Development.histogram.clear();
// Dumps a report of the cached queries to 'file' and returns a summary string with the query count.
// Visible phases: (1) collect parent sets for all cache entries, (2) classify each entry with a bind
// status and refine it iteratively, (3) count entries per query class, (4) iterate a per-entry
// "retained set" figure, (5) print parent and child listings.
// The method is synchronized on this instance.
1911 public synchronized String reportQueries(File file) throws IOException {
1913 System.err.println("reportQueries " + file.getAbsolutePath());
1918 PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
1920 long start = System.nanoTime();
1922 // ArrayList<CacheEntry> all = ;
// Phase 1: entry -> set of parents, for every non-discarded entry reachable through parent links.
1924 Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
1925 Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
1926 for(CacheEntryBase entry : caches) {
1927 processParentReport(entry, workarea);
1930 // for(CacheEntry e : all) System.err.println("entry: " + e);
1932 long duration = System.nanoTime() - start;
1933 System.err.println("Query root set in " + 1e-9*duration + "s.");
1935 start = System.nanoTime();
// Phase 2: bind status per entry. The report header printed below labels the values
// 0=bound, 1=free, 2=unknown.
1937 HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
1941 for(CacheEntry entry : workarea.keySet()) {
1942 boolean listener = hasListenerAfterDisposing(entry);
1943 boolean hasParents = entry.getParents(this).iterator().hasNext();
// NOTE(review): the guard of the first branch is not visible - presumably "has a listener"; confirm.
1946 flagMap.put(entry, 0);
1947 } else if (!hasParents) {
1949 flagMap.put(entry, 1);
1952 flagMap.put(entry, 2);
1954 // // Write leaf bit
1955 // entry.flags |= 4;
1958 boolean done = true;
1965 long start2 = System.nanoTime();
1967 int boundCounter = 0;
1968 int unboundCounter = 0;
1969 int unknownCounter = 0;
1971 for(CacheEntry entry : workarea.keySet()) {
1973 //System.err.println("process " + entry);
1975 int flags = flagMap.get(entry);
// Only the two low bits carry the bind status.
1976 int bindStatus = flags & 3;
1978 if(bindStatus == 0) boundCounter++;
1979 else if(bindStatus == 1) unboundCounter++;
1980 else if(bindStatus == 2) unknownCounter++;
// Entries with a settled status (0 or 1) need no further work in this pass.
1982 if(bindStatus < 2) continue;
1985 for(CacheEntry parent : entry.getParents(this)) {
// Discarded parents are given status 1.
1987 if(parent.isDiscarded()) flagMap.put(parent, 1);
1989 int flags2 = flagMap.get(parent);
1990 int bindStatus2 = flags2 & 3;
1991 // Parent is bound => child is bound
1992 if(bindStatus2 == 0) {
1996 // Parent is unknown => child is unknown
1997 else if (bindStatus2 == 2) {
2004 flagMap.put(entry, newStatus);
2008 duration = System.nanoTime() - start2;
2009 System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
2010 b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
// The refinement is bounded by the loop counter checked here.
2012 } while(!done && loops++ < 20);
// Entries still at status 2 after the refinement are reported on stderr.
2016 for(CacheEntry entry : workarea.keySet()) {
2018 int bindStatus = flagMap.get(entry);
2019 if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
2025 duration = System.nanoTime() - start;
2026 System.err.println("Query analysis in " + 1e-9*duration + "s.");
// Phase 3: instance counts per class; for request-backed entries the request's class is used.
2028 Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
2030 for(CacheEntry entry : workarea.keySet()) {
2031 Class<?> clazz = entry.getClass();
2032 if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
2033 else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
2034 else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
2035 else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
2036 else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
2037 Integer c = counts.get(clazz);
// Counts are stored negated and negated again when printed - presumably so that the
// value-sorted order lists the largest counts first; confirm.
2038 if(c == null) counts.put(clazz, -1);
2039 else counts.put(clazz, c-1);
2042 b.print("// Simantics DB client query report file\n");
2043 b.print("// This file contains the following information\n");
2044 b.print("// -The amount of cached query instances per query class\n");
2045 b.print("// -The sizes of retained child sets\n");
2046 b.print("// -List of parents for each query (search for 'P <query name>')\n");
2047 b.print("// -Followed by status, where\n");
2048 b.print("// -0=bound\n");
2049 b.print("// -1=free\n");
2050 b.print("// -2=unknown\n");
2051 b.print("// -L=has listener\n");
2052 b.print("// -List of children for each query (search for 'C <query name>')\n");
2054 b.print("----------------------------------------\n");
2056 b.print("// Queries by class\n");
2057 for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
2058 b.print(-p.second + " " + p.first.getName() + "\n");
// Phase 4: "retained set" figure per entry, recomputed until it stops changing or the
// iteration limit in the loop condition is reached.
2061 Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
2062 for(CacheEntry e : workarea.keySet())
2065 boolean changed = true;
2067 while(changed && iter++<50) {
2071 Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
2072 for(CacheEntry e : workarea.keySet())
// Each entry's current figure is added to the figure of every one of its parents.
2075 for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
2076 Integer c = hist.get(e.getKey());
2077 for(CacheEntry p : e.getValue()) {
2078 Integer i = newHist.get(p);
2079 newHist.put(p, i+c);
2082 for(CacheEntry e : workarea.keySet()) {
2083 Integer value = newHist.get(e);
2084 Integer old = hist.get(e);
2085 if(!value.equals(old)) {
2087 // System.err.println("hist " + e + ": " + old + " => " + value);
2092 System.err.println("Retained set iteration " + iter);
2096 b.print("// Queries by retained set\n");
2097 for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
2098 b.print("" + -p.second + " " + p.first + "\n");
// Phase 5: parent listing; 'inverse' collects parent -> children for the child listing below.
2101 HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
2103 b.print("// Entry parent listing\n");
2104 for(CacheEntry entry : workarea.keySet()) {
2105 int status = flagMap.get(entry);
2106 boolean hasListener = hasListenerAfterDisposing(entry);
2107 b.print("Q " + entry.toString());
// Two output forms for the status: "(L<status>)" or "(<status>)".
2109 b.print(" (L" + status + ")");
2112 b.print(" (" + status + ")");
2115 for(CacheEntry parent : workarea.get(entry)) {
2116 Collection<CacheEntry> inv = inverse.get(parent);
2118 inv = new ArrayList<CacheEntry>();
2119 inverse.put(parent, inv);
2122 b.print(" " + parent.toString());
2127 b.print("// Entry child listing\n");
2128 for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
2129 b.print("C " + entry.getKey().toString());
2131 for(CacheEntry child : entry.getValue()) {
2132 Integer h = hist.get(child);
2136 b.print(" <no children>");
2138 b.print(" " + child.toString());
2143 b.print("#queries: " + workarea.keySet().size() + "\n");
2144 b.print("#listeners: " + listeners + "\n");
2148 return "Dumped " + workarea.keySet().size() + " queries.";
// Work item of the update traversal (see updateQuery / updateParents).
// updateParents creates items with caller = the changed entry and entry = one of its parents.
2154 public CacheEntry caller;
// The entry to be updated.
2156 public CacheEntry entry;
// 'indent' is used by updateQuery to indent its debug output.
2160 public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
2161 this.caller = caller;
2163 this.indent = indent;
// Removes the query behind 'entry' from this processor via Query.removeEntry.
// Returns false immediately when the entry is already discarded.
// NOTE(review): the statements following the HAS_BEEN_BOUND check and the final return value are not among
// the visible lines; confirm.
2168 boolean removeQuery(CacheEntry entry) {
2170 // This entry has been removed before. No need to do anything here.
2171 if(entry.isDiscarded()) return false;
2173 assert (!entry.isDiscarded());
2175 Query query = entry.getQuery();
2177 query.removeEntry(this);
// Tests the HAS_BEEN_BOUND bit of the entry's GC status.
2182 if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
2193 * @return true if this entry is being listened
// Processes one work item of the update traversal.
// Discarded, refuted, excepted and pending entries get dedicated handling (each with optional
// development-mode tracing: D / R / E / P / U prefixes). For other entries the query's update type
// decides: IMMEDIATE_UPDATE entries are collected into 'immediates', the entry's listeners are
// scheduled, and INVALIDATE entries push their parents onto 'todo'.
// NOTE(review): several branch bodies and all return statements are not among the visible lines; confirm
// the exact per-state behaviour before relying on this summary.
2195 private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
2199 CacheEntry entry = e.entry;
2201 // System.err.println("updateQuery " + entry);
2204 * If the dependency graph forms a DAG, some entries are inserted in the
2205 * todo list many times. They only need to be processed once though.
2207 if (entry.isDiscarded()) {
2208 if (Development.DEVELOPMENT) {
2209 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2210 System.out.print("D");
2211 for (int i = 0; i < e.indent; i++)
2212 System.out.print(" ");
2213 System.out.println(entry.getQuery());
2216 // System.err.println(" => DISCARDED");
2220 if (entry.isRefuted()) {
2221 if (Development.DEVELOPMENT) {
2222 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2223 System.out.print("R");
2224 for (int i = 0; i < e.indent; i++)
2225 System.out.print(" ");
2226 System.out.println(entry.getQuery());
2232 if (entry.isExcepted()) {
2233 if (Development.DEVELOPMENT) {
2234 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2235 System.out.print("E");
2240 if (entry.isPending()) {
2241 if (Development.DEVELOPMENT) {
2242 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2243 System.out.print("P");
2250 if (Development.DEVELOPMENT) {
2251 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2252 System.out.print("U ");
2253 for (int i = 0; i < e.indent; i++)
2254 System.out.print(" ");
2255 System.out.print(entry.getQuery());
2259 Query query = entry.getQuery();
// The update mode is encoded in the query type (masked with RequestFlags.UPDATE_MASK below).
2260 int type = query.type();
2262 boolean hasListener = hasListener(entry);
2264 if (Development.DEVELOPMENT) {
2265 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2266 if(hasListener(entry)) {
2267 System.out.println(" (L)");
2269 System.out.println("");
2274 if(entry.isPending() || entry.isExcepted()) {
// Immediate-update queries are remembered by identity for re-evaluation by update().
2277 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2279 immediates.put(entry, entry);
2294 if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
2296 immediates.put(entry, entry);
2310 // System.err.println(" => FOO " + type);
// All listeners registered for the entry are scheduled.
2313 ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
2314 if(entries != null) {
2315 for (ListenerEntry le : entries) {
2316 scheduleListener(le);
2321 // If invalid, update parents
2322 if (type == RequestFlags.INVALIDATE) {
2323 updateParents(e.indent, entry, todo);
2330 private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
2332 Iterable<CacheEntry> oldParents = entry.getParents(this);
2333 for (CacheEntry parent : oldParents) {
2334 // System.err.println("updateParents " + entry + " => " + parent);
2335 if(!parent.isDiscarded())
2336 todo.push(new UpdateEntry(entry, parent, indent + 2));
2341 private boolean pruneListener(ListenerEntry entry) {
2342 if (entry.base.isDisposed()) {
2343 removeListener(entry);
2351 * @param av1 an array (guaranteed)
2352 * @param av2 any object
2353 * @return <code>true</code> if the two arrays are equal
2355 private final boolean arrayEquals(Object av1, Object av2) {
2358 Class<?> c1 = av1.getClass().getComponentType();
2359 Class<?> c2 = av2.getClass().getComponentType();
2360 if (c2 == null || !c1.equals(c2))
2362 boolean p1 = c1.isPrimitive();
2363 boolean p2 = c2.isPrimitive();
2367 return Arrays.equals((Object[]) av1, (Object[]) av2);
2368 if (boolean.class.equals(c1))
2369 return Arrays.equals((boolean[]) av1, (boolean[]) av2);
2370 else if (byte.class.equals(c1))
2371 return Arrays.equals((byte[]) av1, (byte[]) av2);
2372 else if (int.class.equals(c1))
2373 return Arrays.equals((int[]) av1, (int[]) av2);
2374 else if (long.class.equals(c1))
2375 return Arrays.equals((long[]) av1, (long[]) av2);
2376 else if (float.class.equals(c1))
2377 return Arrays.equals((float[]) av1, (float[]) av2);
2378 else if (double.class.equals(c1))
2379 return Arrays.equals((double[]) av1, (double[]) av2);
2380 throw new RuntimeException("??? Contact application querySupport.");
// Recomputes the query behind 'entry' and compares the new result with 'oldValue'.
// Returns ListenerEntry.NO_VALUE when the entry is excepted after the recompute, the new value when
// it differs from the old one, and ListenerEntry.NOT_CHANGED otherwise. Arrays are compared with
// arrayEquals, other values with equals.
// NOTE(review): the return taken when oldValue is NO_VALUE is not among the visible lines - presumably
// the new value; confirm.
2385 final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
2389 Query query = entry.getQuery();
2391 if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
2393 entry.prepareRecompute(querySupport);
// The recompute runs on a graph whose parent is the entry itself.
2395 ReadGraphImpl parentGraph = graph.withParent(entry);
2397 query.recompute(parentGraph, this, entry);
2399 if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
2401 Object newValue = entry.getResult();
// No previous value to compare against.
2403 if (ListenerEntry.NO_VALUE == oldValue) {
2404 if(DebugPolicy.CHANGES) {
2405 System.out.println("C " + query);
2406 System.out.println("- " + oldValue);
2407 System.out.println("- " + newValue);
2412 boolean changed = false;
2414 if (newValue != null) {
2415 if (newValue.getClass().isArray()) {
2416 changed = !arrayEquals(newValue, oldValue);
2418 changed = !newValue.equals(oldValue);
// A null new value counts as a change only when the old value was non-null.
2421 changed = (oldValue != null);
2423 if(DebugPolicy.CHANGES && changed) {
2424 System.out.println("C " + query);
2425 System.out.println("- " + oldValue);
2426 System.out.println("- " + newValue);
2429 return changed ? newValue : ListenerEntry.NOT_CHANGED;
2431 } catch (Throwable t) {
// Failures are logged through the default logger.
2433 Logger.defaultLogError(t);
2435 return ListenerEntry.NO_VALUE;
// Tells whether at least one listener entry is waiting in scheduledListeners.
2441 public boolean hasScheduledUpdates() {
2442 return !scheduledListeners.isEmpty();
// Fires the scheduled listeners. In each round the current scheduledListeners set is swapped for a
// fresh one, disposed listeners are pruned, the remaining entries are re-evaluated with compareTo, and
// the listeners whose value changed are fired from the cache. Rounds repeat until nothing is scheduled.
// The 'firingListeners' flag is set for the duration of the call.
2445 public void performScheduledUpdates(WriteGraphImpl graph) {
2448 assert (!collecting);
2449 assert (!firingListeners);
2451 firingListeners = true;
2455 // Performing may cause further events to be scheduled.
2456 while (!scheduledListeners.isEmpty()) {
2459 // graph.state.barrier.inc();
2461 // Clone current events to make new entries possible during
2463 THashSet<ListenerEntry> entries = scheduledListeners;
2464 scheduledListeners = new THashSet<ListenerEntry>();
// Listeners to fire in this round, in evaluation order.
2466 ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
2468 for (ListenerEntry listenerEntry : entries) {
// Disposed listeners are removed.
2470 if (pruneListener(listenerEntry)) {
2471 if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
2475 final CacheEntry entry = listenerEntry.entry;
2476 assert (entry != null);
2478 Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
2480 if (newValue != ListenerEntry.NOT_CHANGED) {
2481 if(DebugPolicy.LISTENER)
2482 System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
2483 schedule.add(listenerEntry);
// The last known value is taken from the entry's result, not from compareTo's return value.
2484 listenerEntry.setLastKnown(entry.getResult());
2489 for(ListenerEntry listenerEntry : schedule) {
2490 final CacheEntry entry = listenerEntry.entry;
2491 if(DebugPolicy.LISTENER)
2492 System.out.println("Firing " + listenerEntry.procedure);
2494 if(DebugPolicy.LISTENER)
2495 System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
2496 entry.performFromCache(graph, this, listenerEntry.procedure);
2497 } catch (Throwable t) {
// A failing listener is reported on stderr and does not stop the remaining listeners.
2498 t.printStackTrace();
2502 // graph.state.barrier.dec();
2503 // graph.waitAsync(null);
2504 // graph.state.barrier.assertReady();
2509 firingListeners = false;
2516 * @return true if this entry still has listeners
// Propagates a change of 'entry' through the entries that depend on it.
// The work list is drained with updateQuery, which also collects entries needing immediate update;
// those are then re-evaluated with compareTo and the parents of changed ones are queued again.
// The process stops when no immediates were collected.
// Returns hadListeners | listenersUnknown.
2518 public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
2520 assert (!collecting);
2524 boolean hadListeners = false;
2525 boolean listenersUnknown = false;
2529 assert(entry != null);
2530 LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
// Keyed by identity, so each entry is re-evaluated at most once per round.
2531 IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
2532 todo.add(new UpdateEntry(null, entry, 0));
2536 // Walk the tree and collect immediate updates
2537 while (!todo.isEmpty()) {
2538 UpdateEntry e = todo.pop();
2539 hadListeners |= updateQuery(e, todo, immediates);
2542 if(immediates.isEmpty()) break;
2544 // Evaluate all immediate updates and collect parents to update
2545 for(CacheEntry immediate : immediates.values()) {
2547 if(immediate.isDiscarded()) {
// Excepted entries have no usable old value, so they are compared against NO_VALUE.
2551 if(immediate.isExcepted()) {
2553 Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
2554 if (newValue != ListenerEntry.NOT_CHANGED)
2555 updateParents(0, immediate, todo);
2559 Object oldValue = immediate.getResult();
2560 Object newValue = compareTo(graph, immediate, oldValue);
2562 if (newValue != ListenerEntry.NOT_CHANGED) {
2563 updateParents(0, immediate, todo);
2565 // If not changed, keep the old value
2566 immediate.setResult(oldValue);
2567 listenersUnknown = true;
2577 } catch (Throwable t) {
// Failures are logged through the default logger.
2578 Logger.defaultLogError(t);
2584 return hadListeners | listenersUnknown;
// NOTE(review): presumably set when updates have been scheduled and not yet performed; confirm against the writers of this flag.
2588 volatile public boolean dirty = false;
// Pending changes consumed by performDirtyUpdates. Object updates are longs that pack the subject in
// the high 32 bits and the predicate in the low 32 bits (see the unpacking in performDirtyUpdates).
2590 private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
2591 private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
2592 private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
2593 // Maybe use a mutex from util.concurrent?
// Guards the swap of scheduledPrimitiveUpdates in performDirtyUpdates.
2594 private Object primitiveUpdateLock = new Object();
2595 private THashSet scheduledPrimitiveUpdates = new THashSet();
// Applies all scheduled object, value, invalidate and primitive updates to the cached queries by
// calling update(graph, ...) on every affected cache entry, then clears the scheduled sets.
// Two fast paths handle the cases of exactly one object update and exactly one value update.
// NOTE(review): the early returns of the fast paths are not among the visible lines; confirm.
2597 public void performDirtyUpdates(final ReadGraphImpl graph) {
2602 if (Development.DEVELOPMENT) {
2603 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2604 System.err.println("== Query update ==");
2608 // Special case - one statement
2609 if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2611 long arg0 = scheduledObjectUpdates.getFirst();
// Unpack: subject in the high 32 bits, predicate in the low 32 bits.
2613 final int subject = (int)(arg0 >>> 32);
2614 final int predicate = (int)(arg0 & 0xffffffff);
2616 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2617 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2618 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
// Type-related predicates also affect the type queries of the subject.
2620 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2621 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2622 if(principalTypes != null) update(graph, principalTypes);
2623 Types types = Types.entry(QueryProcessor.this, subject);
2624 if(types != null) update(graph, types);
2627 if(predicate == subrelationOf) {
2628 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2629 if(superRelations != null) update(graph, superRelations);
2632 DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);
2633 if(dp != null) update(graph, dp);
// The ordered-set query is looked up by the predicate of the statement.
2634 OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);
2635 if(os != null) update(graph, os);
2637 scheduledObjectUpdates.clear();
2642 // Special case - one value
2643 if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
2645 int arg0 = scheduledValueUpdates.getFirst();
2647 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2648 if(valueQuery != null) update(graph, valueQuery);
2650 scheduledValueUpdates.clear();
// General case: subjects and ordered sets are gathered first so that each is updated once.
2655 final TIntHashSet predicates = new TIntHashSet();
2656 final TIntHashSet orderedSets = new TIntHashSet();
2658 THashSet primitiveUpdates;
// The scheduled primitive updates are swapped out under the lock and processed outside of it.
2659 synchronized (primitiveUpdateLock) {
2660 primitiveUpdates = scheduledPrimitiveUpdates;
2661 scheduledPrimitiveUpdates = new THashSet();
2664 primitiveUpdates.forEach(new TObjectProcedure() {
2667 public boolean execute(Object arg0) {
2669 ExternalReadEntry query = (ExternalReadEntry)cache.externalReadMap.get(arg0);
2670 if (query != null) {
2671 boolean listening = update(graph, query);
// External reads without listeners and parents are dropped from the map.
2672 if (!listening && !query.hasParents()) {
2673 cache.externalReadMap.remove(arg0);
2682 scheduledValueUpdates.forEach(new TIntProcedure() {
2685 public boolean execute(int arg0) {
2686 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);
2687 if(valueQuery != null) update(graph, valueQuery);
// Invalidated resources: value, type and super-relation queries are updated directly; the resource
// is also added to 'predicates' for the per-subject pass below.
2693 scheduledInvalidates.forEach(new TIntProcedure() {
2696 public boolean execute(int resource) {
2698 ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);
2699 if(valueQuery != null) update(graph, valueQuery);
2701 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);
2702 if(principalTypes != null) update(graph, principalTypes);
2703 Types types = Types.entry(QueryProcessor.this, resource);
2704 if(types != null) update(graph, types);
2706 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
2707 if(superRelations != null) update(graph, superRelations);
2709 predicates.add(resource);
2716 scheduledObjectUpdates.forEach(new TLongProcedure() {
2719 public boolean execute(long arg0) {
// Same packing as in the single-statement fast path.
2721 final int subject = (int)(arg0 >>> 32);
2722 final int predicate = (int)(arg0 & 0xffffffff);
2724 if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
2725 PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);
2726 if(principalTypes != null) update(graph, principalTypes);
2727 Types types = Types.entry(QueryProcessor.this, subject);
2728 if(types != null) update(graph, types);
2731 if(predicate == subrelationOf) {
2732 SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
2733 if(superRelations != null) update(graph, superRelations);
// Despite its name, 'predicates' collects the subjects whose statement queries must be updated.
2736 predicates.add(subject);
2737 orderedSets.add(predicate);
2745 predicates.forEach(new TIntProcedure() {
2748 public boolean execute(final int subject) {
2750 for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);
2751 for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);
2752 for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);
2754 DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2755 if(entry != null) update(graph, entry);
2763 orderedSets.forEach(new TIntProcedure() {
2766 public boolean execute(int orderedSet) {
2768 OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);
2769 if(entry != null) update(graph, entry);
2777 // for (Integer subject : predicates) {
2778 // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
2779 // if(entry != null) update(graph, entry);
2783 if (Development.DEVELOPMENT) {
2784 if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
2785 System.err.println("== Query update ends ==");
// All scheduled sets handled above are emptied for the next round.
2789 scheduledValueUpdates.clear();
2790 scheduledObjectUpdates.clear();
2791 scheduledInvalidates.clear();
/**
 * Records that a resource's value query needs refreshing by adding its id to
 * {@code scheduledValueUpdates}.
 *
 * @param resource integer id of the resource to schedule
 */
2795 public void updateValue(final int resource) {
2796 scheduledValueUpdates.add(resource);
2800 public void updateStatements(final int resource, final int predicate) {
2801 scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
// Id handed to the most recent invalidateResource call that was not skipped; starts at 0.
2805 private int lastInvalidate = 0;
/**
 * Schedules the given resource for a value update unless it is the same id that
 * the immediately preceding (non-skipped) call scheduled.
 * <p>
 * NOTE(review): the id is added to {@code scheduledValueUpdates}, not to
 * {@code scheduledInvalidates}, although the update pass iterates a separate
 * invalidate set - confirm this is intentional.
 * NOTE(review): because {@code lastInvalidate} starts at 0, a first call with
 * resource id 0 is skipped - confirm 0 is never a valid id.
 *
 * @param resource integer id of the resource to invalidate
 */
2807 public void invalidateResource(final int resource) {
// Cheap suppression of consecutive calls with the same id.
2808 if(lastInvalidate == resource) return;
2809 scheduledValueUpdates.add(resource);
2810 lastInvalidate = resource;
/**
 * Queues an external read in {@code scheduledPrimitiveUpdates} and then calls
 * {@code querySupport.dirtyPrimitives()}.
 *
 * @param primitive the external read to queue
 */
2814 public void updatePrimitive(final ExternalRead primitive) {
2816 // External reads may be updated from arbitrary threads.
2817 // Synchronize to prevent race-conditions.
2818 synchronized (primitiveUpdateLock) {
2819 scheduledPrimitiveUpdates.add(primitive);
2821 querySupport.dirtyPrimitives();
/**
 * Builds a one-line summary of the {@code size}, {@code hits}, {@code misses}
 * and {@code updates} counters. The method is synchronized on this instance.
 *
 * @return the summary string
 */
2826 public synchronized String toString() {
2827 return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";
/**
 * Shuts down the {@code THREADS} query executor threads in escalating steps:
 * ask each to dispose, poll whether any is still alive, start interrupting
 * them, and finally join each one with a 5000 ms timeout before clearing its
 * slot in {@code executors}.
 * <p>
 * NOTE(review): both {@code InterruptedException} handlers only log; the
 * caller's interrupt status does not appear to be restored - confirm.
 */
2831 protected void doDispose() {
// Step 1: request a cooperative shutdown from every executor.
2833 for(int index = 0; index < THREADS; index++) {
2834 executors[index].dispose();
// Step 2: up to 100 rounds of checking whether any executor is still alive.
2838 for(int i=0;i<100;i++) {
2840 boolean alive = false;
2841 for(int index = 0; index < THREADS; index++) {
2842 alive |= executors[index].isAlive();
2847 } catch (InterruptedException e) {
2848 Logger.defaultLogError(e);
2853 // Then start interrupting
2854 for(int i=0;i<100;i++) {
2856 boolean alive = false;
2857 for(int index = 0; index < THREADS; index++) {
2858 alive |= executors[index].isAlive();
2861 for(int index = 0; index < THREADS; index++) {
2862 executors[index].interrupt();
2866 // // Then just destroy
2867 // for(int index = 0; index < THREADS; index++) {
2868 // executors[index].destroy();
// Final step: wait at most 5000 ms per executor, then drop the reference either way.
2871 for(int index = 0; index < THREADS; index++) {
2873 executors[index].join(5000);
2874 } catch (InterruptedException e) {
2875 Logger.defaultLogError("QueryThread " + index + " will not die.", e);
2877 executors[index] = null;
/**
 * Accessor for the cache hit count.
 *
 * @return presumably the {@code hits} counter that {@code toString()} reports - TODO confirm
 */
2882 public int getHits() {
/**
 * Accessor for the cache miss count.
 *
 * @return presumably the {@code misses} counter that {@code toString()} reports - TODO confirm
 */
2886 public int getMisses() {
/**
 * Accessor for the cache size.
 *
 * @return presumably the {@code size} counter that {@code toString()} reports - TODO confirm
 */
2890 public int getSize() {
/**
 * Collects the cluster ids of the resources referenced by cached
 * {@code Objects}, {@code DirectPredicates} and {@code ValueQuery} entries.
 * Each id is obtained through {@code querySupport.getClusterId} and gathered
 * into a fresh {@code HashSet}, so duplicates collapse.
 *
 * @return the set of collected cluster ids
 */
2894 public Set<Long> getReferencedClusters() {
2895 HashSet<Long> result = new HashSet<Long>();
// Objects queries: keyed by r1().
2896 for (CacheEntry entry : cache.objectsMap.values()) {
2897 Objects query = (Objects) entry.getQuery();
2898 result.add(querySupport.getClusterId(query.r1()));
// DirectPredicates queries: keyed by their id field.
2900 for (CacheEntry entry : cache.directPredicatesMap.values()) {
2901 DirectPredicates query = (DirectPredicates) entry.getQuery();
2902 result.add(querySupport.getClusterId(query.id));
// Value queries: keyed by their id field.
2904 for (CacheEntry entry : cache.valueMap.values()) {
2905 ValueQuery query = (ValueQuery) entry.getQuery();
2906 result.add(querySupport.getClusterId(query.id));
/**
 * Hook for asserting that processing has completed.
 * NOTE(review): no statements are visible for this method here; it may be a no-op - confirm.
 */
2911 public void assertDone() {
/**
 * Delegates to {@code cache.allCaches(result)}.
 *
 * @param result the collection result object handed to the cache
 * @return whatever {@code cache.allCaches} returns
 */
2914 CacheCollectionResult allCaches(CacheCollectionResult result) {
2916 return cache.allCaches(result);
/**
 * Diagnostics hook.
 * NOTE(review): no statements are visible for this method here; it may be a no-op - confirm.
 */
2920 public void printDiagnostics() {
/**
 * Forwards a cluster request to {@code querySupport.requestCluster}.
 *
 * @param graph     the graph passed through to the query support
 * @param clusterId id of the requested cluster
 * @param runnable  passed through unchanged; presumably run once the cluster is available - TODO confirm
 */
2923 public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
2924 querySupport.requestCluster(graph, clusterId, runnable);
/**
 * Runs the query collector over the cache with
 * {@code collector.collect(0, Integer.MAX_VALUE)}.
 *
 * @return an int whose computation is not visible here - TODO confirm what it reports
 */
2927 public int clean() {
2928 collector.collect(0, Integer.MAX_VALUE);
/**
 * Runs a one-off collection restricted to the cache entries of the given
 * external reads. A purpose-built {@code QueryCollectorSupport} exposes only
 * those entries to a new {@code QueryCollectorImpl2}, which is then asked to
 * {@code collect(0, Integer.MAX_VALUE)}.
 *
 * @param requests the external reads whose entries in {@code cache.externalReadMap} are candidates
 */
2932 public void clean(final Collection<ExternalRead<?>> requests) {
2933 QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
// Cursor over the requested reads; re-created once it has been exhausted.
2934 Iterator<ExternalRead<?>> iterator = requests.iterator();
2936 public CacheCollectionResult allCaches() {
2937 throw new UnsupportedOperationException();
// Returns the next request that has a cache entry; requests without one are skipped by recursing.
2940 public CacheEntryBase iterate(int level) {
2941 if(iterator.hasNext()) {
2942 ExternalRead<?> request = iterator.next();
2943 ExternalReadEntry entry = cache.externalReadMap.get(request);
2944 if (entry != null) return entry;
2945 else return iterate(level);
2947 iterator = requests.iterator();
2952 public void remove() {
2953 throw new UnsupportedOperationException();
2956 public void setLevel(CacheEntryBase entry, int level) {
2957 throw new UnsupportedOperationException();
// Looks up the cache entry of every request; the list is pre-sized to the request count.
2960 public Collection<CacheEntry> getRootList() {
2961 ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
2962 for (ExternalRead<?> request : requests) {
2963 ExternalReadEntry entry = cache.externalReadMap.get(request);
2970 public int getCurrentSize() {
2974 public int calculateCurrentSize() {
2975 // This tells the collector to attempt collecting everything.
2976 return Integer.MAX_VALUE;
2979 public boolean start(boolean flush) {
2983 new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
/**
 * Delegates to {@code cache.scanPending()}.
 */
2986 public void scanPending() {
2988 cache.scanPending();
/**
 * Creates a graph handle bound to this processor via
 * {@code ReadGraphImpl.createAsync(this)}.
 *
 * @return the newly created {@code ReadGraphImpl}
 */
2992 public ReadGraphImpl graphForVirtualRequest() {
2993 return ReadGraphImpl.createAsync(this);
2997 private HashMap<Resource, Class<?>> builtinValues;
2999 public Class<?> getBuiltinValue(Resource r) {
3000 if(builtinValues == null) initBuiltinValues();
3001 return builtinValues.get(r);
// Package-private holder for a caller-side exception; initialised to null.
// NOTE(review): the only nearby use is inside commented-out code - confirm whether this field is still needed.
3004 Exception callerException = null;
/**
 * Nested barrier interface.
 * NOTE(review): presumably declares inc/dec counting operations (the disabled
 * variants below take a debug label) - confirm against the full declaration.
 */
3006 public interface AsyncBarrier {
3009 // public void inc(String debug);
3010 // public void dec(String debug);
3013 // final public QueryProcessor processor;
3014 // final public QuerySupport support;
3016 // boolean disposed = false;
/**
 * Populates {@code builtinValues} with the Java class for each Layer0 scalar
 * type resource and for each of the corresponding array type resources.
 * <p>
 * If the session has no {@code Layer0} service yet ({@code peekService}
 * returns null) the method returns without creating the map, so
 * {@code builtinValues} remains null.
 */
3018 private void initBuiltinValues() {
3020 Layer0 b = getSession().peekService(Layer0.class);
3021 if(b == null) return;
3023 builtinValues = new HashMap<Resource, Class<?>>();
// Scalar types map to the boxed Java classes.
3025 builtinValues.put(b.String, String.class);
3026 builtinValues.put(b.Double, Double.class);
3027 builtinValues.put(b.Float, Float.class);
3028 builtinValues.put(b.Long, Long.class);
3029 builtinValues.put(b.Integer, Integer.class);
3030 builtinValues.put(b.Byte, Byte.class);
3031 builtinValues.put(b.Boolean, Boolean.class);
// Array types map to primitive arrays (String[] for strings).
3033 builtinValues.put(b.StringArray, String[].class);
3034 builtinValues.put(b.DoubleArray, double[].class);
3035 builtinValues.put(b.FloatArray, float[].class);
3036 builtinValues.put(b.LongArray, long[].class);
3037 builtinValues.put(b.IntegerArray, int[].class);
3038 builtinValues.put(b.ByteArray, byte[].class);
3039 builtinValues.put(b.BooleanArray, boolean[].class);
3043 // public ReadGraphSupportImpl(final QueryProcessor provider2) {
3045 // if (null == provider2) {
3046 // this.processor = null;
3050 // this.processor = provider2;
3051 // support = provider2.getCore();
3052 // initBuiltinValues();
3056 // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
3057 // return new ReadGraphSupportImpl(impl.processor);
/**
 * Accessor for the session this processor works with.
 *
 * @return the {@code Session}; the returned field is not visible here - TODO confirm
 */
3061 final public Session getSession() {
/**
 * @return the {@code resourceSupport} field of this processor
 */
3065 final public ResourceSupport getResourceSupport() {
3066 return resourceSupport;
/**
 * Asynchronously reports the predicates of {@code subject} to {@code procedure}.
 * <p>
 * The procedure is wrapped in an {@code IntProcedure} that converts each
 * predicate id to a {@code Resource} with {@code querySupport.getResource}
 * and is handed to {@code Predicates.queryEach}. Any {@code Throwable} thrown
 * by the procedure is caught and logged rather than propagated.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose predicates are queried; must not be null
 * @param procedure receiver of results; must not be null. If it is a
 *                  {@code ListenerBase} it is also registered as the listener.
 */
3070 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3072 assert(subject != null);
3073 assert(procedure != null);
3075 final ListenerBase listener = getListenerBase(procedure);
3077 IntProcedure ip = new IntProcedure() {
// True until the first finished/exception delivery. Later deliveries use a
// graph obtained from impl.newRestart(graph) instead of the callback's graph.
3079 AtomicBoolean first = new AtomicBoolean(true);
3082 public void execute(ReadGraphImpl graph, int i) {
3085 procedure.execute(graph, querySupport.getResource(i));
3087 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3089 } catch (Throwable t2) {
3090 Logger.defaultLogError(t2);
3095 public void finished(ReadGraphImpl graph) {
3097 if(first.compareAndSet(true, false)) {
3098 procedure.finished(graph);
3099 // impl.state.barrier.dec(this);
3101 procedure.finished(impl.newRestart(graph));
3104 } catch (Throwable t2) {
3105 Logger.defaultLogError(t2);
3110 public void exception(ReadGraphImpl graph, Throwable t) {
3112 if(first.compareAndSet(true, false)) {
3113 procedure.exception(graph, t);
3114 // impl.state.barrier.dec(this);
3116 procedure.exception(impl.newRestart(graph), t);
3118 } catch (Throwable t2) {
3119 Logger.defaultLogError(t2);
3125 int sId = querySupport.getId(subject);
3127 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);
3128 // else impl.state.barrier.inc(null, null);
3130 Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Reports the predicates of {@code subject} to a graph-less {@code MultiProcedure}.
 * <p>
 * Predicate ids coming from {@code Predicates.queryEach} are converted to
 * resources with {@code querySupport.getResource}. Any {@code Throwable}
 * thrown by the procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose predicates are queried; must not be null
 * @param procedure receiver of results; must not be null
 */
3135 final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
3137 assert(subject != null);
3138 assert(procedure != null);
3140 final ListenerBase listener = getListenerBase(procedure);
3142 // impl.state.barrier.inc();
3144 Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
3147 public void execute(ReadGraphImpl graph, int i) {
3149 procedure.execute(querySupport.getResource(i));
3150 } catch (Throwable t2) {
3151 Logger.defaultLogError(t2);
3156 public void finished(ReadGraphImpl graph) {
3158 procedure.finished();
3159 } catch (Throwable t2) {
3160 Logger.defaultLogError(t2);
3162 // impl.state.barrier.dec();
3166 public void exception(ReadGraphImpl graph, Throwable t) {
3168 procedure.exception(t);
3169 } catch (Throwable t2) {
3170 Logger.defaultLogError(t2);
3172 // impl.state.barrier.dec();
/**
 * Returns the predicate set of {@code subject} directly, as produced by
 * {@code Predicates.queryEach2}.
 *
 * @param impl    the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject resource whose predicates are queried; must not be null
 * @return the predicates as an {@code IntSet}
 * @throws Throwable whatever the underlying query throws
 */
3180 final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
3182 assert(subject != null);
3184 return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
/**
 * Reports the statements of {@code subject} for {@code predicate} to a
 * graph-less {@code MultiProcedure}.
 * <p>
 * Each (s, p, o) id triple from {@code Statements.queryEach} is converted with
 * {@code querySupport.getStatement}. Any {@code Throwable} thrown by the
 * procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results; must not be null
 */
3190 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3191 final Resource predicate, final MultiProcedure<Statement> procedure) {
3193 assert(subject != null);
3194 assert(predicate != null);
3195 assert(procedure != null);
3197 final ListenerBase listener = getListenerBase(procedure);
3199 // impl.state.barrier.inc();
3201 Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3204 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3206 procedure.execute(querySupport.getStatement(s, p, o));
3207 } catch (Throwable t2) {
3208 Logger.defaultLogError(t2);
3213 public void finished(ReadGraphImpl graph) {
3215 procedure.finished();
3216 } catch (Throwable t2) {
3217 Logger.defaultLogError(t2);
3219 // impl.state.barrier.dec();
3223 public void exception(ReadGraphImpl graph, Throwable t) {
3225 procedure.exception(t);
3226 } catch (Throwable t2) {
3227 Logger.defaultLogError(t2);
3229 // impl.state.barrier.dec();
/**
 * Asynchronously reports the statements of {@code subject} for
 * {@code predicate} to {@code procedure}.
 * <p>
 * Each (s, p, o) id triple from {@code Statements.queryEach} is converted with
 * {@code querySupport.getStatement}. Each callback either forwards the
 * callback's own graph or one obtained from {@code impl.newRestart(graph)}.
 * Any {@code Throwable} thrown by the procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results; must not be null
 */
3237 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3238 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3240 assert(subject != null);
3241 assert(predicate != null);
3242 assert(procedure != null);
3244 final ListenerBase listener = getListenerBase(procedure);
3246 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// NOTE(review): plain boolean here, whereas the predicate variant uses an
// AtomicBoolean - confirm whether callbacks can race on this flag.
3248 boolean first = true;
3251 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3254 procedure.execute(graph, querySupport.getStatement(s, p, o));
3256 procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
3258 } catch (Throwable t2) {
3259 Logger.defaultLogError(t2);
3264 public void finished(ReadGraphImpl graph) {
3269 procedure.finished(graph);
3270 // impl.state.barrier.dec(this);
3272 procedure.finished(impl.newRestart(graph));
3274 } catch (Throwable t2) {
3275 Logger.defaultLogError(t2);
3281 public void exception(ReadGraphImpl graph, Throwable t) {
3286 procedure.exception(graph, t);
3287 // impl.state.barrier.dec(this);
3289 procedure.exception(impl.newRestart(graph), t);
3291 } catch (Throwable t2) {
3292 Logger.defaultLogError(t2);
3299 int sId = querySupport.getId(subject);
3300 int pId = querySupport.getId(predicate);
3302 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3303 // else impl.state.barrier.inc(null, null);
3305 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
/**
 * Asynchronously reports the statements of {@code subject} for
 * {@code predicate} to a {@code StatementProcedure}, which receives the raw
 * (s, p, o) integer ids without conversion to {@code Statement} objects.
 * <p>
 * Each callback either forwards the callback's own graph or one obtained from
 * {@code impl.newRestart(graph)}. Any {@code Throwable} thrown by the
 * procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of id triples; must not be null
 */
3310 final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
3311 final Resource predicate, final StatementProcedure procedure) {
3313 assert(subject != null);
3314 assert(predicate != null);
3315 assert(procedure != null);
3317 final ListenerBase listener = getListenerBase(procedure);
3319 TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
// NOTE(review): plain boolean flag - confirm callbacks cannot race on it.
3321 boolean first = true;
3324 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3327 procedure.execute(graph, s, p, o);
3329 procedure.execute(impl.newRestart(graph), s, p, o);
3331 } catch (Throwable t2) {
3332 Logger.defaultLogError(t2);
3337 public void finished(ReadGraphImpl graph) {
3342 procedure.finished(graph);
3343 // impl.state.barrier.dec(this);
3345 procedure.finished(impl.newRestart(graph));
3347 } catch (Throwable t2) {
3348 Logger.defaultLogError(t2);
3354 public void exception(ReadGraphImpl graph, Throwable t) {
3359 procedure.exception(graph, t);
3360 // impl.state.barrier.dec(this);
3362 procedure.exception(impl.newRestart(graph), t);
3364 } catch (Throwable t2) {
3365 Logger.defaultLogError(t2);
3372 int sId = querySupport.getId(subject);
3373 int pId = querySupport.getId(predicate);
3375 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
3376 // else impl.state.barrier.inc(null, null);
3378 Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
/**
 * Adapts a set listener onto {@code forEachStatement}: statements are reported
 * through {@code procedure.add} and {@code procedure.remove} rather than as a
 * flat stream.
 * <p>
 * While results arrive, a statement found in the {@code current} set is
 * removed from it and not re-added; anything not found is reported via
 * {@code add}. When the round finishes, whatever is left in {@code current}
 * is reported via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3383 final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3385 assert(subject != null);
3386 assert(predicate != null);
3387 assert(procedure != null);
3389 forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
3391 private Set<Statement> current = null;
3392 private Set<Statement> run = new HashSet<Statement>();
3395 public void execute(AsyncReadGraph graph, Statement result) {
3397 boolean found = false;
3399 if(current != null) {
3401 found = current.remove(result);
3405 if(!found) procedure.add(graph, result);
3412 public void finished(AsyncReadGraph graph) {
3414 if(current != null) {
3415 for(Statement r : current) procedure.remove(graph, r);
3420 run = new HashSet<Statement>();
3425 public void exception(AsyncReadGraph graph, Throwable t) {
3426 procedure.exception(graph, t);
3430 public boolean isDisposed() {
3431 return procedure.isDisposed();
/**
 * Asynchronously reports the asserted statements of {@code subject} for
 * {@code predicate} to {@code procedure}.
 * <p>
 * Id triples from {@code AssertedStatements.queryEach} are converted with
 * {@code querySupport.getStatement}. The callback's own graph is always
 * forwarded. Any {@code Throwable} thrown by the procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results; must not be null
 */
3439 final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
3440 final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
3442 assert(subject != null);
3443 assert(predicate != null);
3444 assert(procedure != null);
3446 final ListenerBase listener = getListenerBase(procedure);
3448 // impl.state.barrier.inc();
3450 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
3453 public void execute(ReadGraphImpl graph, int s, int p, int o) {
3455 procedure.execute(graph, querySupport.getStatement(s, p, o));
3456 } catch (Throwable t2) {
3457 Logger.defaultLogError(t2);
3462 public void finished(ReadGraphImpl graph) {
3464 procedure.finished(graph);
3465 } catch (Throwable t2) {
3466 Logger.defaultLogError(t2);
3468 // impl.state.barrier.dec();
3472 public void exception(ReadGraphImpl graph, Throwable t) {
3474 procedure.exception(graph, t);
3475 } catch (Throwable t2) {
3476 Logger.defaultLogError(t2);
3478 // impl.state.barrier.dec();
/**
 * Returns {@code procedure} cast to {@code ListenerBase} when it implements
 * that type.
 *
 * @param procedure any procedure object
 * @return the procedure as a listener; presumably null otherwise (callers test the result against null) - TODO confirm
 */
3485 private static ListenerBase getListenerBase(Object procedure) {
3486 if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
/**
 * Reports the objects of {@code subject} for {@code predicate} to a graph-less
 * {@code MultiProcedure}.
 * <p>
 * Object ids from {@code Objects.runner} are converted to resources with
 * {@code querySupport.getResource}. Any {@code Throwable} thrown by the
 * procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results; must not be null
 */
3491 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
3493 assert(subject != null);
3494 assert(predicate != null);
3495 assert(procedure != null);
3497 final ListenerBase listener = getListenerBase(procedure);
3499 // impl.state.barrier.inc();
3501 Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
3504 public void execute(ReadGraphImpl graph, int i) {
3506 procedure.execute(querySupport.getResource(i));
3507 } catch (Throwable t2) {
3508 Logger.defaultLogError(t2);
3513 public void finished(ReadGraphImpl graph) {
3515 procedure.finished();
3516 } catch (Throwable t2) {
3517 Logger.defaultLogError(t2);
3519 // impl.state.barrier.dec();
3523 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): writes to stdout, unlike the sibling callbacks which only use
// Logger - looks like leftover debug output; confirm before removing.
3524 System.out.println("forEachObject exception " + t);
3526 procedure.exception(t);
3527 } catch (Throwable t2) {
3528 Logger.defaultLogError(t2);
3530 // impl.state.barrier.dec();
3539 // final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3541 // assert(subject != null);
3542 // assert(predicate != null);
3543 // assert(procedure != null);
3545 // final ListenerBase listener = getListenerBase(procedure);
3547 // int sId = querySupport.getId(subject);
3548 // int pId = querySupport.getId(predicate);
3550 // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);
3552 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);
3553 // else impl.state.barrier.inc(null, null);
3555 // // final Exception caller = new Exception();
3557 // // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());
3559 // DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);
/**
 * Asynchronously reports the direct predicates of {@code subject}.
 * <p>
 * The procedure is wrapped in a {@code MultiIntProcedure} (built from the
 * procedure, the requesting graph and {@code querySupport}) and passed to
 * {@code DirectPredicates.queryEach}.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose direct predicates are queried; must not be null
 * @param procedure receiver of results; must not be null
 */
3564 final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
3566 assert(subject != null);
3567 assert(procedure != null);
3569 final ListenerBase listener = getListenerBase(procedure);
3571 MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);
3573 int sId = querySupport.getId(subject);
3575 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);
3576 // else impl.state.barrier.inc(null, null);
3578 DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);
/**
 * Queries the direct statements of {@code subject} and delivers them to a
 * synchronous-style {@code Procedure}.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose direct statements are queried; must not be null
 * @param procedure receiver of the result; must not be null
 */
3583 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {
3585 assert(subject != null);
3586 assert(procedure != null);
3588 final ListenerBase listener = getListenerBase(procedure);
// Fully qualified: the simple name DirectStatements is taken by the imported org.simantics.db.DirectStatements.
3590 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Queries the direct statements of {@code subject} and delivers them to an
 * {@code AsyncProcedure}.
 *
 * @param impl          the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject       resource whose direct statements are queried; must not be null
 * @param procedure     receiver of the result; must not be null
 * @param ignoreVirtual passed through to the query; presumably excludes virtual-graph statements - TODO confirm
 */
3595 final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
3597 assert(subject != null);
3598 assert(procedure != null);
3600 final ListenerBase listener = getListenerBase(procedure);
// Fully qualified: the simple name DirectStatements is taken by the imported org.simantics.db.DirectStatements.
3602 org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
// Sentinel compared by identity in forPossibleObject; it marks a state for which null is reported.
3606 private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
/**
 * Reports at most one object of {@code subject} for {@code predicate}.
 * <p>
 * Built on {@code forEachObject}: results are folded into the {@code single}
 * field and, once the query finishes, {@code procedure.execute} is called
 * exactly once with either that resource or null. Null is reported when no
 * object was seen or when {@code single} holds the {@code INVALID_RESOURCE}
 * sentinel (presumably set when more than one object arrives - TODO confirm).
 * The adapter's callbacks are synchronized on the adapter instance.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject
 * @param predicate statement predicate
 * @param procedure receives the single object or null, or the exception
 */
3609 final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
3611 forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
3613 private Resource single = null;
3616 public synchronized void execute(AsyncReadGraph graph, Resource result) {
3617 if(single == null) {
3620 single = INVALID_RESOURCE;
3625 public synchronized void finished(AsyncReadGraph graph) {
3626 if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
3627 else procedure.execute(graph, single);
3631 public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
3632 procedure.exception(graph, throwable);
/**
 * Low-level object query: resolves {@code subject} and {@code predicate} to
 * ids and runs {@code Objects.runner} with the given listener and id-based
 * procedure. No wrapping or conversion is done here.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject
 * @param predicate statement predicate
 * @param listener  listener passed through to the query (callers may pass null)
 * @param procedure receiver of raw object ids
 */
3639 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
3641 final int sId = querySupport.getId(subject);
3642 final int pId = querySupport.getId(predicate);
3644 Objects.runner(impl, sId, pId, impl.parent, listener, procedure);
/**
 * Resolves {@code subject} and {@code predicate} to ids and returns the result
 * of {@code Objects.runner2} for that pair.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject
 * @param predicate statement predicate
 * @return the int produced by {@code Objects.runner2}; presumably the id of the single object - TODO confirm
 * @throws DatabaseException if the underlying query fails
 */
3648 final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
3650 final int sId = querySupport.getId(subject);
3651 final int pId = querySupport.getId(predicate);
3653 return Objects.runner2(impl, sId, pId, impl.parent);
/**
 * Asynchronously reports the objects of {@code subject} for {@code predicate}
 * to {@code procedure}.
 * <p>
 * Two adapters are used. When the request has a parent entry or the procedure
 * is a listener, a guarded adapter catches and logs anything the procedure
 * throws and switches to graphs from {@code impl.newRestart(graph)} after the
 * first completion. Otherwise a thin pass-through adapter is used that does
 * no catching. Both convert object ids with {@code querySupport.getResource}.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results
 */
3657 final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
3659 assert(subject != null);
3660 assert(predicate != null);
3662 final ListenerBase listener = getListenerBase(procedure);
3664 if(impl.parent != null || listener != null) {
// Guarded adapter.
3666 IntProcedure ip = new IntProcedure() {
3668 AtomicBoolean first = new AtomicBoolean(true);
3671 public void execute(ReadGraphImpl graph, int i) {
// Note: one execute path passes the requesting graph (impl), not the callback's graph.
3674 procedure.execute(impl, querySupport.getResource(i));
3676 procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
3678 } catch (Throwable t2) {
3679 Logger.defaultLogError(t2);
3685 public void finished(ReadGraphImpl graph) {
3687 if(first.compareAndSet(true, false)) {
3688 procedure.finished(impl);
3689 // impl.state.barrier.dec(this);
3691 procedure.finished(impl.newRestart(graph));
3693 } catch (Throwable t2) {
3694 Logger.defaultLogError(t2);
3699 public void exception(ReadGraphImpl graph, Throwable t) {
// NOTE(review): unlike finished(), this forwards the callback's graph and does
// not consult the first flag - confirm this asymmetry is intended.
3701 procedure.exception(graph, t);
3702 } catch (Throwable t2) {
3703 Logger.defaultLogError(t2);
3705 // impl.state.barrier.dec(this);
3709 public String toString() {
3710 return "forEachObject with " + procedure;
3715 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
3716 // else impl.state.barrier.inc(null, null);
3718 forEachObject(impl, subject, predicate, listener, ip);
// Pass-through adapter: forwards directly, exceptions from the procedure are not caught here.
3722 IntProcedure ip = new IntProcedure() {
3725 public void execute(ReadGraphImpl graph, int i) {
3726 procedure.execute(graph, querySupport.getResource(i));
3730 public void finished(ReadGraphImpl graph) {
3731 procedure.finished(graph);
3735 public void exception(ReadGraphImpl graph, Throwable t) {
3736 procedure.exception(graph, t);
3740 public String toString() {
3741 return "forEachObject with " + procedure;
3746 forEachObject(impl, subject, predicate, listener, ip);
/**
 * Adapts a set listener onto {@code forEachObject}: objects are reported
 * through {@code procedure.add} and {@code procedure.remove}.
 * <p>
 * While results arrive, an object found in the {@code current} set is removed
 * from it and not re-added; anything not found is reported via {@code add}.
 * When the round finishes, whatever is left in {@code current} is reported
 * via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3753 final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3755 assert(subject != null);
3756 assert(predicate != null);
3757 assert(procedure != null);
3759 forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
3761 private Set<Resource> current = null;
3762 private Set<Resource> run = new HashSet<Resource>();
3765 public void execute(AsyncReadGraph graph, Resource result) {
3767 boolean found = false;
3769 if(current != null) {
3771 found = current.remove(result);
3775 if(!found) procedure.add(graph, result);
3782 public void finished(AsyncReadGraph graph) {
3784 if(current != null) {
3785 for(Resource r : current) procedure.remove(graph, r);
3790 run = new HashSet<Resource>();
3795 public boolean isDisposed() {
3796 return procedure.isDisposed();
3800 public void exception(AsyncReadGraph graph, Throwable t) {
3801 procedure.exception(graph, t);
3805 public String toString() {
3806 return "forObjectSet " + procedure;
/**
 * Adapts a set listener onto {@code forEachPredicate}: predicates are reported
 * through {@code procedure.add} and {@code procedure.remove}.
 * <p>
 * While results arrive, a predicate found in the {@code current} set is
 * removed from it and not re-added; anything not found is reported via
 * {@code add}. When the round finishes, whatever is left in {@code current}
 * is reported via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   resource whose predicates are tracked; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3814 final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3816 assert(subject != null);
3817 assert(procedure != null);
3819 forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
3821 private Set<Resource> current = null;
3822 private Set<Resource> run = new HashSet<Resource>();
3825 public void execute(AsyncReadGraph graph, Resource result) {
3827 boolean found = false;
3829 if(current != null) {
3831 found = current.remove(result);
3835 if(!found) procedure.add(graph, result);
3842 public void finished(AsyncReadGraph graph) {
3844 if(current != null) {
3845 for(Resource r : current) procedure.remove(graph, r);
3850 run = new HashSet<Resource>();
3855 public boolean isDisposed() {
3856 return procedure.isDisposed();
3860 public void exception(AsyncReadGraph graph, Throwable t) {
3861 procedure.exception(graph, t);
3865 public String toString() {
3866 return "forPredicateSet " + procedure;
/**
 * Adapts a set listener onto {@code forEachPrincipalType}: principal types are
 * reported through {@code procedure.add} and {@code procedure.remove}.
 * <p>
 * While results arrive, a type found in the {@code current} set is removed
 * from it and not re-added; anything not found is reported via {@code add}.
 * When the round finishes, whatever is left in {@code current} is reported
 * via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   resource whose principal types are tracked; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3874 final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
3876 assert(subject != null);
3877 assert(procedure != null);
3879 forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
3881 private Set<Resource> current = null;
3882 private Set<Resource> run = new HashSet<Resource>();
3885 public void execute(AsyncReadGraph graph, Resource result) {
3887 boolean found = false;
3889 if(current != null) {
3891 found = current.remove(result);
3895 if(!found) procedure.add(graph, result);
3902 public void finished(AsyncReadGraph graph) {
3904 if(current != null) {
3905 for(Resource r : current) procedure.remove(graph, r);
3910 run = new HashSet<Resource>();
3915 public boolean isDisposed() {
3916 return procedure.isDisposed();
3920 public void exception(AsyncReadGraph graph, Throwable t) {
3921 procedure.exception(graph, t);
3925 public String toString() {
3926 return "forPrincipalTypeSet " + procedure;
/**
 * Adapts a set listener onto {@code forEachAssertedObject}: asserted objects
 * are reported through {@code procedure.add} and {@code procedure.remove}.
 * <p>
 * While results arrive, an object found in the {@code current} set is removed
 * from it and not re-added; anything not found is reported via {@code add}.
 * When the round finishes, whatever is left in {@code current} is reported
 * via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3934 final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
3936 assert(subject != null);
3937 assert(predicate != null);
3938 assert(procedure != null);
3940 forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
3942 private Set<Resource> current = null;
3943 private Set<Resource> run = new HashSet<Resource>();
3946 public void execute(AsyncReadGraph graph, Resource result) {
3948 boolean found = false;
3950 if(current != null) {
3952 found = current.remove(result);
3956 if(!found) procedure.add(graph, result);
3963 public void finished(AsyncReadGraph graph) {
3965 if(current != null) {
3966 for(Resource r : current) procedure.remove(graph, r);
3971 run = new HashSet<Resource>();
3976 public boolean isDisposed() {
3977 return procedure.isDisposed();
3981 public void exception(AsyncReadGraph graph, Throwable t) {
3982 procedure.exception(graph, t);
// NOTE(review): the label reads "forObjectSet" although this is forAssertedObjectSet - looks copied; confirm.
3986 public String toString() {
3987 return "forObjectSet " + procedure;
/**
 * Adapts a set listener onto {@code forEachAssertedStatement}: asserted
 * statements are reported through {@code procedure.add} and
 * {@code procedure.remove}.
 * <p>
 * While results arrive, a statement found in the {@code current} set is
 * removed from it and not re-added; anything not found is reported via
 * {@code add}. When the round finishes, whatever is left in {@code current}
 * is reported via {@code remove}.
 *
 * @param impl      the requesting graph
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure set listener; must not be null. Its {@code isDisposed()} decides the adapter's disposal.
 */
3995 final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
3997 assert(subject != null);
3998 assert(predicate != null);
3999 assert(procedure != null);
4001 forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
// current: presumably the result set of the previous round (null before the first) - TODO confirm.
// run: presumably collects the results of the round in progress - TODO confirm.
4003 private Set<Statement> current = null;
4004 private Set<Statement> run = new HashSet<Statement>();
4007 public void execute(AsyncReadGraph graph, Statement result) {
4009 boolean found = false;
4011 if(current != null) {
4013 found = current.remove(result);
4017 if(!found) procedure.add(graph, result);
4024 public void finished(AsyncReadGraph graph) {
4026 if(current != null) {
4027 for(Statement s : current) procedure.remove(graph, s);
4032 run = new HashSet<Statement>();
4037 public boolean isDisposed() {
4038 return procedure.isDisposed();
4042 public void exception(AsyncReadGraph graph, Throwable t) {
4043 procedure.exception(graph, t);
// NOTE(review): the label reads "forStatementSet" although this is forAssertedStatementSet - looks copied; confirm.
4047 public String toString() {
4048 return "forStatementSet " + procedure;
/**
 * Asynchronously reports the asserted objects of {@code subject} for
 * {@code predicate}.
 * <p>
 * Runs {@code AssertedStatements.queryEach} and forwards only the object id of
 * each (s, p, o) triple, converted with {@code querySupport.getResource}.
 * Any {@code Throwable} thrown by the procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   statement subject; must not be null
 * @param predicate statement predicate; must not be null
 * @param procedure receiver of results; must not be null
 */
4056 final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
4057 final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
4059 assert(subject != null);
4060 assert(predicate != null);
4061 assert(procedure != null);
4063 final ListenerBase listener = getListenerBase(procedure);
4065 // impl.state.barrier.inc();
4067 AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {
4070 public void execute(ReadGraphImpl graph, int s, int p, int o) {
4072 procedure.execute(graph, querySupport.getResource(o));
4073 } catch (Throwable t2) {
4074 Logger.defaultLogError(t2);
4079 public void finished(ReadGraphImpl graph) {
4081 procedure.finished(graph);
4082 } catch (Throwable t2) {
4083 Logger.defaultLogError(t2);
4085 // impl.state.barrier.dec();
4089 public void exception(ReadGraphImpl graph, Throwable t) {
4091 procedure.exception(graph, t);
4092 } catch (Throwable t2) {
4093 Logger.defaultLogError(t2);
4095 // impl.state.barrier.dec();
/**
 * Asynchronously reports the principal types of {@code subject}.
 * <p>
 * Type ids from {@code PrincipalTypes.queryEach} are converted to resources
 * with {@code querySupport.getResource}. The callback's own graph is always
 * forwarded. Any {@code Throwable} thrown by the procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose principal types are queried; must not be null
 * @param procedure receiver of results; must not be null
 */
4103 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4105 assert(subject != null);
4106 assert(procedure != null);
4108 final ListenerBase listener = getListenerBase(procedure);
4110 IntProcedure ip = new IntProcedure() {
4113 public void execute(ReadGraphImpl graph, int i) {
4115 procedure.execute(graph, querySupport.getResource(i));
4116 } catch (Throwable t2) {
4117 Logger.defaultLogError(t2);
4122 public void finished(ReadGraphImpl graph) {
4124 procedure.finished(graph);
4125 } catch (Throwable t2) {
4126 Logger.defaultLogError(t2);
4128 // impl.state.barrier.dec(this);
4132 public void exception(ReadGraphImpl graph, Throwable t) {
4134 procedure.exception(graph, t);
4135 } catch (Throwable t2) {
4136 Logger.defaultLogError(t2);
4138 // impl.state.barrier.dec(this);
4143 int sId = querySupport.getId(subject);
4145 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
4146 // else impl.state.barrier.inc(null, null);
4148 PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);
/**
 * Reports the principal types of {@code subject} to a graph-less
 * {@code MultiProcedure}.
 * <p>
 * Type ids from {@code PrincipalTypes.queryEach} are converted to resources
 * with {@code querySupport.getResource}. Any {@code Throwable} thrown by the
 * procedure is caught and logged.
 *
 * @param impl      the requesting graph; {@code impl.parent} is passed on as the parent entry
 * @param subject   resource whose principal types are queried; must not be null
 * @param procedure receiver of results; must not be null
 */
4153 final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
4155 assert(subject != null);
4156 assert(procedure != null);
4158 final ListenerBase listener = getListenerBase(procedure);
4160 // impl.state.barrier.inc();
4162 PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4165 public void execute(ReadGraphImpl graph, int i) {
4167 procedure.execute(querySupport.getResource(i));
4168 } catch (Throwable t2) {
4169 Logger.defaultLogError(t2);
4174 public void finished(ReadGraphImpl graph) {
4176 procedure.finished();
4177 } catch (Throwable t2) {
4178 Logger.defaultLogError(t2);
4180 // impl.state.barrier.dec();
4184 public void exception(ReadGraphImpl graph, Throwable t) {
4186 procedure.exception(t);
4187 } catch (Throwable t2) {
4188 Logger.defaultLogError(t2);
4190 // impl.state.barrier.dec();
/**
 * Reports the type set of {@code subject} to {@code procedure} by running
 * {@code Types.queryEach} on the subject's integer id.
 * <p>
 * The {@code IntSet} delivered by the query is passed to the procedure unchanged.
 * An {@code AtomicBoolean} named {@code first} guards the delivery: the call made when
 * {@code first.compareAndSet(true, false)} succeeds uses the callback graph as is, and
 * the other visible call wraps it with {@code impl.newRestart(graph)}.
 * NOTE(review): the second call is presumably the else branch for repeated deliveries;
 * the branch line itself is not visible here - confirm.
 * A {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource whose types are requested; asserted non-null
 * @param procedure receiver of the type set; asserted non-null
 */
4197 final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4199 assert(subject != null);
4200 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4202 final ListenerBase listener = getListenerBase(procedure);
4204 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
// Flipped from true to false by whichever callback runs first.
4206 AtomicBoolean first = new AtomicBoolean(true);
4209 public void execute(final ReadGraphImpl graph, IntSet set) {
4211 if(first.compareAndSet(true, false)) {
4212 procedure.execute(graph, set);
4213 // impl.state.barrier.dec(this);
4215 procedure.execute(impl.newRestart(graph), set);
4217 } catch (Throwable t2) {
4218 Logger.defaultLogError(t2);
4223 public void exception(ReadGraphImpl graph, Throwable t) {
4225 if(first.compareAndSet(true, false)) {
4226 procedure.exception(graph, t);
4227 // impl.state.barrier.dec(this);
4229 procedure.exception(impl.newRestart(graph), t);
4231 } catch (Throwable t2) {
4232 Logger.defaultLogError(t2);
4238 int sId = querySupport.getId(subject);
// The barrier bookkeeping below is disabled (commented out).
4240 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);
4241 // else impl.state.barrier.inc(null, null);
4243 Types.queryEach(impl, sId, this, impl.parent, listener, ip);
4248 final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
4250 assert(subject != null);
4252 return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);
/**
 * Reports the {@code RelationInfo} of {@code subject} to {@code procedure} by running
 * {@code RelationInfoQuery.queryEach} on the subject's integer id.
 * <p>
 * An {@code AtomicBoolean} named {@code first} guards the delivery: the call made when
 * {@code first.compareAndSet(true, false)} succeeds uses the callback graph as is, and
 * the other visible call wraps it with {@code impl.newRestart(graph)}.
 * NOTE(review): the second call is presumably the else branch for repeated deliveries;
 * the branch line itself is not visible here - confirm.
 * A {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject relation resource to inspect; asserted non-null
 * @param procedure receiver of the relation info; asserted non-null
 */
4257 final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {
4259 assert(subject != null);
4260 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4262 final ListenerBase listener = getListenerBase(procedure);
4264 // impl.state.barrier.inc();
4266 RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {
// Flipped from true to false by whichever callback runs first.
4268 AtomicBoolean first = new AtomicBoolean(true);
4271 public void execute(final ReadGraphImpl graph, RelationInfo set) {
4273 if(first.compareAndSet(true, false)) {
4274 procedure.execute(graph, set);
4275 // impl.state.barrier.dec();
4277 procedure.execute(impl.newRestart(graph), set);
4279 } catch (Throwable t2) {
4280 Logger.defaultLogError(t2);
4285 public void exception(ReadGraphImpl graph, Throwable t) {
4287 if(first.compareAndSet(true, false)) {
4288 procedure.exception(graph, t);
4289 // impl.state.barrier.dec("ReadGraphSupportImpl.1353");
4291 procedure.exception(impl.newRestart(graph), t);
4293 } catch (Throwable t2) {
4294 Logger.defaultLogError(t2);
/**
 * Reports the supertype set of {@code subject} to {@code procedure} by running
 * {@code SuperTypes.queryEach} on the subject's integer id.
 * <p>
 * The {@code IntSet} delivered by the query is passed to the procedure unchanged; the
 * conversion into a {@code HashSet<Resource>} is commented out. An {@code AtomicBoolean}
 * named {@code first} guards the delivery: the call made when
 * {@code first.compareAndSet(true, false)} succeeds uses the callback graph as is, and
 * the other visible call wraps it with {@code impl.newRestart(graph)}.
 * NOTE(review): the second call is presumably the else branch for repeated deliveries;
 * the branch line itself is not visible here - confirm.
 * A {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject type resource whose supertypes are requested; asserted non-null
 * @param procedure receiver of the supertype set; asserted non-null
 */
4303 final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4305 assert(subject != null);
4306 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4308 final ListenerBase listener = getListenerBase(procedure);
4310 // impl.state.barrier.inc();
4312 SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {
// Flipped from true to false by whichever callback runs first.
4314 AtomicBoolean first = new AtomicBoolean(true);
4317 public void execute(final ReadGraphImpl graph, IntSet set) {
// Disabled: explicit copy of the int set into a HashSet<Resource>.
4318 // final HashSet<Resource> result = new HashSet<Resource>();
4319 // set.forEach(new TIntProcedure() {
4322 // public boolean execute(int type) {
4323 // result.add(querySupport.getResource(type));
4329 if(first.compareAndSet(true, false)) {
4330 procedure.execute(graph, set);
4331 // impl.state.barrier.dec();
4333 procedure.execute(impl.newRestart(graph), set);
4335 } catch (Throwable t2) {
4336 Logger.defaultLogError(t2);
4341 public void exception(ReadGraphImpl graph, Throwable t) {
4343 if(first.compareAndSet(true, false)) {
4344 procedure.exception(graph, t);
4345 // impl.state.barrier.dec();
4347 procedure.exception(impl.newRestart(graph), t);
4349 } catch (Throwable t2) {
4350 Logger.defaultLogError(t2);
/**
 * Reports the direct super relations of {@code subject} to an asynchronous
 * multi-procedure by running {@code DirectSuperRelations.queryEach} on the subject's
 * integer id.
 * <p>
 * Each reported id is mapped back to a {@code Resource} with
 * {@code querySupport.getResource}; finished/exception are forwarded unchanged.
 * A {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject relation resource to inspect; asserted non-null
 * @param procedure receiver of the super relations; asserted non-null
 */
4359 final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4361 assert(subject != null);
4362 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4364 final ListenerBase listener = getListenerBase(procedure);
// Built on IntProcedureAdapter; all three callbacks are overridden here.
4366 IntProcedure ip = new IntProcedureAdapter() {
4369 public void execute(final ReadGraphImpl graph, int superRelation) {
4371 procedure.execute(graph, querySupport.getResource(superRelation));
4372 } catch (Throwable t2) {
4373 Logger.defaultLogError(t2);
4378 public void finished(final ReadGraphImpl graph) {
4380 procedure.finished(graph);
4381 } catch (Throwable t2) {
4382 Logger.defaultLogError(t2);
4384 // impl.state.barrier.dec(this);
4389 public void exception(ReadGraphImpl graph, Throwable t) {
4391 procedure.exception(graph, t);
4392 } catch (Throwable t2) {
4393 Logger.defaultLogError(t2);
4395 // impl.state.barrier.dec(this);
4400 int sId = querySupport.getId(subject);
// The barrier bookkeeping below is disabled (commented out).
4402 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
4403 // else impl.state.barrier.inc(null, null);
4405 DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
4410 final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
4412 assert(subject != null);
4413 assert(procedure != null);
4415 final ListenerBase listener = getListenerBase(procedure);
4417 // impl.state.barrier.inc();
4419 PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
/**
 * Reports the super relation set of {@code subject} to {@code procedure} by running
 * {@code SuperRelations.queryEach} on the subject's integer id.
 * <p>
 * The {@code IntSet} delivered by the query is passed to the procedure unchanged; the
 * conversion into a {@code HashSet<Resource>} is commented out. A {@code Throwable}
 * raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject relation resource to inspect; asserted non-null
 * @param procedure receiver of the super relation set; asserted non-null
 */
4424 final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
4426 assert(subject != null);
4427 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4429 final ListenerBase listener = getListenerBase(procedure);
4431 InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
4434 public void execute(final ReadGraphImpl graph, IntSet set) {
// Disabled: explicit copy of the int set into a HashSet<Resource>.
4435 // final HashSet<Resource> result = new HashSet<Resource>();
4436 // set.forEach(new TIntProcedure() {
4439 // public boolean execute(int type) {
4440 // result.add(querySupport.getResource(type));
4446 procedure.execute(graph, set);
4447 } catch (Throwable t2) {
4448 Logger.defaultLogError(t2);
4450 // impl.state.barrier.dec(this);
4454 public void exception(ReadGraphImpl graph, Throwable t) {
4456 procedure.exception(graph, t);
4457 } catch (Throwable t2) {
4458 Logger.defaultLogError(t2);
4460 // impl.state.barrier.dec(this);
4465 int sId = querySupport.getId(subject);
// The barrier bookkeeping below is disabled (commented out).
4467 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
4468 // else impl.state.barrier.inc(null, null);
4470 SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
4474 final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
4476 int sId = querySupport.getId(subject);
4477 return ValueQuery.queryEach(impl, sId, impl.parent);
4481 final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
4483 return ValueQuery.queryEach(impl, subject, impl.parent);
/**
 * Runs {@code ValueQuery.queryEach} for {@code subject} and returns what that call
 * returns, optionally also reporting to {@code procedure}.
 * <p>
 * Unlike its siblings, this method accepts a {@code null} procedure. When the procedure
 * is non-null, the query gets the procedure's listener and an adapter that forwards
 * execute/exception; a {@code Throwable} raised by the procedure is caught and logged.
 * The adapter's {@code first} flag selects between the callback graph as is and
 * {@code impl.newRestart(graph)}.
 * NOTE(review): the {@code newRestart} calls and the final query call with
 * {@code null, null} are presumably else branches; the branch lines themselves are not
 * visible here - confirm.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource whose value is requested; asserted non-null
 * @param procedure receiver of the value, may be {@code null}
 * @return the value returned by {@code ValueQuery.queryEach}
 */
4488 final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4490 assert(subject != null);
4492 int sId = querySupport.getId(subject);
4494 if(procedure != null) {
// Listener derived from the procedure; handed to the query below.
4496 final ListenerBase listener = getListenerBase(procedure);
4498 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Flipped from true to false by whichever callback runs first.
4500 AtomicBoolean first = new AtomicBoolean(true);
4503 public void execute(ReadGraphImpl graph, byte[] result) {
4505 if(first.compareAndSet(true, false)) {
4506 procedure.execute(graph, result);
4507 // impl.state.barrier.dec(this);
4509 procedure.execute(impl.newRestart(graph), result);
4511 } catch (Throwable t2) {
4512 Logger.defaultLogError(t2);
4517 public void exception(ReadGraphImpl graph, Throwable t) {
4519 if(first.compareAndSet(true, false)) {
4520 procedure.exception(graph, t);
4521 // impl.state.barrier.dec(this);
4523 procedure.exception(impl.newRestart(graph), t);
4525 } catch (Throwable t2) {
4526 Logger.defaultLogError(t2);
// The barrier bookkeeping below is disabled (commented out).
4532 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4533 // else impl.state.barrier.inc(null, null);
4535 return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
// Query call without listener and without callback.
4539 return ValueQuery.queryEach(impl, sId, impl.parent, null, null);
/**
 * Reports the value bytes of {@code subject} to {@code procedure} through
 * {@code ValueQuery.queryEach}.
 * <p>
 * Two adapters are visible. The one built when {@code impl.parent != null} or a listener
 * exists carries a {@code first} flag, can deliver through {@code impl.newRestart(graph)},
 * and logs any {@code Throwable} raised by the procedure. The second adapter forwards
 * execute/exception directly; no catch clause is visible around its calls.
 * NOTE(review): the second adapter is presumably the else branch for a plain one-shot
 * read; the branch line itself is not visible here - confirm.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource whose value is requested; asserted non-null
 * @param procedure receiver of the value; asserted non-null
 */
4546 final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
4548 assert(subject != null);
4549 assert(procedure != null);
// Listener derived from the procedure; may be null, as the check below shows.
4551 final ListenerBase listener = getListenerBase(procedure);
4553 if(impl.parent != null || listener != null) {
4555 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
// Flipped from true to false by whichever callback runs first.
4557 AtomicBoolean first = new AtomicBoolean(true);
4560 public void execute(ReadGraphImpl graph, byte[] result) {
4562 if(first.compareAndSet(true, false)) {
4563 procedure.execute(graph, result);
4564 // impl.state.barrier.dec(this);
4566 procedure.execute(impl.newRestart(graph), result);
4568 } catch (Throwable t2) {
4569 Logger.defaultLogError(t2);
4574 public void exception(ReadGraphImpl graph, Throwable t) {
4576 if(first.compareAndSet(true, false)) {
4577 procedure.exception(graph, t);
4578 // impl.state.barrier.dec(this);
4580 procedure.exception(impl.newRestart(graph), t);
4582 } catch (Throwable t2) {
4583 Logger.defaultLogError(t2);
4589 int sId = querySupport.getId(subject);
// The barrier bookkeeping below is disabled (commented out).
4591 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
4592 // else impl.state.barrier.inc(null, null);
4594 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
// Second adapter: direct forwarding, no first flag.
4598 InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
4601 public void execute(ReadGraphImpl graph, byte[] result) {
4603 procedure.execute(graph, result);
4608 public void exception(ReadGraphImpl graph, Throwable t) {
4610 procedure.exception(graph, t);
4616 int sId = querySupport.getId(subject);
4618 ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);
4625 final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
4627 assert(relation != null);
4628 assert(procedure != null);
4630 final ListenerBase listener = getListenerBase(procedure);
4632 IntProcedure ip = new IntProcedure() {
4634 private int result = 0;
4636 final AtomicBoolean found = new AtomicBoolean(false);
4637 final AtomicBoolean done = new AtomicBoolean(false);
4640 public void finished(ReadGraphImpl graph) {
4642 // Shall fire exactly once!
4643 if(done.compareAndSet(false, true)) {
4646 procedure.exception(graph, new NoInverseException(""));
4647 // impl.state.barrier.dec(this);
4649 procedure.execute(graph, querySupport.getResource(result));
4650 // impl.state.barrier.dec(this);
4652 } catch (Throwable t) {
4653 Logger.defaultLogError(t);
4660 public void execute(ReadGraphImpl graph, int i) {
4662 if(found.compareAndSet(false, true)) {
4665 // Shall fire exactly once!
4666 if(done.compareAndSet(false, true)) {
4668 procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
4669 // impl.state.barrier.dec(this);
4670 } catch (Throwable t) {
4671 Logger.defaultLogError(t);
4679 public void exception(ReadGraphImpl graph, Throwable t) {
4680 // Shall fire exactly once!
4681 if(done.compareAndSet(false, true)) {
4683 procedure.exception(graph, t);
4684 // impl.state.barrier.dec(this);
4685 } catch (Throwable t2) {
4686 Logger.defaultLogError(t2);
4693 int sId = querySupport.getId(relation);
4695 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
4696 // else impl.state.barrier.inc(null, null);
4698 Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);
/**
 * Looks up the resource identified by the string {@code id} and reports it to
 * {@code procedure}.
 * <p>
 * Delegates to the {@code forResource(impl, id, impl.parent, ip)} overload with an
 * adapter that converts the resulting {@code Integer} to a {@code Resource} via
 * {@code querySupport.getResource}. A {@code Throwable} raised by the procedure is
 * caught and logged.
 *
 * @param impl graph the lookup runs in; {@code impl.parent} is passed along
 * @param id identifier string handed to the delegate unchanged
 * @param procedure receiver of the resource; asserted non-null
 */
4703 final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4706 assert(procedure != null);
4708 InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
4711 public void execute(ReadGraphImpl graph, Integer result) {
4713 procedure.execute(graph, querySupport.getResource(result));
4714 } catch (Throwable t2) {
4715 Logger.defaultLogError(t2);
4717 // impl.state.barrier.dec(this);
4721 public void exception(ReadGraphImpl graph, Throwable t) {
4724 procedure.exception(graph, t);
4725 } catch (Throwable t2) {
4726 Logger.defaultLogError(t2);
4728 // impl.state.barrier.dec(this);
// The barrier bookkeeping below is disabled (commented out).
4733 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
4734 // else impl.state.barrier.inc(null, null);
4736 forResource(impl, id, impl.parent, ip);
/**
 * Looks up the builtin resource identified by the string {@code id} and reports it to
 * {@code procedure}.
 * <p>
 * Delegates to the {@code forBuiltin(impl, id, impl.parent, ...)} overload with an
 * adapter that converts the resulting {@code Integer} to a {@code Resource} via
 * {@code querySupport.getResource}. A {@code Throwable} raised by the procedure is
 * caught and logged.
 *
 * @param impl graph the lookup runs in; {@code impl.parent} is passed along
 * @param id identifier string handed to the delegate unchanged
 * @param procedure receiver of the resource; asserted non-null
 */
4741 final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
4744 assert(procedure != null);
4746 // impl.state.barrier.inc();
4748 forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
4751 public void execute(ReadGraphImpl graph, Integer result) {
4753 procedure.execute(graph, querySupport.getResource(result));
4754 } catch (Throwable t2) {
4755 Logger.defaultLogError(t2);
4757 // impl.state.barrier.dec();
4761 public void exception(ReadGraphImpl graph, Throwable t) {
4763 procedure.exception(graph, t);
4764 } catch (Throwable t2) {
4765 Logger.defaultLogError(t2);
4767 // impl.state.barrier.dec();
/**
 * Reports to {@code procedure} a boolean derived from running
 * {@code DirectPredicates.queryEach} on {@code subject}.
 * <p>
 * The adapter keeps a {@code found} flag that starts as {@code false} and hands its
 * value to {@code procedure.execute} when the query finishes. A {@code Throwable}
 * raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource to inspect; asserted non-null
 * @param procedure receiver of the boolean result; asserted non-null
 */
4775 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4777 assert(subject != null);
4778 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4780 final ListenerBase listener = getListenerBase(procedure);
4782 // impl.state.barrier.inc();
4784 DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4786 boolean found = false;
// NOTE(review): the body of execute is not visible here; presumably it sets
// found to true for any reported predicate - confirm against the full source.
4789 public void execute(ReadGraphImpl graph, int object) {
4794 public void finished(ReadGraphImpl graph) {
4796 procedure.execute(graph, found);
4797 } catch (Throwable t2) {
4798 Logger.defaultLogError(t2);
4800 // impl.state.barrier.dec();
4804 public void exception(ReadGraphImpl graph, Throwable t) {
4806 procedure.exception(graph, t);
4807 } catch (Throwable t2) {
4808 Logger.defaultLogError(t2);
4810 // impl.state.barrier.dec();
/**
 * Reports to {@code procedure} a boolean derived from enumerating the objects of
 * {@code subject} and {@code predicate} with {@code forEachObject}.
 * <p>
 * The adapter keeps a {@code found} flag that starts as {@code false} and hands its
 * value to {@code procedure.execute} when the enumeration finishes. {@code execute}
 * and {@code finished} are declared {@code synchronized} on the adapter. A
 * {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the enumeration runs in
 * @param subject statement subject; asserted non-null
 * @param predicate statement predicate; asserted non-null
 * @param procedure receiver of the boolean result; asserted non-null
 */
4818 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
4820 assert(subject != null);
4821 assert(predicate != null);
4822 assert(procedure != null);
4824 AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
4826 boolean found = false;
// NOTE(review): the body of execute is not visible here; presumably it sets
// found to true for any reported object - confirm against the full source.
4829 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
4834 synchronized public void finished(AsyncReadGraph graph) {
4836 procedure.execute(graph, found);
4837 } catch (Throwable t2) {
4838 Logger.defaultLogError(t2);
4840 // impl.state.barrier.dec(this);
4844 public void exception(AsyncReadGraph graph, Throwable t) {
4846 procedure.exception(graph, t);
4847 } catch (Throwable t2) {
4848 Logger.defaultLogError(t2);
4850 // impl.state.barrier.dec(this);
// The barrier bookkeeping below is disabled (commented out).
4855 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
4856 // else impl.state.barrier.inc(null, null);
4858 forEachObject(impl, subject, predicate, ip);
/**
 * Reports to {@code procedure} whether the enumeration of the objects of
 * {@code subject} and {@code predicate} contains {@code object}.
 * <p>
 * Objects are enumerated with {@code forEachObject}; the adapter sets its
 * {@code found} flag when a reported resource {@code equals(object)} and hands the flag
 * to {@code procedure.execute} when the enumeration finishes. {@code execute} and
 * {@code finished} are declared {@code synchronized} on the adapter. A
 * {@code Throwable} raised by the procedure is caught and logged.
 * Unlike the other arguments, {@code object} is not asserted non-null.
 *
 * @param impl graph the enumeration runs in
 * @param subject statement subject; asserted non-null
 * @param predicate statement predicate; asserted non-null
 * @param object statement object to look for
 * @param procedure receiver of the boolean result; asserted non-null
 */
4863 final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
4865 assert(subject != null);
4866 assert(predicate != null);
4867 assert(procedure != null);
4869 // impl.state.barrier.inc();
4871 forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
4873 boolean found = false;
4876 synchronized public void execute(AsyncReadGraph graph, Resource resource) {
// The flag is only ever raised here, never cleared.
4877 if(resource.equals(object)) found = true;
4881 synchronized public void finished(AsyncReadGraph graph) {
4883 procedure.execute(graph, found);
4884 } catch (Throwable t2) {
4885 Logger.defaultLogError(t2);
4887 // impl.state.barrier.dec();
4891 public void exception(AsyncReadGraph graph, Throwable t) {
4893 procedure.exception(graph, t);
4894 } catch (Throwable t2) {
4895 Logger.defaultLogError(t2);
4897 // impl.state.barrier.dec();
/**
 * Reports to {@code procedure} whether {@code subject} has a value.
 * <p>
 * Runs {@code ValueQuery.queryEach} on the subject's integer id and reports
 * {@code true} exactly when the delivered byte array is non-null. A {@code Throwable}
 * raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource to inspect; asserted non-null
 * @param procedure receiver of the boolean result; asserted non-null
 */
4905 final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
4907 assert(subject != null);
4908 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4910 final ListenerBase listener = getListenerBase(procedure);
4912 // impl.state.barrier.inc();
4914 ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
4917 public void execute(ReadGraphImpl graph, byte[] object) {
// A null byte array is reported as "no value".
4918 boolean result = object != null;
4920 procedure.execute(graph, result);
4921 } catch (Throwable t2) {
4922 Logger.defaultLogError(t2);
4924 // impl.state.barrier.dec();
4928 public void exception(ReadGraphImpl graph, Throwable t) {
4930 procedure.exception(graph, t);
4931 } catch (Throwable t2) {
4932 Logger.defaultLogError(t2);
4934 // impl.state.barrier.dec();
/**
 * Reports the results of {@code OrderedSet.queryEach} for {@code subject} to an
 * asynchronous multi-procedure.
 * <p>
 * Each reported id is mapped back to a {@code Resource} with
 * {@code querySupport.getResource}; finished/exception are forwarded unchanged.
 * A {@code Throwable} raised by the procedure is caught and logged.
 *
 * @param impl graph the query runs in; {@code impl.parent} is passed to the query
 * @param subject resource handed to the ordered-set query; asserted non-null
 * @param procedure receiver of the results; asserted non-null
 */
4942 final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
4944 assert(subject != null);
4945 assert(procedure != null);
// Listener derived from the procedure; handed to the query below.
4947 final ListenerBase listener = getListenerBase(procedure);
4949 // impl.state.barrier.inc();
4951 OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {
4954 public void exception(ReadGraphImpl graph, Throwable t) {
4956 procedure.exception(graph, t);
4957 } catch (Throwable t2) {
4958 Logger.defaultLogError(t2);
4960 // impl.state.barrier.dec();
4964 public void execute(ReadGraphImpl graph, int i) {
4966 procedure.execute(graph, querySupport.getResource(i));
4967 } catch (Throwable t2) {
4968 Logger.defaultLogError(t2);
4973 public void finished(ReadGraphImpl graph) {
4975 procedure.finished(graph);
4976 } catch (Throwable t2) {
4977 Logger.defaultLogError(t2);
4979 // impl.state.barrier.dec();
4987 final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {
4989 assert(request != null);
4990 assert(procedure != null);
4992 // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");
4993 // else impl.state.barrier.inc(null, null);
4995 runAsyncRead(impl, request, parent, listener, procedure);
5000 final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
5002 assert(graph != null);
5003 assert(request != null);
5005 final ReadEntry entry = cache.readMap.get(request);
5006 if(entry != null && entry.isReady()) {
5007 return (T)entry.get(graph, this, null);
5009 return request.perform(graph);
/**
 * Answers an external read request from the cache when a ready entry exists and
 * otherwise registers a listener on the request.
 * <p>
 * For a ready entry: an excepted entry rethrows its stored throwable, wrapped in a
 * {@code DatabaseException} unless it already is one, and any other ready entry returns
 * its stored result. Without a ready entry the request is registered with an anonymous
 * {@code Listener} and the contents of two {@code DataContainer}s are inspected
 * afterwards: the exception container is rethrown with the same wrapping rule,
 * otherwise the result container's content is returned.
 * NOTE(review): the listener bodies and the guard around the final rethrow are not
 * visible here. Presumably the listener stores into the two containers and the rethrow
 * only happens for a non-null throwable - confirm against the full source.
 *
 * @param graph graph handed to {@code request.register}; asserted non-null
 * @param request external read to answer; asserted non-null
 * @return the cached result or the content of the result container
 * @throws DatabaseException if the cached entry or the fresh registration reported a failure
 */
5014 final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
5016 assert(graph != null);
5017 assert(request != null);
5019 final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
5020 if(entry != null && entry.isReady()) {
5021 if(entry.isExcepted()) {
// For an excepted entry the stored result is the throwable itself.
5022 Throwable t = (Throwable)entry.getResult();
5023 if(t instanceof DatabaseException) throw (DatabaseException)t;
5024 else throw new DatabaseException(t);
5026 return (T)entry.getResult();
// Holders read after register(...) below.
5030 final DataContainer<T> result = new DataContainer<T>();
5031 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
5033 request.register(graph, new Listener<T>() {
5036 public void exception(Throwable t) {
5041 public void execute(T t) {
5046 public boolean isDisposed() {
5052 Throwable t = exception.get();
5054 if(t instanceof DatabaseException) throw (DatabaseException)t;
5055 else throw new DatabaseException(t);
5058 return result.get();
5065 final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
5067 assert(graph != null);
5068 assert(request != null);
5070 final AsyncReadEntry entry = cache.asyncReadMap.get(request);
5071 if(entry != null && entry.isReady()) {
5072 if(entry.isExcepted()) {
5073 procedure.exception(graph, (Throwable)entry.getResult());
5075 procedure.execute(graph, (T)entry.getResult());
5078 request.perform(graph, procedure);
5084 final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5086 assert(request != null);
5087 assert(procedure != null);
5091 queryMultiRead(impl, request, parent, listener, procedure);
5093 } catch (DatabaseException e) {
5095 throw new IllegalStateException(e);
/**
 * Runs an asynchronous multi-read request through {@code runAsyncMultiRead}, wrapping
 * {@code procedure} in a guarding adapter.
 * <p>
 * The adapter forwards execute/finished/exception to {@code procedure}; a
 * {@code Throwable} raised by the procedure is caught and logged. Its
 * {@code toString()} returns {@code procedure.toString()}.
 *
 * @param impl graph the request runs in
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through to {@code runAsyncMultiRead}
 * @param procedure receiver of the results; asserted non-null
 * @param listener listener passed through to {@code runAsyncMultiRead}
 */
5102 final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
5104 assert(request != null);
5105 assert(procedure != null);
5107 // impl.state.barrier.inc();
5109 runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
5111 public void execute(AsyncReadGraph graph, T result) {
5114 procedure.execute(graph, result);
5115 } catch (Throwable t2) {
5116 Logger.defaultLogError(t2);
5121 public void finished(AsyncReadGraph graph) {
5124 procedure.finished(graph);
5125 } catch (Throwable t2) {
5126 Logger.defaultLogError(t2);
5129 // impl.state.barrier.dec();
// The wrapper prints as the wrapped procedure does.
5134 public String toString() {
5135 return procedure.toString();
5139 public void exception(AsyncReadGraph graph, Throwable t) {
5142 procedure.exception(graph, t);
5143 } catch (Throwable t2) {
5144 Logger.defaultLogError(t2);
5147 // impl.state.barrier.dec();
/**
 * Runs an external read request through {@code queryPrimitiveRead}, wrapping
 * {@code procedure} in a guarding adapter.
 * <p>
 * The adapter forwards execute/exception to {@code procedure}; a {@code Throwable}
 * raised by the procedure is caught and logged. Its {@code toString()} returns
 * {@code procedure.toString()}. A {@code DatabaseException} from the delegate is
 * rethrown wrapped in an {@code IllegalStateException}.
 *
 * @param impl graph the request runs in
 * @param request request to run; asserted non-null
 * @param parent cache entry passed through to {@code queryPrimitiveRead}
 * @param procedure receiver of the result; asserted non-null
 * @param listener listener passed through to {@code queryPrimitiveRead}
 * @throws IllegalStateException wrapping any {@code DatabaseException} from the delegate
 */
5156 final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
5158 assert(request != null);
5159 assert(procedure != null);
5163 queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {
5166 public void execute(T result) {
5168 procedure.execute(result);
5169 } catch (Throwable t2) {
5170 Logger.defaultLogError(t2);
// The wrapper prints as the wrapped procedure does.
5175 public String toString() {
5176 return procedure.toString();
5180 public void exception(Throwable t) {
5182 procedure.exception(t);
5183 } catch (Throwable t2) {
5184 Logger.defaultLogError(t2);
5190 } catch (DatabaseException e) {
// The method declares no checked exception, so the failure is rethrown unchecked.
5192 throw new IllegalStateException(e);
5199 public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
5201 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
5206 public VirtualGraph getProvider(Resource subject, Resource predicate) {
5208 return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
5213 public VirtualGraph getValueProvider(Resource subject) {
5215 return querySupport.getValueProvider(querySupport.getId(subject));
5219 public boolean resumeTasks(ReadGraphImpl graph) {
5221 return querySupport.resume(graph);
5225 public boolean isImmutable(int resourceId) {
5226 return querySupport.isImmutable(resourceId);
5229 public boolean isImmutable(Resource resource) {
5230 ResourceImpl impl = (ResourceImpl)resource;
5231 return isImmutable(impl.id);
5236 public Layer0 getL0(ReadGraph graph) {
5238 L0 = Layer0.getInstance(graph);