/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl.query; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.DevelopmentKeys; import org.simantics.db.DirectStatements; import org.simantics.db.ReadGraph; import org.simantics.db.RelationInfo; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import 
org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoInverseException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.MultiIntProcedure; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.procedure.IntProcedureAdapter; import org.simantics.db.impl.procedure.InternalProcedure; import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; import org.simantics.db.impl.support.ResourceSupport; import org.simantics.db.procedure.AsyncMultiListener; import org.simantics.db.procedure.AsyncMultiProcedure; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.procedure.AsyncSetListener; import org.simantics.db.procedure.Listener; import org.simantics.db.procedure.ListenerBase; import org.simantics.db.procedure.MultiProcedure; import org.simantics.db.procedure.Procedure; import org.simantics.db.procedure.StatementProcedure; import org.simantics.db.request.AsyncMultiRead; import org.simantics.db.request.AsyncRead; import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.Read; import org.simantics.db.request.RequestFlags; import org.simantics.db.request.WriteTraits; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Pair; import org.simantics.utils.datastructures.collections.CollectionUtils; import org.simantics.utils.datastructures.disposable.AbstractDisposable; import gnu.trove.map.hash.THashMap; import gnu.trove.procedure.TIntProcedure; import gnu.trove.procedure.TLongProcedure; import gnu.trove.procedure.TObjectProcedure; import gnu.trove.set.hash.THashSet; import 
gnu.trove.set.hash.TIntHashSet; @SuppressWarnings({"rawtypes", "unchecked"}) final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport { final public UnaryQueryHashMap directPredicatesMap; final public UnaryQueryHashMap principalTypesMap; final public THashMap uriToResourceMap; final public THashMap namespaceIndexMap22; final public UnaryQueryHashMap projectsMap; final public UnaryQueryHashMap> relationInfoMap; final public UnaryQueryHashMap> superTypesMap; final public UnaryQueryHashMap> typeHierarchyMap; final public UnaryQueryHashMap> superRelationsMap; final public UnaryQueryHashMap> typesMap; final public UnaryQueryHashMap> valueMap; final public DoubleKeyQueryHashMap directObjectsMap; final public DoubleKeyQueryHashMap objectsMap; final public UnaryQueryHashMap orderedSetMap; final public UnaryQueryHashMap predicatesMap; final public DoubleKeyQueryHashMap statementsMap; final public UnaryQueryHashMap assertedPredicatesMap; final public BinaryQueryHashMap assertedStatementsMap; final public StableHashMap externalReadMap; final public StableHashMap asyncReadMap; final public StableHashMap readMap; final public StableHashMap asyncMultiReadMap; final public StableHashMap multiReadMap; final private THashMap> listeners; public static int indent = 0; public int size = 0; // Garbage collection public int boundQueries = 0; // Statistics private int hits = 0; private int misses = 0; private int updates = 0; final private int functionalRelation; final private int superrelationOf; final private int instanceOf; final private int inverseOf; final private int asserts; final private int hasPredicate; final private int hasPredicateInverse; final private int hasObject; final private int inherits; final private int subrelationOf; final private int rootLibrary; /** * A cache for the root library resource. Initialized in * {@link #getRootLibraryResource()}. 
*/ private volatile ResourceImpl rootLibraryResource; final private int library; final private int consistsOf; final private int hasName; AtomicInteger sleepers = new AtomicInteger(0); private boolean updating = false; static public boolean collecting = false; private boolean firingListeners = false; final public QuerySupport querySupport; final public Session session; final public ResourceSupport resourceSupport; private THashSet scheduledListeners = new THashSet(); QueryThread[] executors; public ArrayList[] queues; enum ThreadState { INIT, RUN, SLEEP, DISPOSED } public ThreadState[] threadStates; public ReentrantLock[] threadLocks; public Condition[] threadConditions; public ArrayList[] ownTasks; public ArrayList[] ownSyncTasks; ArrayList[] delayQueues; public boolean synch = true; final Object querySupportLock; public Long modificationCounter = 0L; public void close() { } final public void scheduleOwn(int caller, SessionTask request) { ownTasks[caller].add(request); } final public void scheduleAlways(int caller, SessionTask request) { int performer = request.thread; if(caller == performer) { ownTasks[caller].add(request); } else { schedule(caller, request); } } final public void schedule(int caller, SessionTask request) { int performer = request.thread; if(DebugPolicy.SCHEDULE) System.out.println("schedule " + request + " " + caller + " -> " + performer); assert(performer >= 0); assert(request != null); if(caller == performer) { request.run(caller); } else { ReentrantLock queueLock = threadLocks[performer]; queueLock.lock(); queues[performer].add(request); // This thread could have been sleeping if(queues[performer].size() == 1) { if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); threadConditions[performer].signalAll(); } queueLock.unlock(); } } final int THREADS; final public int THREAD_MASK; final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); public static abstract class SessionTask { final 
/**
 * Creates the processor: allocates the per-thread scheduling structures, creates and
 * starts the query threads, creates the query caches and resolves the builtin resource ids.
 * <p>
 * The constructor blocks until {@code sleepers} equals the number of threads.
 * NOTE(review): {@code sleepers} is presumably incremented by each {@code QueryThread}
 * once it is idle - confirm against QueryThread.
 *
 * @param threads   number of query threads to create; {@code THREAD_MASK} is set to
 *                  {@code threads - 1} (presumably expects a power of two - TODO confirm)
 * @param core      the query support used for session, resource support, lock and builtins
 * @param threadSet receives every created query thread
 * @throws DatabaseException declared by the signature; no statement visible here throws it directly
 */
public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException {

    THREADS = threads;
    THREAD_MASK = threads - 1;

    querySupport = core;
    session = querySupport.getSession();
    resourceSupport = querySupport.getSupport();
    querySupportLock = core.getLock();

    executors = new QueryThread[THREADS];
    queues = new ArrayList[THREADS];
    threadLocks = new ReentrantLock[THREADS];
    threadConditions = new Condition[THREADS];
    threadStates = new ThreadState[THREADS];
    ownTasks = new ArrayList[THREADS];
    ownSyncTasks = new ArrayList[THREADS];
    delayQueues = new ArrayList[THREADS * THREADS];

    for (int i = 0; i < THREADS * THREADS; i++) {
        delayQueues[i] = new ArrayList();
    }

    for (int i = 0; i < THREADS; i++) {
        ownTasks[i] = new ArrayList();
        ownSyncTasks[i] = new ArrayList();
        queues[i] = new ArrayList();
        threadLocks[i] = new ReentrantLock();
        threadConditions[i] = threadLocks[i].newCondition();
        threadStates[i] = ThreadState.INIT;
    }

    for (int i = 0; i < THREADS; i++) {
        executors[i] = new QueryThread(session, this, i, "Query Thread " + i);
        threadSet.add(executors[i]);
    }

    directPredicatesMap = new UnaryQueryHashMap();
    valueMap = new UnaryQueryHashMap();
    principalTypesMap = new UnaryQueryHashMap();
    uriToResourceMap = new THashMap();
    namespaceIndexMap22 = new THashMap();
    projectsMap = new UnaryQueryHashMap();
    relationInfoMap = new UnaryQueryHashMap();
    typeHierarchyMap = new UnaryQueryHashMap();
    superTypesMap = new UnaryQueryHashMap();
    superRelationsMap = new UnaryQueryHashMap();
    typesMap = new UnaryQueryHashMap();
    objectsMap = new DoubleKeyQueryHashMap();
    orderedSetMap = new UnaryQueryHashMap();
    predicatesMap = new UnaryQueryHashMap();
    statementsMap = new DoubleKeyQueryHashMap();
    directObjectsMap = new DoubleKeyQueryHashMap();
    assertedPredicatesMap = new UnaryQueryHashMap();
    assertedStatementsMap = new BinaryQueryHashMap();
    asyncReadMap = new StableHashMap();
    readMap = new StableHashMap();
    asyncMultiReadMap = new StableHashMap();
    multiReadMap = new StableHashMap();
    externalReadMap = new StableHashMap();
    // Raw construction, like every other cache above (the class suppresses rawtypes/unchecked).
    listeners = new THashMap(10, 0.75f);

    // Now start threads
    for (int i = 0; i < THREADS; i++) {
        executors[i].start();
    }

    // Make sure that query threads are up and running. An interrupt must not abort the
    // wait, but it must not be lost either: remember it and restore the flag afterwards
    // (restoring it inside the loop would make every following sleep fail immediately).
    boolean interrupted = false;
    while(sleepers.get() != THREADS) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    rootLibrary = core.getBuiltin("http:/");

    // A zero root library means the builtins have not been installed; all ids stay 0 then.
    boolean builtinsInstalled = rootLibrary != 0;

    functionalRelation = builtinOrZero(core, Layer0.URIs.FunctionalRelation, builtinsInstalled);
    instanceOf = builtinOrZero(core, Layer0.URIs.InstanceOf, builtinsInstalled);
    inverseOf = builtinOrZero(core, Layer0.URIs.InverseOf, builtinsInstalled);
    inherits = builtinOrZero(core, Layer0.URIs.Inherits, builtinsInstalled);
    asserts = builtinOrZero(core, Layer0.URIs.Asserts, builtinsInstalled);
    hasPredicate = builtinOrZero(core, Layer0.URIs.HasPredicate, builtinsInstalled);
    hasPredicateInverse = builtinOrZero(core, Layer0.URIs.HasPredicateInverse, builtinsInstalled);
    hasObject = builtinOrZero(core, Layer0.URIs.HasObject, builtinsInstalled);
    subrelationOf = builtinOrZero(core, Layer0.URIs.SubrelationOf, builtinsInstalled);
    superrelationOf = builtinOrZero(core, Layer0.URIs.SuperrelationOf, builtinsInstalled);
    library = builtinOrZero(core, Layer0.URIs.Library, builtinsInstalled);
    consistsOf = builtinOrZero(core, Layer0.URIs.ConsistsOf, builtinsInstalled);
    hasName = builtinOrZero(core, Layer0.URIs.HasName, builtinsInstalled);

}

/**
 * Resolves one builtin resource id.
 *
 * @param core      source of the builtin ids
 * @param uri       URI of the builtin to resolve
 * @param installed whether builtins are installed; when {@code false} no lookup is made
 * @return the id reported by {@code core.getBuiltin(uri)}, or 0 when builtins are not installed
 */
private static int builtinOrZero(QuerySupport core, String uri, boolean installed) {
    if (!installed) {
        return 0;
    }
    int id = core.getBuiltin(uri);
    assert (id != 0) : uri;
    return id;
}
asserts; } public int getHasPredicate() { return hasPredicate; } public int getHasPredicateInverse() { return hasPredicateInverse; } public int getHasObject() { return hasObject; } public int getRootLibrary() { return rootLibrary; } public Resource getRootLibraryResource() { if (rootLibraryResource == null) { // Synchronization is not needed here, it doesn't matter if multiple // threads simultaneously set rootLibraryResource once. int root = getRootLibrary(); if (root == 0) throw new UnsupportedOperationException("database is not initialized, cannot get root library resource"); this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root); } return rootLibraryResource; } public int getLibrary() { return library; } public int getConsistsOf() { return consistsOf; } public int getHasName() { return hasName; } public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) { URIToResource.queryEach(graph, id, parent, null, new InternalProcedure() { @Override public void execute(ReadGraphImpl graph, Integer result) { if (result != null && result != 0) { procedure.execute(graph, result); return; } // Fall back to using the fixed builtins. 
result = querySupport.getBuiltin(id); if (result != 0) { procedure.execute(graph, result); return; } try { result = querySupport.getRandomAccessReference(id); } catch (ResourceNotFoundException e) { procedure.exception(graph, e); return; } if (result != 0) { procedure.execute(graph, result); } else { procedure.exception(graph, new ResourceNotFoundException(id)); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { procedure.exception(graph, t); } }); } public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) { Integer result = querySupport.getBuiltin(id); if (result != 0) { procedure.execute(graph, result); } else { procedure.exception(graph, new ResourceNotFoundException(id)); } } public final void runAsyncRead(final ReadGraphImpl graph, final AsyncRead query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure procedure) { int hash = requestHash(query); AsyncReadEntry entry = asyncReadMap.get(query, hash); if(parent == null && listener == null) { if(entry != null && (entry.isReady() || entry.isExcepted())) { System.out.println("ready " + query); entry.performFromCache(graph, this, procedure); // graph.state.barrier.dec(query); return; } else { query.perform(graph, procedure); // graph.state.barrier.dec(query); return; } } if(entry == null) { entry = new AsyncReadEntry(query); entry.setPending(); entry.clearResult(querySupport); asyncReadMap.put(query, entry, hash); performForEach(graph, query, entry, parent, listener, procedure, false); } else { if(entry.isPending()) { synchronized(entry) { if(entry.isPending()) { throw new IllegalStateException(); // final AsyncBarrierImpl parentBarrier = graph.state.barrier; // if(entry.procs == null) entry.procs = new ArrayList>(); // entry.procs.add(new AsyncProcedure() { // // @Override // public void execute(AsyncReadGraph graph, T result) { // procedure.execute(graph, result); // parentBarrier.dec(query); // } // // 
@Override // public void exception(AsyncReadGraph graph, Throwable throwable) { // procedure.exception(graph, throwable); // parentBarrier.dec(query); // } // // }); // if(graph.parent != null || listener != null) { // registerDependencies(graph, entry, parent, listener, procedure, false); // } // // query.perform(graph, procedure); // // return; } } } if(entry.isReady()) { entry.performFromCache(graph, this, procedure); registerDependencies(graph, entry, parent, listener, procedure, false); } else { performForEach(graph, query, entry, parent, listener, procedure, false); } } } final static void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure procedure) { MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query); if(entry == null) { entry = new MultiReadEntry(query); entry.setPending(); entry.clearResult(provider.querySupport); provider.multiReadMap.put(query, entry); provider.performForEach(graph, query, entry, parent, listener, procedure, false); } else { if(entry.isPending()) { synchronized(entry) { if(entry.isPending()) { throw new IllegalStateException(); // if(entry.procs == null) entry.procs = new ArrayList, AsyncBarrier>>(); // entry.procs.add(new Pair(procedure, parentBarrier)); // if(graph.parent != null || listener != null) { // provider.registerDependencies(graph, entry, parent, listener, procedure, false); // } // If this was synchronized we must wait here until completion // if(graph.state.synchronizedExecution) { // while(entry.isPending()) { // graph.resumeTasks(graph.callerThread, null, null); // } // } // // return; } } entry.performFromCache(graph, provider, procedure); // graph.state.barrier.dec(query); return; } else { provider.performForEach(graph, query, entry, parent, listener, procedure, false); } } } public final void runAsyncMultiRead(final ReadGraphImpl graph, final 
AsyncMultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) { int hash = requestHash(query); AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash); if(parent == null && listener == null) { if(entry != null && (entry.isReady() || entry.isExcepted())) { System.out.println("ready " + query); entry.performFromCache(graph, this, procedure); return; } else { query.perform(graph, procedure); return; } } if(entry == null) { entry = new AsyncMultiReadEntry(query); entry.setPending(); entry.clearResult(querySupport); asyncMultiReadMap.put(query, entry, hash); performForEach(graph, query, entry, parent, listener, procedure, false); } else { if(entry.isPending()) { synchronized(entry) { if(entry.isPending()) { throw new IllegalStateException(); // if(entry.procs == null) entry.procs = new ArrayList>(); // entry.procs.add(procedure); // if(graph.parent != null || listener != null) { // registerDependencies(graph, entry, parent, listener, procedure, false); // } // return; } } } performForEach(graph, query, entry, parent, listener, procedure, false); } } final static void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure procedure) { final ExternalReadEntry entry = cached != null ? 
cached : provider.externalReadMap.get(query); if(entry == null) { provider.performForEach(graph, query, new ExternalReadEntry(query), parent, listener, procedure, false); } else { if(entry.isPending()) { synchronized(entry) { if(entry.isPending()) { throw new IllegalStateException(); // if(entry.procs == null) entry.procs = new ArrayList>(); // entry.procs.add(procedure); // return; } } } provider.performForEach(graph, query, entry, parent, listener, procedure, false); } } public int requestHash(Object object) { try { return object.hashCode(); } catch (Throwable t) { Logger.defaultLogError(t); return 0; } } @Override public T queryRead(final ReadGraphImpl graph, final Read query, final CacheEntry parent, final AsyncProcedure procedure, final ListenerBase listener) throws Throwable { assert(query != null); ReadEntry entry = readMap.get(query); if(entry != null) { if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) { return (T)entry.get(graph, this, procedure); } else if (entry.isPending()) { throw new IllegalStateException(); } } if(entry == null) { entry = new ReadEntry(query); entry.setPending(); entry.clearResult(querySupport); readMap.put(query, entry); return (T)performForEach(graph, query, entry, parent, listener, procedure, false); } else { if(entry.isPending()) { throw new IllegalStateException(); } else { return (T)performForEach(graph, query, entry, parent, listener, procedure, false); } } } public void queryMultiRead(final ReadGraphImpl graph, final MultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) { assert(query != null); assert(procedure != null); final MultiReadEntry entry = multiReadMap.get(query); if(parent == null && !(listener != null)) { if(entry != null && entry.isReady()) { entry.performFromCache(graph, this, procedure); return; } } runMultiRead(graph, entry, query, parent, this, listener, procedure); } public void queryPrimitiveRead(final ReadGraphImpl 
/**
 * Performs an asynchronous read against its cache entry.
 * <p>
 * Dependencies (parent and listener) are registered first. If the entry is not ready the
 * query is performed and its outcome is stored in the entry before being forwarded to
 * {@code procedure}; otherwise the cached outcome is replayed. The {@code misses} and
 * {@code hits} counters are updated accordingly.
 *
 * @param parentGraph       graph of the caller; also the graph passed on to {@code procedure}
 * @param query             the request to perform
 * @param entry             cache entry of the request; must not be discarded
 * @param parent            cache entry depending on this request, may be {@code null}
 * @param base              listener to register, may be {@code null}
 * @param procedure         receives the result or the failure
 * @param inferredDependency passed on to {@code registerDependencies}
 */
public void performForEach(ReadGraphImpl parentGraph, final AsyncRead query, final AsyncReadEntry entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure procedure, boolean inferredDependency) {

    if (DebugPolicy.PERFORM) System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

    assert (!dirty);
    assert (!collecting);

    assert(!entry.isDiscarded());

    // Null when no listener was given (see registerDependencies).
    final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);

    // FRESH, REFUTED, EXCEPTED go here
    if (!entry.isReady()) {

        entry.setPending();

        size++;

        try {

            final ReadGraphImpl finalParentGraph = parentGraph;

            // The query runs in a graph whose parent is this entry; the callbacks below
            // deliberately use the caller's graph, not the graph they are handed.
            query.perform(parentGraph.withParent(entry), new AsyncProcedure() {

                @Override
                public void execute(AsyncReadGraph returnGraph, T result) {
                    // NOTE(review): 'impl' is never read; only the cast itself has an effect.
                    ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
                    //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
                    // Store the result in the cache entry before anyone is notified.
                    entry.addOrSet(finalParentGraph, result);
                    if(listenerEntry != null) {
                        primeListenerEntry(listenerEntry, result);
                    }
                    // Failures of the client procedure are printed and otherwise ignored.
                    try {
                        procedure.execute(finalParentGraph, result);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    // parentBarrier.dec(query);
                }

                @Override
                public void exception(AsyncReadGraph returnGraph, Throwable t) {
                    // NOTE(review): 'impl' is never read; only the cast itself has an effect.
                    ReadGraphImpl impl = (ReadGraphImpl)returnGraph;
                    // AsyncReadGraph resumeGraph = finalParentGraph.newAsync();
                    // Record the failure in the cache entry, then forward it.
                    entry.except(finalParentGraph, t);
                    try {
                        procedure.exception(finalParentGraph, t);
                    } catch (Throwable t2) {
                        t2.printStackTrace();
                    }
                    // parentBarrier.dec(query);
                }

                @Override
                public String toString() {
                    return procedure.toString();
                }

            });

        } catch (Throwable t) {

            // query.perform itself failed: record and report the failure like an
            // asynchronous exception (note that this path uses the one-argument except()).
            entry.except(t);
            try {
                procedure.exception(parentGraph, t);
            } catch (Throwable t2) {
                t2.printStackTrace();
            }
            // parentBarrier.dec(query);

        }

        misses++;

    } else {

        // Entry is ready: replay the cached outcome. Here the listener entry is primed
        // after the procedure has been executed (the miss path primes it before).
        entry.performFromCache(parentGraph, this, new AsyncProcedure() {

            @Override
            public void exception(AsyncReadGraph graph, Throwable throwable) {
                procedure.exception(graph, throwable);
            }

            @Override
            public void execute(AsyncReadGraph graph, T result) {
                procedure.execute(graph, result);
                if(listenerEntry != null) {
                    primeListenerEntry(listenerEntry, result);
                }
            }

        });

        // parentBarrier.dec(query);

        hits++;

    }

    assert (!entry.isDiscarded());

}
/**
 * Performs a multi-read against its cache entry.
 * <p>
 * If the entry is not ready it is reset, (re)inserted into {@code multiReadMap} and the
 * query is performed in a new graph created with {@code graph.newSync(entry)}; every
 * result, the completion and any failure are recorded in the entry and forwarded to
 * {@code procedure}. If the entry is ready the cached results are replayed. Unlike the
 * asynchronous-read variant, dependencies are registered at the very end.
 *
 * @param graph             graph of the caller
 * @param query             the request to perform
 * @param entry             cache entry of the request; asserted not pending and not discarded
 * @param parent            cache entry depending on this request, may be {@code null}
 * @param listener          listener to register, may be {@code null}
 * @param procedure         receives results, completion and failures
 * @param inferredDependency passed on to {@code registerDependencies}
 */
public void performForEach(final ReadGraphImpl graph, final MultiRead query, final MultiReadEntry entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure, boolean inferredDependency) {

    if (DebugPolicy.PERFORM) System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

    assert (!dirty);
    assert (!collecting);

    // NOTE(review): runMultiRead calls setPending() on a new entry right before invoking
    // this method; unless clearResult() resets that state this assertion would fire when
    // assertions are enabled - confirm.
    assert(!entry.isPending());
    assert(!entry.isDiscarded());

    // FRESH, REFUTED, EXCEPTED go here
    if (!entry.isReady()) {

        entry.setPending();
        entry.clearResult(querySupport);

        multiReadMap.put(query, entry);
        size++;

        final ReadGraphImpl newGraph = graph.newSync(entry);
        // newGraph.state.barrier.inc();

        try {

            query.perform(newGraph, new AsyncMultiProcedure() {

                @Override
                public void execute(AsyncReadGraph graph, T result) {
                    // Record each result in the entry, then forward it. Failures of the
                    // client procedure are printed and otherwise ignored.
                    entry.addOrSet(result);
                    try {
                        procedure.execute(graph, result);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }

                @Override
                public void finished(AsyncReadGraph graph) {
                    entry.finish(graph);
                    try {
                        procedure.finished(graph);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    // newGraph.state.barrier.dec();
                    // parentBarrier.dec();
                }

                @Override
                public void exception(AsyncReadGraph graph, Throwable t) {
                    entry.except(t);
                    try {
                        procedure.exception(graph, t);
                    } catch (Throwable t2) {
                        t2.printStackTrace();
                    }
                    // newGraph.state.barrier.dec();
                    // parentBarrier.dec();
                }

            });

        } catch (DatabaseException e) {

            // query.perform itself failed with a database exception: record and report it.
            entry.except(e);
            try {
                procedure.exception(graph, e);
            } catch (Throwable t2) {
                t2.printStackTrace();
            }
            // newGraph.state.barrier.dec();
            // parentBarrier.dec();

        } catch (Throwable t) {

            // Any other failure is wrapped into a DatabaseException before being
            // recorded and reported.
            DatabaseException e = new DatabaseException(t);

            entry.except(e);
            try {
                procedure.exception(graph, e);
            } catch (Throwable t2) {
                t2.printStackTrace();
            }
            // newGraph.state.barrier.dec();
            // parentBarrier.dec();

        }

        misses++;

    } else {

        // Entry is ready: replay the cached results.
        entry.performFromCache(graph, this, procedure);
        hits++;

    }

    assert (!entry.isDiscarded());

    // The returned listener entry is not used here.
    registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);

}
CacheEntry parent, final ListenerBase base, final Procedure procedure, boolean inferredDependency) {

        if (DebugPolicy.PERFORM)
            System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

        assert (!dirty);
        assert (!collecting);

        assert(!entry.isPending());
        assert(!entry.isDiscarded());

        registerDependencies(graph, entry, parent, base, procedure, inferredDependency);

        // FRESH, REFUTED, EXCEPTED go here
        if (!entry.isReady()) {

            // The entry is marked pending and published in externalReadMap
            // before the request is asked to register its listener.
            entry.setPending();
            entry.clearResult(querySupport);

            externalReadMap.put(query, entry);
            size++;

            try {

                query.register(graph, new Listener() {

                    // Set by the first callback (value or exception). The first
                    // callback is forwarded to the procedure directly; every later
                    // one goes through updatePrimitive instead.
                    AtomicBoolean used = new AtomicBoolean(false);

                    @Override
                    public void execute(T result) {

                        // Just for safety
                        if(entry.isDiscarded()) return;
                        if(entry.isExcepted()) entry.setPending();

                        if(used.compareAndSet(false, true)) {
                            entry.addOrSet(QueryProcessor.this, result);
                            procedure.execute(result);
                        } else {
                            entry.queue(result);
                            updatePrimitive(query);
                        }

                    }

                    @Override
                    public void exception(Throwable t) {

                        entry.except(t);

                        if(used.compareAndSet(false, true)) {
                            procedure.exception(t);
                        } else {
                            // entry.queue(result);
                            updatePrimitive(query);
                        }

                    }

                    @Override
                    public String toString() {
                        return procedure.toString();
                    }

                    // The listener counts as disposed once the entry is discarded
                    // or has neither parents nor listeners (see isBound).
                    @Override
                    public boolean isDisposed() {
                        return entry.isDiscarded() || !isBound(entry);
                    }

                });

            } catch (Throwable t) {

                // A failure while registering is stored in the entry and reported to the caller.
                entry.except(t);
                procedure.exception(t);

            }

            misses++;

        } else {

            entry.performFromCache(procedure);
            hits++;

        }

        assert (!entry.isDiscarded());

    }

    /**
     * @return true if the entry has at least one parent or at least one
     *         registered listener
     */
    private boolean isBound(ExternalReadEntry entry) {
        if(entry.hasParents()) return true;
        else if(hasListener(entry)) return true;
        else return false;
    }

    /**
     * Adds <code>parent</code> as a parent of <code>child</code> and registers
     * <code>listener</code> for <code>child</code>.
     *
     * The parent link is skipped when <code>parent</code> is null, when the
     * dependency is <code>inferred</code>, or when the child reports itself
     * immutable. A DatabaseException from the immutability check is logged
     * and otherwise ignored.
     *
     * @return the result of registerListener, or null when no listener was given
     */
    synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {

        if (parent != null && !inferred) {
            try {
                if(!child.isImmutable(graph))
                    child.addParent(parent);
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }
            if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
        }

        if (listener != null) {
            return registerListener(child, listener, procedure);
        } else {
            return null;
        }

    }

    /**
     * Registers the dependencies of a binary query, then computes it when it
     * is not ready or replays the cached result otherwise. The size counter
     * is only incremented for entries that report themselves fresh.
     * Any Throwable is logged and not propagated.
     */
    public void performForEach(ReadGraphImpl graph, BinaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) {

        if (DebugPolicy.PERFORM)
            System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

        assert (!dirty);
        assert (!collecting);

        try {

            registerDependencies(graph, query, parent, listener, procedure, false);

            // FRESH, REFUTED, EXCEPTED go here
            if (!query.isReady()) {

                boolean fresh = query.isFresh();

                if(fresh) {
                    size++;
                }

                query.computeForEach(graph, this, procedure, true);

                misses++;

            } else {

                query.performFromCache(graph, this, procedure);

                hits++;

            }

        } catch (Throwable t) {

            Logger.defaultLogError(t);

        }
    }

    /**
     * Registers the dependencies of a unary query, then computes it when it
     * is not ready or replays the cached result otherwise.
     *
     * @return the value returned by computeForEach / performFromCache, or
     *         null when a Throwable was caught (it is logged, not rethrown)
     */
    public Object performForEach(ReadGraphImpl graph, UnaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) {

        if (DebugPolicy.PERFORM)
            System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

        assert (!dirty);
        assert (!collecting);

        try {

            assert(query.assertNotDiscarded());

            registerDependencies(graph, query, parent, listener, procedure, false);

            // FRESH, REFUTED, EXCEPTED go here
            if (!query.isReady()) {

                size++;
                misses++;

                return query.computeForEach(graph, this, procedure, true);

            } else {

                hits++;

                return query.performFromCache(graph, this, procedure);

            }

        } catch (Throwable t) {

            Logger.defaultLogError(t);
            return null;

        }

    }

    /**
     * Procedure whose callbacks all do nothing; performForEach2 passes it to
     * computeForEach in place of the caller's procedure.
     */
    static class Dummy implements InternalProcedure, IntProcedure {

        @Override
        public void execute(ReadGraphImpl graph, int i) {
        }

        @Override
        public void finished(ReadGraphImpl graph) {
        }

        @Override
        public void execute(ReadGraphImpl graph, Object result) {
        }

        @Override
        public void exception(ReadGraphImpl graph, Throwable throwable) {
        }

    }

    private static final Dummy dummy = new Dummy();

    /**
     * Variant of the unary performForEach that does not catch anything.
     * When the query is not ready it is computed with the no-op
     * <code>dummy</code> procedure and the value is then fetched with
     * <code>query.get(graph, this, null)</code>; when it is ready the value is
     * fetched with the caller's procedure.
     *
     * @throws Throwable whatever the query computation or retrieval throws
     */
    public Object performForEach2(ReadGraphImpl graph, UnaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {

        if (DebugPolicy.PERFORM)
            System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

        assert (!dirty);
        assert (!collecting);

        assert(query.assertNotDiscarded());

        registerDependencies(graph, query, parent, listener, procedure, false);

        // FRESH, REFUTED, EXCEPTED go here
        if (!query.isReady()) {

            size++;
            misses++;

            query.computeForEach(graph, this, (Procedure)dummy, true);
            return query.get(graph, this, null);

        } else {

            hits++;

            return query.get(graph, this, procedure);

        }

    }

    /**
     * Performs a string query. Unlike the other performForEach variants,
     * the dependencies are registered after the query has been computed or
     * replayed from cache. Any Throwable is printed and logged, not propagated.
     */
    public void performForEach(ReadGraphImpl graph, StringQuery query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {

        if (DebugPolicy.PERFORM)
            System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);

        assert (!dirty);
        assert (!collecting);

        try {

            // NOTE(review): "aff" looks like a leftover debug marker for the
            // discarded case that the assert below also covers - confirm.
            if(query.isDiscarded()) {
                System.err.println("aff");
            }
            assert(!query.isDiscarded());

            // FRESH, REFUTED, EXCEPTED go here
            if (!query.isReady()) {

                query.computeForEach(graph.withAsyncParent(query), this, procedure);

                size++;
                misses++;

            } else {

                query.performFromCache(graph, this, procedure);

                hits++;

            }

            assert (!query.isDiscarded());

            registerDependencies(graph, query, parent, listener, procedure, false);

        } catch (Throwable t) {

            t.printStackTrace();
            Logger.defaultLogError(t);

        }

    }

    /** View of the query caches that the query collector works on. */
    interface QueryCollectorSupport {
        public CacheCollectionResult allCaches();
        public Collection getRootList();
        public int getCurrentSize();
        public int calculateCurrentSize();
        public CacheEntryBase iterate(int level);
        public void remove();
        public void setLevel(CacheEntryBase entry, int level);
        public boolean start(boolean flush);
    }

    /** Collector invoked from gc(int, int). */
    interface QueryCollector {

        public void collect(int youngTarget, int allowedTimeInMs);

    }

    /** QueryCollectorSupport backed by the query maps of the enclosing QueryProcessor. */
    class QueryCollectorSupportImpl implements QueryCollectorSupport {

        private static final boolean DEBUG = false;
        // Ratio passed to restart() by start() when no flush is requested.
        private static final double ITERATION_RATIO = 0.2;

        private CacheCollectionResult iteration = new CacheCollectionResult();
        private boolean fresh = true;
        private boolean needDataInStart = true;

        QueryCollectorSupportImpl() {
            iteration.restart();
}

        /**
         * @return a new CacheCollectionResult filled by the enclosing
         *         processor's allCaches and restarted
         */
        public CacheCollectionResult allCaches() {
            CacheCollectionResult result = new CacheCollectionResult();
            QueryProcessor.this.allCaches(result);
            result.restart();
            return result;
        }

        /**
         * Prepares a collection pass. The iteration data is refreshed through
         * restart() when the previous pass asked for it or when
         * <code>flush</code> is set; a flush uses a target ratio of 0.0.
         *
         * @return iteration.isAtStart()
         */
        public boolean start(boolean flush) {
            // We need new data from query maps
            fresh = true;
            if(needDataInStart || flush) {
                // Last run ended after processing all queries => refresh data
                restart(flush ? 0.0 : ITERATION_RATIO);
            } else {
                // continue with previous big data
            }
            // Notify caller about iteration situation
            return iteration.isAtStart();
        }

        private void restart(double targetRatio) {

            needDataInStart = true;

            long start = System.nanoTime();
            if(fresh) {

                // We need new data from query maps

                // Relative difference between the current total size and the
                // size of the previous iteration; +1 avoids a division by zero.
                int iterationSize = iteration.size()+1;
                int diff = calculateCurrentSize()-iterationSize;

                double ratio = (double)diff / (double)iterationSize;
                boolean dirty = Math.abs(ratio) >= targetRatio;

                if(dirty) {
                    iteration = allCaches();
                    if(DEBUG) {
                        System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
                        // NOTE(review): the source text is truncated from here up to the
                        // header of getRootList() - the rest of restart() and whatever
                        // followed it is missing from this copy. Restore from version control.
                        for(int i=0;i getRootList() {

            ArrayList result = new ArrayList();

            for (Object e : valueMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : directPredicatesMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : objectsMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : directObjectsMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : principalTypesMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : superRelationsMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : superTypesMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : typesMap.values()) {
                result.add((CacheEntry) e);
            }
            // NOTE(review): objectsMap is added a second time here, so its entries
            // appear twice in the result. Looks like a copy/paste slip (another
            // map may have been intended) - confirm.
            for (Object e : objectsMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : assertedStatementsMap.values()) {
                result.add((CacheEntry) e);
            }
            // readMap and asyncReadMap values are type-checked; anything that is
            // not a CacheEntry is printed to stderr and left out.
            for (Object e : readMap.values()) {
                if(e instanceof CacheEntry) {
                    result.add((CacheEntry) e);
                } else {
                    System.err.println("e=" + e);
                }
            }
            for (Object e : asyncReadMap.values()) {
                if(e instanceof CacheEntry) {
                    result.add((CacheEntry) e);
                } else {
                    System.err.println("e=" + e);
                }
            }
            for (Object e : externalReadMap.values()) {
                result.add((CacheEntry) e);
            }
            for (Object e : orderedSetMap.values()) {
                result.add((CacheEntry) e);
            }

            return result;

        }

        /**
         * Recomputes the total number of cached entries by summing the sizes
         * of the query maps and stores the total in <code>size</code>.
         *
         * @return the recomputed total
         */
        @Override
        public int calculateCurrentSize() {

            int realSize = 0;

            realSize += directPredicatesMap.size();
            realSize += principalTypesMap.size();
            realSize += uriToResourceMap.size();
            realSize += namespaceIndexMap22.size();
            realSize += projectsMap.size();

            realSize += relationInfoMap.size();
            realSize += superTypesMap.size();
            realSize += typeHierarchyMap.size();
            realSize += superRelationsMap.size();
            realSize += typesMap.size();

            realSize += valueMap.size();
            realSize += directObjectsMap.size();
            realSize += objectsMap.size();
            realSize += orderedSetMap.size();
            realSize += predicatesMap.size();

            realSize += statementsMap.size();
            realSize += assertedPredicatesMap.size();
            realSize += assertedStatementsMap.size();
            realSize += externalReadMap.size();
            realSize += asyncReadMap.size();

            realSize += readMap.size();
            realSize += asyncMultiReadMap.size();
            realSize += multiReadMap.size();

            size = realSize;

            return realSize;

        }

        /** @return the <code>size</code> counter as it currently stands, without recomputing it */
        @Override
        public int getCurrentSize() {
            return size;
        }

    }

    // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);

    private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
    private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);

    /** @return the current value of the <code>size</code> counter */
    public int querySize() {
        return size;
    }

    /** Delegates to the query collector. */
    public void gc(int youngTarget, int allowedTimeInMs) {

        collector.collect(youngTarget, allowedTimeInMs);

    }

    /**
     * Registers <code>base</code> as a listener of <code>entry</code>.
     *
     * @return the new listener entry, or null when <code>base</code> is
     *         already disposed or addListener declined the registration
     */
    public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {

        assert (entry != null);

        if (base.isDisposed())
            return null;

        return addListener(entry, base, procedure);

    }

    /** Stores <code>result</code> as the last known value of the listener entry. */
    private void primeListenerEntry(final ListenerEntry entry, final Object result) {
        entry.setLastKnown(result);
    }

    /**
     * Adds a listener entry to the list kept for <code>entry</code>, creating
     * the list on first use. When an equal listener entry already exists it
     * is replaced only if its base is disposed; otherwise nothing is added.
     *
     * @return the added entry, or null when an equal live listener exists
     */
    private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {

        assert (entry != null);
        assert (procedure != null);

        ArrayList list = listeners.get(entry);
        if (list == null) {
            list = new ArrayList(1);
            listeners.put(entry, list);
        }

        ListenerEntry result = new ListenerEntry(entry, base, procedure);
        int currentIndex = list.indexOf(result);
        // There was already a listener
        if(currentIndex > -1) {
            ListenerEntry current = list.get(currentIndex);
            if(!current.base.isDisposed()) return null;
            list.set(currentIndex, result);
        } else {
            list.add(result);
        }

        if(DebugPolicy.LISTENER) {
            new Exception().printStackTrace();
            System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
        }

        return result;

    }

    /** Queues the listener entry in <code>scheduledListeners</code>. */
    private void scheduleListener(ListenerEntry entry) {
        assert (entry != null);
        if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
        scheduledListeners.add(entry);
    }

    /**
     * Removes the listener entry from the list of its cache entry and drops
     * the list once it becomes empty. Does nothing when no list exists.
     */
    private void removeListener(ListenerEntry entry) {
        assert (entry != null);
        ArrayList list = listeners.get(entry.entry);
        if(list == null) return;
        boolean success = list.remove(entry);
        assert (success);
        if (list.isEmpty())
            listeners.remove(entry.entry);
    }

    /** @return true if a listener list is registered for the entry (disposed listeners included) */
    private boolean hasListener(CacheEntry entry) {
        if(listeners.get(entry) != null) return true;
        return false;
    }

    /**
     * Removes the listeners of <code>entry</code> whose base is disposed and
     * drops the list when nothing remains.
     *
     * @return true if at least one listener is left after the cleanup
     */
    boolean hasListenerAfterDisposing(CacheEntry entry) {
        if(listeners.get(entry) != null) {
            ArrayList entries = listeners.get(entry);
            // Disposed entries are collected first and removed afterwards so
            // that the list is not modified while it is being iterated.
            ArrayList list = null;
            for (ListenerEntry e : entries) {
                if (e.base.isDisposed()) {
                    if(list == null) list = new ArrayList();
                    list.add(e);
                }
            }
            if(list != null) {
                for (ListenerEntry e : list) {
                    entries.remove(e);
                }
            }
            if (entries.isEmpty()) {
                listeners.remove(entry);
                return false;
            }
            return true;
        }
        return false;
    }

    /**
     * @return the listener list of <code>entry</code> after disposed listeners
     *         have been pruned, or an empty list when there is none
     */
    List getListenerEntries(CacheEntry entry) {
        hasListenerAfterDisposing(entry);
        if(listeners.get(entry) != null)
            return listeners.get(entry);
        else
            return Collections.emptyList();
    }

    /**
     * Fills <code>workarea</code> with, for each entry, the listener bases of
     * the entry itself and of all of its parents. Entries already present in
     * the work area are not processed again.
     */
    void processListenerReport(CacheEntry entry, Map> workarea) {

        if(!workarea.containsKey(entry)) {
HashSet ls = new HashSet(); for(ListenerEntry e : getListenerEntries(entry)) ls.add(e.base); workarea.put(entry, ls); for(CacheEntry parent : entry.getParents(this)) { processListenerReport(parent, workarea); ls.addAll(workarea.get(parent)); } } } public synchronized ListenerReport getListenerReport() throws IOException { class ListenerReportImpl implements ListenerReport { Map> workarea = new HashMap>(); @Override public void print(PrintStream b) { Map hist = new HashMap(); for(Map.Entry> e : workarea.entrySet()) { for(ListenerBase l : e.getValue()) { Integer i = hist.get(l); hist.put(l, i != null ? i-1 : -1); } } for(Pair p : CollectionUtils.valueSortedEntries(hist)) { b.print("" + -p.second + " " + p.first + "\n"); } b.flush(); } } ListenerReportImpl result = new ListenerReportImpl(); Collection all = allCaches(new CacheCollectionResult()).toCollection(); for(CacheEntryBase entry : all) { hasListenerAfterDisposing(entry); } for(CacheEntryBase entry : all) { processListenerReport(entry, result.workarea); } return result; } public synchronized String reportListeners(File file) throws IOException { if (!isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); ListenerReport report = getListenerReport(); report.print(b); return "Done reporting listeners."; } void processParentReport(CacheEntry entry, Map> workarea) { if(entry.isDiscarded()) return; if(workarea.containsKey(entry)) return; Iterable parents = entry.getParents(this); HashSet ps = new HashSet(); for(CacheEntry e : parents) { if(e.isDiscarded()) continue; ps.add(e); processParentReport(e, workarea); } workarea.put(entry, ps); } public synchronized String reportQueryActivity(File file) throws IOException { System.err.println("reportQueries " + file.getAbsolutePath()); if (!isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); List> entries = 
CollectionUtils.valueSortedEntries(Development.histogram); Collections.reverse(entries); for(Pair entry : entries) { b.println(entry.first + ": " + entry.second); } b.close(); Development.histogram.clear(); return "OK"; } public synchronized String reportQueries(File file) throws IOException { System.err.println("reportQueries " + file.getAbsolutePath()); if (!isAlive()) return "Disposed!"; PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); long start = System.nanoTime(); // ArrayList all = ; Map> workarea = new HashMap>(); Collection caches = allCaches(new CacheCollectionResult()).toCollection(); for(CacheEntryBase entry : caches) { processParentReport(entry, workarea); } // for(CacheEntry e : all) System.err.println("entry: " + e); long duration = System.nanoTime() - start; System.err.println("Query root set in " + 1e-9*duration + "s."); start = System.nanoTime(); HashMap flagMap = new HashMap(); int listeners = 0; for(CacheEntry entry : workarea.keySet()) { boolean listener = hasListenerAfterDisposing(entry); boolean hasParents = entry.getParents(this).iterator().hasNext(); if(listener) { // Bound flagMap.put(entry, 0); } else if (!hasParents) { // Unbound flagMap.put(entry, 1); } else { // Unknown flagMap.put(entry, 2); } // // Write leaf bit // entry.flags |= 4; } boolean done = true; int loops = 0; do { done = true; long start2 = System.nanoTime(); int boundCounter = 0; int unboundCounter = 0; int unknownCounter = 0; for(CacheEntry entry : workarea.keySet()) { //System.err.println("process " + entry); int flags = flagMap.get(entry); int bindStatus = flags & 3; if(bindStatus == 0) boundCounter++; else if(bindStatus == 1) unboundCounter++; else if(bindStatus == 2) unknownCounter++; if(bindStatus < 2) continue; int newStatus = 1; for(CacheEntry parent : entry.getParents(this)) { if(parent.isDiscarded()) flagMap.put(parent, 1); int flags2 = flagMap.get(parent); int bindStatus2 = flags2 & 3; // Parent is bound => child is bound 
if(bindStatus2 == 0) {
                        newStatus = 0;
                        break;
                    }
                    // Parent is unknown => child is unknown
                    else if (bindStatus2 == 2) {
                        newStatus = 2;
                        done = false;
                        break;
                    }
                }

                flagMap.put(entry, newStatus);

            }

            duration = System.nanoTime() - start2;
            System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
            b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");

        } while(!done && loops++ < 20);

        // The pass limit was hit: report the entries whose status is still unknown.
        if(loops >= 20) {

            for(CacheEntry entry : workarea.keySet()) {

                int bindStatus = flagMap.get(entry);
                if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);

            }

        }

        duration = System.nanoTime() - start;
        System.err.println("Query analysis in " + 1e-9*duration + "s.");

        // Number of cached entries per class. Counts are kept negative and
        // negated when printed, so the value-sorted listing is ordered by the
        // magnitude of the count.
        Map, Integer> counts = new HashMap, Integer>();

        for(CacheEntry entry : workarea.keySet()) {
            // Request-backed entries are counted under the class of their request.
            Class clazz = entry.getClass();
            if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
            else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
            else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
            else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
            else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
            Integer c = counts.get(clazz);
            if(c == null) counts.put(clazz, -1);
            else counts.put(clazz, c-1);
        }

        b.print("// Simantics DB client query report file\n");
        b.print("// This file contains the following information\n");
        b.print("// -The amount of cached query instances per query class\n");
        b.print("// -The sizes of retained child sets\n");
        b.print("// -List of parents for each query (search for 'P ')\n");
        b.print("// -Followed by status, where\n");
        b.print("// -0=bound\n");
        b.print("// -1=free\n");
        b.print("// -2=unknown\n");
        b.print("// -L=has listener\n");
        b.print("// -List of children for each query (search for 'C ')\n");

        b.print("----------------------------------------\n");

        b.print("// Queries by class\n");
        for(Pair, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
            b.print(-p.second + " " + p.first.getName() + "\n");
        }

        // Retained-set sizes, again stored negated. Every entry starts at -1
        // (itself); each iteration adds the current value of every entry to
        // each of its parents, until nothing changes or 50 iterations passed.
        Map hist = new HashMap();
        for(CacheEntry e : workarea.keySet()) hist.put(e, -1);

        boolean changed = true;
        int iter = 0;
        while(changed && iter++<50) {

            changed = false;

            Map newHist = new HashMap();
            for(CacheEntry e : workarea.keySet()) newHist.put(e, -1);

            for(Map.Entry> e : workarea.entrySet()) {
                Integer c = hist.get(e.getKey());
                for(CacheEntry p : e.getValue()) {
                    Integer i = newHist.get(p);
                    newHist.put(p, i+c);
                }
            }
            for(CacheEntry e : workarea.keySet()) {
                Integer value = newHist.get(e);
                Integer old = hist.get(e);
                if(!value.equals(old)) {
                    hist.put(e, value);
                    // System.err.println("hist " + e + ": " + old + " => " + value);
                    changed = true;
                }
            }

            System.err.println("Retained set iteration " + iter);

        }

        b.print("// Queries by retained set\n");
        for(Pair p : CollectionUtils.valueSortedEntries(hist)) {
            b.print("" + -p.second + " " + p.first + "\n");
        }

        // parent -> children, built while the parent listing is printed.
        HashMap> inverse = new HashMap>();

        b.print("// Entry parent listing\n");
        for(CacheEntry entry : workarea.keySet()) {
            int status = flagMap.get(entry);
            boolean hasListener = hasListenerAfterDisposing(entry);
            b.print("Q " + entry.toString());
            if(hasListener) {
                b.print(" (L" + status + ")");
                listeners++;
            } else {
                b.print(" (" + status + ")");
            }
            b.print("\n");
            for(CacheEntry parent : workarea.get(entry)) {
                Collection inv = inverse.get(parent);
                if(inv == null) {
                    inv = new ArrayList();
                    inverse.put(parent, inv);
                }
                inv.add(entry);
                b.print(" " + parent.toString());
                b.print("\n");
            }
        }

        b.print("// Entry child listing\n");
        for(Map.Entry> entry : inverse.entrySet()) {
            b.print("C " + entry.getKey().toString());
            b.print("\n");
            for(CacheEntry child : entry.getValue()) {
                Integer h = hist.get(child);
                if(h != null) {
                    b.print(" " + h);
                } else {
                    b.print(" ");
                }
                b.print(" " + child.toString());
                b.print("\n");
            }
} b.print("#queries: " + workarea.keySet().size() + "\n"); b.print("#listeners: " + listeners + "\n"); b.close(); return "Dumped " + workarea.keySet().size() + " queries."; } class UpdateEntry { public CacheEntry caller; public CacheEntry entry; public int indent; public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { this.caller = caller; this.entry = entry; this.indent = indent; } }; boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. if(entry.isDiscarded()) return false; assert (!entry.isDiscarded()); Query query = entry.getQuery(); query.removeEntry(this); updates++; size--; if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0) boundQueries--; entry.discard(); return true; } /** * * @return true if this entry is being listened */ private boolean updateQuery(UpdateEntry e, LinkedList todo, IdentityHashMap immediates) throws DatabaseException { assert (e != null); CacheEntry entry = e.entry; // System.err.println("updateQuery " + entry); /* * If the dependency graph forms a DAG, some entries are inserted in the * todo list many times. They only need to be processed once though. 
*/ if (entry.isDiscarded()) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.out.print("D"); for (int i = 0; i < e.indent; i++) System.out.print(" "); System.out.println(entry.getQuery()); } } // System.err.println(" => DISCARDED"); return false; } if (entry.isRefuted()) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.out.print("R"); for (int i = 0; i < e.indent; i++) System.out.print(" "); System.out.println(entry.getQuery()); } } return false; } if (entry.isExcepted()) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.out.print("E"); } } } if (entry.isPending()) { if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.out.print("P"); } } } updates++; if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.out.print("U "); for (int i = 0; i < e.indent; i++) System.out.print(" "); System.out.print(entry.getQuery()); } } Query query = entry.getQuery(); int type = query.type(); boolean hasListener = hasListener(entry); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { if(hasListener(entry)) { System.out.println(" (L)"); } else { System.out.println(""); } } } if(entry.isPending() || entry.isExcepted()) { // If updated if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { immediates.put(entry, entry); } else { if(hasListener) { entry.refute(); } else { removeQuery(entry); } } } else { // If updated if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { immediates.put(entry, entry); } else { if(hasListener) { entry.refute(); } else { removeQuery(entry); } } } // System.err.println(" => FOO " + 
type); if (hasListener) { ArrayList entries = listeners.get(entry); if(entries != null) { for (ListenerEntry le : entries) { scheduleListener(le); } } } // If invalid, update parents if (type == RequestFlags.INVALIDATE) { updateParents(e.indent, entry, todo); } return hasListener; } private void updateParents(int indent, CacheEntry entry, LinkedList todo) { Iterable oldParents = entry.getParents(this); for (CacheEntry parent : oldParents) { // System.err.println("updateParents " + entry + " => " + parent); if(!parent.isDiscarded()) todo.push(new UpdateEntry(entry, parent, indent + 2)); } } private boolean pruneListener(ListenerEntry entry) { if (entry.base.isDisposed()) { removeListener(entry); return true; } else { return false; } } /** * @param av1 an array (guaranteed) * @param av2 any object * @return true if the two arrays are equal */ private final boolean arrayEquals(Object av1, Object av2) { if (av2 == null) return false; Class c1 = av1.getClass().getComponentType(); Class c2 = av2.getClass().getComponentType(); if (c2 == null || !c1.equals(c2)) return false; boolean p1 = c1.isPrimitive(); boolean p2 = c2.isPrimitive(); if (p1 != p2) return false; if (!p1) return Arrays.equals((Object[]) av1, (Object[]) av2); if (boolean.class.equals(c1)) return Arrays.equals((boolean[]) av1, (boolean[]) av2); else if (byte.class.equals(c1)) return Arrays.equals((byte[]) av1, (byte[]) av2); else if (int.class.equals(c1)) return Arrays.equals((int[]) av1, (int[]) av2); else if (long.class.equals(c1)) return Arrays.equals((long[]) av1, (long[]) av2); else if (float.class.equals(c1)) return Arrays.equals((float[]) av1, (float[]) av2); else if (double.class.equals(c1)) return Arrays.equals((double[]) av1, (double[]) av2); throw new RuntimeException("??? 
Contact application querySupport."); } final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { try { Query query = entry.getQuery(); if(DebugPolicy.RECOMPUTE) System.out.println("R " + query); entry.prepareRecompute(querySupport); ReadGraphImpl parentGraph = graph.withParent(entry); query.recompute(parentGraph, this, entry); if(entry.isExcepted()) return ListenerEntry.NO_VALUE; Object newValue = entry.getResult(); if (ListenerEntry.NO_VALUE == oldValue) { if(DebugPolicy.CHANGES) { System.out.println("C " + query); System.out.println("- " + oldValue); System.out.println("- " + newValue); } return newValue; } boolean changed = false; if (newValue != null) { if (newValue.getClass().isArray()) { changed = !arrayEquals(newValue, oldValue); } else { changed = !newValue.equals(oldValue); } } else changed = (oldValue != null); if(DebugPolicy.CHANGES && changed) { System.out.println("C " + query); System.out.println("- " + oldValue); System.out.println("- " + newValue); } return changed ? newValue : ListenerEntry.NOT_CHANGED; } catch (Throwable t) { Logger.defaultLogError(t); entry.except(t); return ListenerEntry.NO_VALUE; } } public boolean hasScheduledUpdates() { return !scheduledListeners.isEmpty(); } public void performScheduledUpdates(WriteGraphImpl graph) { assert (!updating); assert (!collecting); assert (!firingListeners); firingListeners = true; try { // Performing may cause further events to be scheduled. while (!scheduledListeners.isEmpty()) { // graph.restart(); // graph.state.barrier.inc(); // Clone current events to make new entries possible during // firing. 
THashSet entries = scheduledListeners; scheduledListeners = new THashSet(); ArrayList schedule = new ArrayList(); for (ListenerEntry listenerEntry : entries) { if (pruneListener(listenerEntry)) { if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure); continue; } final CacheEntry entry = listenerEntry.entry; assert (entry != null); Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); if (newValue != ListenerEntry.NOT_CHANGED) { if(DebugPolicy.LISTENER) System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); schedule.add(listenerEntry); listenerEntry.setLastKnown(entry.getResult()); } } for(ListenerEntry listenerEntry : schedule) { final CacheEntry entry = listenerEntry.entry; if(DebugPolicy.LISTENER) System.out.println("Firing " + listenerEntry.procedure); try { if(DebugPolicy.LISTENER) System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); entry.performFromCache(graph, this, listenerEntry.procedure); } catch (Throwable t) { t.printStackTrace(); } } // graph.state.barrier.dec(); // graph.waitAsync(null); // graph.state.barrier.assertReady(); } } finally { firingListeners = false; } } /** * * @return true if this entry still has listeners */ public boolean update(final ReadGraphImpl graph, final CacheEntry entry) { assert (!collecting); assert (!updating); updating = true; boolean hadListeners = false; boolean listenersUnknown = false; try { assert(entry != null); LinkedList todo = new LinkedList(); IdentityHashMap immediates = new IdentityHashMap(); todo.add(new UpdateEntry(null, entry, 0)); while(true) { // Walk the tree and collect immediate updates while (!todo.isEmpty()) { UpdateEntry e = todo.pop(); hadListeners |= updateQuery(e, todo, immediates); } if(immediates.isEmpty()) break; // Evaluate all immediate updates and collect parents to update for(CacheEntry immediate : immediates.values()) { if(immediate.isDiscarded()) { continue; } 
if(immediate.isExcepted()) { Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); if (newValue != ListenerEntry.NOT_CHANGED) updateParents(0, immediate, todo); } else { Object oldValue = immediate.getResult(); Object newValue = compareTo(graph, immediate, oldValue); if (newValue != ListenerEntry.NOT_CHANGED) { updateParents(0, immediate, todo); } else { // If not changed, keep the old value immediate.setResult(oldValue); listenersUnknown = true; } } } immediates.clear(); } } catch (Throwable t) { Logger.defaultLogError(t); } assert (updating); updating = false; return hadListeners | listenersUnknown; } volatile public boolean dirty = false; private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet(); private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet(); private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet(); // Maybe use a mutex from util.concurrent? private Object primitiveUpdateLock = new Object(); private THashSet scheduledPrimitiveUpdates = new THashSet(); public void performDirtyUpdates(final ReadGraphImpl graph) { dirty = false; lastInvalidate = 0; if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.err.println("== Query update =="); } } // Special case - one statement if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { long arg0 = scheduledObjectUpdates.getFirst(); final int subject = (int)(arg0 >>> 32); final int predicate = (int)(arg0 & 0xffffffff); for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o); for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o); for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o); if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { PrincipalTypes principalTypes = 
PrincipalTypes.entry(QueryProcessor.this, subject); if(principalTypes != null) update(graph, principalTypes); Types types = Types.entry(QueryProcessor.this, subject); if(types != null) update(graph, types); } if(predicate == subrelationOf) { SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); if(superRelations != null) update(graph, superRelations); } DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject); if(dp != null) update(graph, dp); OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate); if(os != null) update(graph, os); scheduledObjectUpdates.clear(); return; } // Special case - one value if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { int arg0 = scheduledValueUpdates.getFirst(); ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0); if(valueQuery != null) update(graph, valueQuery); scheduledValueUpdates.clear(); return; } final TIntHashSet predicates = new TIntHashSet(); final TIntHashSet orderedSets = new TIntHashSet(); THashSet primitiveUpdates; synchronized (primitiveUpdateLock) { primitiveUpdates = scheduledPrimitiveUpdates; scheduledPrimitiveUpdates = new THashSet(); } primitiveUpdates.forEach(new TObjectProcedure() { @Override public boolean execute(Object arg0) { ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0); if (query != null) { boolean listening = update(graph, query); if (!listening && !query.hasParents()) { externalReadMap.remove(arg0); query.discard(); } } return true; } }); scheduledValueUpdates.forEach(new TIntProcedure() { @Override public boolean execute(int arg0) { ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0); if(valueQuery != null) update(graph, valueQuery); return true; } }); scheduledInvalidates.forEach(new TIntProcedure() { @Override public boolean execute(int resource) { ValueQuery valueQuery = 
ValueQuery.entry(QueryProcessor.this, resource); if(valueQuery != null) update(graph, valueQuery); PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource); if(principalTypes != null) update(graph, principalTypes); Types types = Types.entry(QueryProcessor.this, resource); if(types != null) update(graph, types); SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource); if(superRelations != null) update(graph, superRelations); predicates.add(resource); return true; } }); scheduledObjectUpdates.forEach(new TLongProcedure() { @Override public boolean execute(long arg0) { final int subject = (int)(arg0 >>> 32); final int predicate = (int)(arg0 & 0xffffffff); if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject); if(principalTypes != null) update(graph, principalTypes); Types types = Types.entry(QueryProcessor.this, subject); if(types != null) update(graph, types); } if(predicate == subrelationOf) { SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); if(superRelations != null) update(graph, superRelations); } predicates.add(subject); orderedSets.add(predicate); return true; } }); predicates.forEach(new TIntProcedure() { @Override public boolean execute(final int subject) { for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o); for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o); for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o); DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject); if(entry != null) update(graph, entry); return true; } }); orderedSets.forEach(new TIntProcedure() { @Override public boolean execute(int orderedSet) { OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet); if(entry != null) update(graph, entry); return 
true; } }); // for (Integer subject : predicates) { // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject); // if(entry != null) update(graph, entry); // } if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { System.err.println("== Query update ends =="); } } scheduledValueUpdates.clear(); scheduledObjectUpdates.clear(); scheduledInvalidates.clear(); } public void updateValue(final int resource) { scheduledValueUpdates.add(resource); dirty = true; } public void updateStatements(final int resource, final int predicate) { scheduledObjectUpdates.add((((long)resource) << 32) + predicate); dirty = true; } private int lastInvalidate = 0; public void invalidateResource(final int resource) { if(lastInvalidate == resource) return; scheduledValueUpdates.add(resource); lastInvalidate = resource; dirty = true; } public void updatePrimitive(final ExternalRead primitive) { // External reads may be updated from arbitrary threads. // Synchronize to prevent race-conditions. 
synchronized (primitiveUpdateLock) {
            scheduledPrimitiveUpdates.add(primitive);
        }
        // Called after the primitiveUpdateLock block has been left.
        querySupport.dirtyPrimitives();

    }

    /**
     * Returns a one-line summary built from the size, hits, misses and
     * updates fields.
     */
    @Override
    public synchronized String toString() {
        return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";
    }

    /**
     * Disposes every executor and then waits for them to terminate in three
     * phases:
     * <ol>
     * <li>up to 100 polls with a 5 ms sleep in between, returning as soon as
     * no executor reports alive;</li>
     * <li>up to 100 rounds that call interrupt() on every executor, again
     * returning as soon as none is alive (no sleep between rounds);</li>
     * <li>a join with a 5000 ms timeout per executor, after which the array
     * slot is cleared.</li>
     * </ol>
     * The array slots are only cleared in the third phase; the early returns
     * leave them untouched.
     * <p>
     * NOTE(review): InterruptedException is logged but the interrupt status of
     * the calling thread is not restored - confirm whether callers rely on it.
     */
    @Override
    protected void doDispose() {

        for(int index = 0; index < THREADS; index++) {
            executors[index].dispose();
        }

        // First just wait
        for(int i=0;i<100;i++) {

            boolean alive = false;
            for(int index = 0; index < THREADS; index++) {
                alive |= executors[index].isAlive();
            }
            if(!alive) return;

            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Logger.defaultLogError(e);
            }

        }

        // Then start interrupting
        for(int i=0;i<100;i++) {

            boolean alive = false;
            for(int index = 0; index < THREADS; index++) {
                alive |= executors[index].isAlive();
            }
            if(!alive) return;

            for(int index = 0; index < THREADS; index++) {
                executors[index].interrupt();
            }

        }

//      // Then just destroy
//      for(int index = 0; index < THREADS; index++) {
//          executors[index].destroy();
//      }

        // Last resort: bounded join, then drop the reference either way.
        for(int index = 0; index < THREADS; index++) {
            try {
                executors[index].join(5000);
            } catch (InterruptedException e) {
                Logger.defaultLogError("QueryThread " + index + " will not die.", e);
            }
            executors[index] = null;
        }

    }

    /** Returns the hits counter. */
    public int getHits() {
        return hits;
    }

    /** Returns the misses counter. */
    public int getMisses() {
        return misses;
    }

    /** Returns the size counter. */
    public int getSize() {
        return size;
    }

    /**
     * Collects the cluster ids, as reported by querySupport.getClusterId, of
     * the resources referenced by the entries of objectsMap (query.r1()),
     * directPredicatesMap (query.id) and valueMap (query.id).
     *
     * @return a new HashSet holding the collected cluster ids
     */
    public Set getReferencedClusters() {
        HashSet result = new HashSet();
        for (CacheEntry entry : objectsMap.values()) {
            Objects query = (Objects) entry.getQuery();
            result.add(querySupport.getClusterId(query.r1()));
        }
        for (CacheEntry entry : directPredicatesMap.values()) {
            DirectPredicates query = (DirectPredicates) entry.getQuery();
            result.add(querySupport.getClusterId(query.id));
        }
        for (CacheEntry entry : valueMap.values()) {
            ValueQuery query = (ValueQuery) entry.getQuery();
            result.add(querySupport.getClusterId(query.id));
        }
        return result;
    }

    /** Intentionally empty in this implementation. */
    public void assertDone() {
    }

    CacheCollectionResult allCaches(CacheCollectionResult
result) { int level = Integer.MAX_VALUE; directPredicatesMap.values(level, result); principalTypesMap.values(level, result); for(CacheEntryBase e : uriToResourceMap.values()) if(e.getLevel() <= level) result.add(e); for(CacheEntryBase e : namespaceIndexMap22.values()) if(e.getLevel() <= level) result.add(e); projectsMap.values(level, result); relationInfoMap.values(level, result); superTypesMap.values(level, result); typeHierarchyMap.values(level, result); superRelationsMap.values(level, result); typesMap.values(level, result); valueMap.values(level, result); directObjectsMap.values(level, result); objectsMap.values(level, result); orderedSetMap.values(level, result); predicatesMap.values(level, result); statementsMap.values(level, result); assertedPredicatesMap.values(level, result); assertedStatementsMap.values(level, result); externalReadMap.values(level, result); asyncReadMap.values(level, result); readMap.values(level, result); asyncMultiReadMap.values(level, result); multiReadMap.values(level, result); return result; } public void printDiagnostics() { } public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) { querySupport.requestCluster(graph, clusterId, runnable); } public int clean() { collector.collect(0, Integer.MAX_VALUE); return size; } public void clean(final Collection> requests) { QueryCollectorSupport collectorSupport = new QueryCollectorSupport() { Iterator> iterator = requests.iterator(); @Override public CacheCollectionResult allCaches() { throw new UnsupportedOperationException(); } @Override public CacheEntryBase iterate(int level) { if(iterator.hasNext()) { ExternalRead request = iterator.next(); ExternalReadEntry entry = externalReadMap.get(request); if (entry != null) return entry; else return iterate(level); } else { iterator = requests.iterator(); return null; } } @Override public void remove() { throw new UnsupportedOperationException(); } @Override public void setLevel(CacheEntryBase entry, int level) { throw 
new UnsupportedOperationException();
            }

            /**
             * Returns the externalReadMap entries that exist for the requests
             * passed to the enclosing method.
             */
            @Override
            public Collection getRootList() {
                ArrayList result = new ArrayList(requests.size());
                for (ExternalRead request : requests) {
                    ExternalReadEntry entry = externalReadMap.get(request);
                    if (entry != null)
                        result.add(entry);
                }
                return result;
            }

            @Override
            public int getCurrentSize() {
                return size;
            }

            @Override
            public int calculateCurrentSize() {
                // This tells the collector to attempt collecting everything.
                return Integer.MAX_VALUE;
            }

            @Override
            public boolean start(boolean flush) {
                return true;
            }

        };
        new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
    }

    /**
     * Diagnostic dump: gathers the values of the query cache maps, prints how
     * many entries were gathered and then, for every CacheEntry, prints one
     * line per state flag that is set (pending, excepted, discarded, refuted,
     * fresh). All output goes to System.out.
     */
    public void scanPending() {

        // Each map is added exactly once. orderedSetMap, externalReadMap and
        // readMap used to be added twice, which inflated the printed count and
        // reported their entries twice.
        ArrayList entries = new ArrayList();

        entries.addAll(directPredicatesMap.values());
        entries.addAll(principalTypesMap.values());
        entries.addAll(uriToResourceMap.values());
        entries.addAll(namespaceIndexMap22.values());
        entries.addAll(projectsMap.values());
        entries.addAll(relationInfoMap.values());
        entries.addAll(superTypesMap.values());
        entries.addAll(superRelationsMap.values());
        entries.addAll(typesMap.values());
        entries.addAll(valueMap.values());
        entries.addAll(directObjectsMap.values());
        entries.addAll(objectsMap.values());
        entries.addAll(orderedSetMap.values());
        entries.addAll(predicatesMap.values());
        entries.addAll(statementsMap.values());
        // entries.addAll(assertedObjectsMap.values());
        entries.addAll(assertedPredicatesMap.values());
        entries.addAll(assertedStatementsMap.values());
        entries.addAll(externalReadMap.values());
        entries.addAll(asyncReadMap.values());
        entries.addAll(readMap.values());
        entries.addAll(asyncMultiReadMap.values());
        entries.addAll(multiReadMap.values());

        System.out.println(entries.size() + " entries.");

        for(Object e : entries) {
            if(e instanceof CacheEntry) {
                CacheEntry en = (CacheEntry)e;
                if(en.isPending()) System.out.println("pending " + e);
                if(en.isExcepted())
System.out.println("excepted " + e); if(en.isDiscarded()) System.out.println("discarded " + e); if(en.isRefuted()) System.out.println("refuted " + e); if(en.isFresh()) System.out.println("fresh " + e); } else { //System.out.println("Unknown object " + e); } } } public ReadGraphImpl graphForVirtualRequest() { return ReadGraphImpl.createAsync(this); } private HashMap> builtinValues; public Class getBuiltinValue(Resource r) { if(builtinValues == null) initBuiltinValues(); return builtinValues.get(r); } Exception callerException = null; public interface AsyncBarrier { public void inc(); public void dec(); // public void inc(String debug); // public void dec(String debug); } // final public QueryProcessor processor; // final public QuerySupport support; // boolean disposed = false; private void initBuiltinValues() { Layer0 b = getSession().peekService(Layer0.class); if(b == null) return; builtinValues = new HashMap>(); builtinValues.put(b.String, String.class); builtinValues.put(b.Double, Double.class); builtinValues.put(b.Float, Float.class); builtinValues.put(b.Long, Long.class); builtinValues.put(b.Integer, Integer.class); builtinValues.put(b.Byte, Byte.class); builtinValues.put(b.Boolean, Boolean.class); builtinValues.put(b.StringArray, String[].class); builtinValues.put(b.DoubleArray, double[].class); builtinValues.put(b.FloatArray, float[].class); builtinValues.put(b.LongArray, long[].class); builtinValues.put(b.IntegerArray, int[].class); builtinValues.put(b.ByteArray, byte[].class); builtinValues.put(b.BooleanArray, boolean[].class); } // public ReadGraphSupportImpl(final QueryProcessor provider2) { // // if (null == provider2) { // this.processor = null; // support = null; // return; // } // this.processor = provider2; // support = provider2.getCore(); // initBuiltinValues(); // // } // final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) { // return new ReadGraphSupportImpl(impl.processor); // } @Override final public Session 
getSession() {
        return session;
    }

    /** Returns the resourceSupport field. */
    final public ResourceSupport getResourceSupport() {
        return resourceSupport;
    }

    /**
     * Runs the Predicates query for {@code subject} and forwards every
     * resulting id to {@code procedure} as a Resource obtained from
     * querySupport.getResource.
     * <p>
     * Until the first finished/exception callback has been delivered the
     * procedure receives the graph handed to the callback; afterwards it
     * receives {@code impl.newRestart(graph)}. Anything thrown by the
     * procedure is logged and not propagated.
     *
     * @param impl      graph the query is run against
     * @param subject   resource whose predicates are queried; asserted non-null
     * @param procedure receiver of the results; asserted non-null. It is also
     *                  used as the listener if it implements ListenerBase.
     */
    @Override
    final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) {

        assert(subject != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

        IntProcedure ip = new IntProcedure() {

            // true until the first finished()/exception() has been delivered
            AtomicBoolean first = new AtomicBoolean(true);

            @Override
            public void execute(ReadGraphImpl graph, int i) {
                try {
                    if(first.get()) {
                        procedure.execute(graph, querySupport.getResource(i));
                    } else {
                        procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {
                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.finished(graph);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.finished(impl.newRestart(graph));
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {
                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.exception(graph, t);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.exception(impl.newRestart(graph), t);
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

        };

        int sId = querySupport.getId(subject);

//      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);
//      else impl.state.barrier.inc(null, null);

        Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);

    }

    /**
     * Variant of forEachPredicate for a MultiProcedure, which receives the
     * results without a graph argument. Anything thrown by the procedure is
     * logged and not propagated.
     *
     * @param impl      graph the query is run against
     * @param subject   resource whose predicates are queried; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) {

        assert(subject != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

//      impl.state.barrier.inc();

        Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {

            @Override
            public void execute(ReadGraphImpl graph, int i) {
                try {
procedure.execute(querySupport.getResource(i));
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {
                try {
                    procedure.finished();
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {
                try {
                    procedure.exception(t);
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

        });

    }

    /**
     * Returns the result of {@code Predicates.queryEach2} for the id of
     * {@code subject}.
     *
     * @param impl    graph the query is run against
     * @param subject resource whose predicates are queried; asserted non-null
     * @return the IntSet produced by the query
     * @throws Throwable whatever the query throws
     */
    @Override
    final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {

        assert(subject != null);

        return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);

    }

    /**
     * Runs the Statements query for (subject, predicate) and forwards every
     * resulting (s, p, o) triple to {@code procedure} as a Statement obtained
     * from querySupport.getStatement. Anything thrown by the procedure is
     * logged and not propagated.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

//      impl.state.barrier.inc();

        Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {

            @Override
            public void execute(ReadGraphImpl graph, int s, int p, int o) {
                try {
                    procedure.execute(querySupport.getStatement(s, p, o));
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {
                try {
                    procedure.finished();
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {
                try {
                    procedure.exception(t);
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

        });

    }

    /**
     * Variant of forEachStatement for an AsyncMultiProcedure, which receives
     * a graph together with every result.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        final ListenerBase listener =
getListenerBase(procedure);

        TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {

            // true until the first finished()/exception() has been delivered.
            // Kept in an AtomicBoolean, like the equivalent adapter in
            // forEachPredicate, so that the flip is a single atomic step.
            final AtomicBoolean first = new AtomicBoolean(true);

            @Override
            public void execute(ReadGraphImpl graph, int s, int p, int o) {
                try {
                    if(first.get()) {
                        procedure.execute(graph, querySupport.getStatement(s, p, o));
                    } else {
                        procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {

                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.finished(graph);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.finished(impl.newRestart(graph));
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }

            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {

                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.exception(graph, t);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.exception(impl.newRestart(graph), t);
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }

            }

        };

        int sId = querySupport.getId(subject);
        int pId = querySupport.getId(predicate);

//      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
//      else impl.state.barrier.inc(null, null);

        Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);

    }

    /**
     * Variant of forEachStatement for a StatementProcedure, which receives the
     * raw (s, p, o) ids instead of Statement objects.
     * <p>
     * Until the first finished/exception callback has been delivered the
     * procedure receives the graph handed to the callback; afterwards it
     * receives {@code impl.newRestart(graph)}. Anything thrown by the
     * procedure is logged and not propagated.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final StatementProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

        TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {

            // true until the first finished()/exception() has been delivered
            final AtomicBoolean first = new AtomicBoolean(true);

            @Override
            public void execute(ReadGraphImpl graph, int s, int p, int o) {
                try {
                    if(first.get()) {
                        procedure.execute(graph, s, p, o);
                    } else {
                        procedure.execute(impl.newRestart(graph), s, p, o);
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {

                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.finished(graph);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.finished(impl.newRestart(graph));
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }

            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {

                try {
                    if(first.compareAndSet(true, false)) {
                        procedure.exception(graph, t);
//                      impl.state.barrier.dec(this);
                    } else {
                        procedure.exception(impl.newRestart(graph), t);
                    }
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }

            }

        };

        int sId = querySupport.getId(subject);
        int pId = querySupport.getId(predicate);

//      if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
//      else impl.state.barrier.inc(null, null);

        Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);

    }

    /**
     * Adapts an AsyncSetListener to forEachStatement by diffing consecutive
     * result runs: a statement that was not part of the previous run is
     * reported through {@code procedure.add}, and when a run finishes every
     * statement of the previous run that did not show up again is reported
     * through {@code procedure.remove}.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure set listener to notify; asserted non-null
     */
    @Override
    final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        forEachStatement(impl, subject, predicate, new AsyncMultiListener() {

            // Results of the previous completed run that have not been seen
            // again yet; null before the first run completes.
            private Set current = null;
            // Results seen so far during the run in progress.
            private Set run = new HashSet();

            @Override
            public void execute(AsyncReadGraph graph, Statement result) {

                boolean found = false;

                if(current != null) {

                    found = current.remove(result);

                }

                if(!found) procedure.add(graph, result);

                run.add(result);

            }

            @Override
            public void finished(AsyncReadGraph graph) {

                if(current != null) {
                    for(Statement r : current) procedure.remove(graph, r);
                }

                current = run;

                run = new HashSet();

            }

            @Override
            public void exception(AsyncReadGraph graph, Throwable t) {
                procedure.exception(graph, t);
            }

            @Override
            public boolean isDisposed() {
                return procedure.isDisposed();
            }

        });

    }

    /**
     * Runs the AssertedStatements query for (subject, predicate) and forwards
     * every resulting triple to {@code procedure} as a Statement.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

        //
// impl.state.barrier.inc();
        // (The line above continues a "//" comment that was split by a line
        // wrap; it is disabled code like every other impl.state.barrier call
        // in this class.)

        AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {

            @Override
            public void execute(ReadGraphImpl graph, int s, int p, int o) {
                try {
                    procedure.execute(graph, querySupport.getStatement(s, p, o));
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {
                try {
                    procedure.finished(graph);
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {
                try {
                    procedure.exception(graph, t);
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

        });

    }

    /**
     * Returns {@code procedure} cast to ListenerBase when it implements that
     * interface, otherwise null.
     *
     * @param procedure any procedure object
     * @return the same object as a ListenerBase, or null
     */
    private static ListenerBase getListenerBase(Object procedure) {
        if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
        else return null;
    }

    /**
     * Runs the Objects query for (subject, predicate) and forwards every
     * resulting id to {@code procedure} as a Resource obtained from
     * querySupport.getResource. Anything thrown by the procedure is logged
     * and not propagated.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results; asserted non-null
     */
    @Override
    final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        final ListenerBase listener = getListenerBase(procedure);

//      impl.state.barrier.inc();

        Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {

            @Override
            public void execute(ReadGraphImpl graph, int i) {
                try {
                    procedure.execute(querySupport.getResource(i));
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
            }

            @Override
            public void finished(ReadGraphImpl graph) {
                try {
                    procedure.finished();
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

            @Override
            public void exception(ReadGraphImpl graph, Throwable t) {
                // NOTE(review): this is the only adapter in this part of the
                // class that prints to stdout; it looks like a debugging
                // leftover - confirm before removing.
                System.out.println("forEachObject exception " + t);
                try {
                    procedure.exception(t);
                } catch (Throwable t2) {
                    Logger.defaultLogError(t2);
                }
//              impl.state.barrier.dec();
            }

        });

    }

//  @Override
//  final public void forEachDirectObject(final
ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) { // // assert(subject != null); // assert(predicate != null); // assert(procedure != null); // // final ListenerBase listener = getListenerBase(procedure); // // int sId = querySupport.getId(subject); // int pId = querySupport.getId(predicate); // // MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support); // // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId); // else impl.state.barrier.inc(null, null); // // // final Exception caller = new Exception(); // // // final Pair exceptions = Pair.make(callerException, new Exception()); // // DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc); // // } @Override final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport); int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId); // else impl.state.barrier.inc(null, null); DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc); } @Override final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); } @Override final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure, boolean ignoreVirtual) { assert(subject != null); assert(procedure != null); final ListenerBase listener = 
getListenerBase(procedure);

        org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);

    }

    // Sentinel that forPossibleObject stores once a second object has been
    // seen; it is only ever compared by identity.
    private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);

    /**
     * Queries the objects of (subject, predicate) and reports a single
     * result: the object if exactly one was delivered, otherwise null (none
     * or more than one). Exceptions from the query are forwarded to
     * {@code procedure.exception}.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource
     * @param predicate predicate resource
     * @param procedure receiver of the single result
     */
    @Override
    final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) {

        forEachObject(impl, subject, predicate, new AsyncMultiProcedure() {

            // null = nothing seen yet, INVALID_RESOURCE = more than one seen
            private Resource single = null;

            @Override
            public synchronized void execute(AsyncReadGraph graph, Resource result) {
                if(single == null) {
                    single = result;
                } else {
                    single = INVALID_RESOURCE;
                }
            }

            @Override
            public synchronized void finished(AsyncReadGraph graph) {
                if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
                else procedure.execute(graph, single);
            }

            @Override
            public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
                procedure.exception(graph, throwable);
            }

        });

    }

    /**
     * Resolves subject and predicate to their ids and starts Objects.runner
     * with the given listener and procedure.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource
     * @param predicate predicate resource
     * @param listener  listener passed to the query; may be null
     * @param procedure receiver of the resulting object ids
     */
    final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {

        final int sId = querySupport.getId(subject);
        final int pId = querySupport.getId(predicate);

        Objects.runner(impl, sId, pId, impl.parent, listener, procedure);

    }

    /**
     * Resolves subject and predicate to their ids and returns the result of
     * {@code Objects.runner2}.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource
     * @param predicate predicate resource
     * @return the int produced by Objects.runner2
     * @throws DatabaseException if the query fails
     */
    final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {

        final int sId = querySupport.getId(subject);
        final int pId = querySupport.getId(predicate);

        return Objects.runner2(impl, sId, pId, impl.parent);

    }

    /**
     * Queries the objects of (subject, predicate) and forwards each one to
     * {@code procedure} as a Resource.
     * <p>
     * When {@code impl.parent} is non-null or the procedure is a listener,
     * the adapter guards the procedure calls with try/catch and switches to
     * {@code impl.newRestart(graph)} after the first finished callback;
     * otherwise a plain pass-through adapter is used.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure receiver of the results
     */
    final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) {

        assert(subject != null);
        assert(predicate != null);

        final ListenerBase listener = getListenerBase(procedure);

        if(impl.parent != null || listener != null) {

            IntProcedure ip = new IntProcedure() {

                // true until the first finished() has been delivered
                AtomicBoolean first = new
AtomicBoolean(true);

                @Override
                public void execute(ReadGraphImpl graph, int i) {
                    try {
                        // Before the first finished() the procedure is given
                        // the caller's impl, not the callback's graph.
                        if(first.get()) {
                            procedure.execute(impl, querySupport.getResource(i));
                        } else {
                            procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
                        }
                    } catch (Throwable t2) {
                        Logger.defaultLogError(t2);
                    }

                }

                @Override
                public void finished(ReadGraphImpl graph) {
                    try {
                        if(first.compareAndSet(true, false)) {
                            procedure.finished(impl);
//                          impl.state.barrier.dec(this);
                        } else {
                            procedure.finished(impl.newRestart(graph));
                        }
                    } catch (Throwable t2) {
                        Logger.defaultLogError(t2);
                    }
                }

                @Override
                public void exception(ReadGraphImpl graph, Throwable t) {
                    // NOTE(review): unlike finished(), this neither consults
                    // nor flips the first flag - confirm that is intended.
                    try {
                        procedure.exception(graph, t);
                    } catch (Throwable t2) {
                        Logger.defaultLogError(t2);
                    }
//                  impl.state.barrier.dec(this);
                }

                @Override
                public String toString() {
                    return "forEachObject with " + procedure;
                }

            };

//          if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
//          else impl.state.barrier.inc(null, null);

            forEachObject(impl, subject, predicate, listener, ip);

        } else {

            // No parent and no listener: forward directly, without the
            // try/catch guards used in the branch above.
            IntProcedure ip = new IntProcedure() {

                @Override
                public void execute(ReadGraphImpl graph, int i) {
                    procedure.execute(graph, querySupport.getResource(i));
                }

                @Override
                public void finished(ReadGraphImpl graph) {
                    procedure.finished(graph);
                }

                @Override
                public void exception(ReadGraphImpl graph, Throwable t) {
                    procedure.exception(graph, t);
                }

                @Override
                public String toString() {
                    return "forEachObject with " + procedure;
                }

            };

            forEachObject(impl, subject, predicate, listener, ip);

        }

    }

    /**
     * Adapts an AsyncSetListener to forEachObject by diffing consecutive
     * result runs: an object that was not part of the previous run is
     * reported through {@code procedure.add}, and when a run finishes every
     * object of the previous run that did not show up again is reported
     * through {@code procedure.remove}.
     *
     * @param impl      graph the query is run against
     * @param subject   subject resource; asserted non-null
     * @param predicate predicate resource; asserted non-null
     * @param procedure set listener to notify; asserted non-null
     */
    @Override
    final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) {

        assert(subject != null);
        assert(predicate != null);
        assert(procedure != null);

        forEachObject(impl, subject, predicate, new AsyncMultiListener() {

            // Results of the previous completed run that have not been seen
            // again yet; null before the first run completes.
            private Set current = null;
            // Results seen so far during the run in progress.
            private Set run = new HashSet();

            @Override
            public void execute(AsyncReadGraph graph, Resource result) {

                boolean found = false;
if(current != null) {

                    // Seen in the previous run as well: keep it out of the
                    // removal pass in finished().
                    found = current.remove(result);

                }

                if(!found) procedure.add(graph, result);

                run.add(result);

            }

            @Override
            public void finished(AsyncReadGraph graph) {

                // Whatever is left from the previous run did not reappear.
                if(current != null) {
                    for(Resource r : current) procedure.remove(graph, r);
                }

                current = run;

                run = new HashSet();

            }

            @Override
            public boolean isDisposed() {
                return procedure.isDisposed();
            }

            @Override
            public void exception(AsyncReadGraph graph, Throwable t) {
                procedure.exception(graph, t);
            }

            @Override
            public String toString() {
                return "forObjectSet " + procedure;
            }

        });

    }

    /**
     * Adapts an AsyncSetListener to forEachPredicate by diffing consecutive
     * result runs: a predicate that was not part of the previous run is
     * reported through {@code procedure.add}, and when a run finishes every
     * predicate of the previous run that did not show up again is reported
     * through {@code procedure.remove}.
     *
     * @param impl      graph the query is run against
     * @param subject   resource whose predicates are tracked; asserted non-null
     * @param procedure set listener to notify; asserted non-null
     */
    @Override
    final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) {

        assert(subject != null);
        assert(procedure != null);

        forEachPredicate(impl, subject, new AsyncMultiListener() {

            // Results of the previous completed run that have not been seen
            // again yet; null before the first run completes.
            private Set current = null;
            // Results seen so far during the run in progress.
            private Set run = new HashSet();

            @Override
            public void execute(AsyncReadGraph graph, Resource result) {

                boolean found = false;

                if(current != null) {

                    found = current.remove(result);

                }

                if(!found) procedure.add(graph, result);

                run.add(result);

            }

            @Override
            public void finished(AsyncReadGraph graph) {

                if(current != null) {
                    for(Resource r : current) procedure.remove(graph, r);
                }

                current = run;

                run = new HashSet();

            }

            @Override
            public boolean isDisposed() {
                return procedure.isDisposed();
            }

            @Override
            public void exception(AsyncReadGraph graph, Throwable t) {
                procedure.exception(graph, t);
            }

            @Override
            public String toString() {
                return "forPredicateSet " + procedure;
            }

        });

    }

    /**
     * Adapts an AsyncSetListener to forEachPrincipalType using the same
     * run-to-run diffing as forPredicateSet.
     *
     * @param impl      graph the query is run against
     * @param subject   resource whose principal types are tracked; asserted non-null
     * @param procedure set listener to notify; asserted non-null
     */
    @Override
    final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) {

        assert(subject != null);
        assert(procedure != null);

        forEachPrincipalType(impl, subject, new AsyncMultiListener() {

            // Results of the previous completed run that have not been seen
            // again yet; null before the first run completes.
            private Set current = null;
            // Results seen so far during the run in progress.
            private Set run = new HashSet();

            @Override
            public void execute(AsyncReadGraph graph, Resource result) {

                boolean found = false;

                if(current != null) {

                    found = current.remove(result);

                }

                if(!found) procedure.add(graph, result);
run.add(result); } @Override public void finished(AsyncReadGraph graph) { if(current != null) { for(Resource r : current) procedure.remove(graph, r); } current = run; run = new HashSet(); } @Override public boolean isDisposed() { return procedure.isDisposed(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { procedure.exception(graph, t); } @Override public String toString() { return "forPrincipalTypeSet " + procedure; } }); } @Override final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { assert(subject != null); assert(predicate != null); assert(procedure != null); forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener() { private Set current = null; private Set run = new HashSet(); @Override public void execute(AsyncReadGraph graph, Resource result) { boolean found = false; if(current != null) { found = current.remove(result); } if(!found) procedure.add(graph, result); run.add(result); } @Override public void finished(AsyncReadGraph graph) { if(current != null) { for(Resource r : current) procedure.remove(graph, r); } current = run; run = new HashSet(); } @Override public boolean isDisposed() { return procedure.isDisposed(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { procedure.exception(graph, t); } @Override public String toString() { return "forObjectSet " + procedure; } }); } @Override final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { assert(subject != null); assert(predicate != null); assert(procedure != null); forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener() { private Set current = null; private Set run = new HashSet(); @Override public void execute(AsyncReadGraph graph, Statement result) { boolean found = false; if(current != null) { found = current.remove(result); } 
if(!found) procedure.add(graph, result); run.add(result); } @Override public void finished(AsyncReadGraph graph) { if(current != null) { for(Statement s : current) procedure.remove(graph, s); } current = run; run = new HashSet(); } @Override public boolean isDisposed() { return procedure.isDisposed(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { procedure.exception(graph, t); } @Override public String toString() { return "forStatementSet " + procedure; } }); } @Override final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) { assert(subject != null); assert(predicate != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() { @Override public void execute(ReadGraphImpl graph, int s, int p, int o) { try { procedure.execute(graph, querySupport.getResource(o)); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void finished(ReadGraphImpl graph) { try { procedure.finished(graph); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); IntProcedure ip = new IntProcedure() { @Override public void execute(ReadGraphImpl graph, int i) { try { procedure.execute(graph, querySupport.getResource(i)); } catch (Throwable t2) { Logger.defaultLogError(t2); } } 
@Override public void finished(ReadGraphImpl graph) { try { procedure.finished(graph); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } }; int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId); // else impl.state.barrier.inc(null, null); PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip); } @Override final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { @Override public void execute(ReadGraphImpl graph, int i) { try { procedure.execute(querySupport.getResource(i)); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void finished(ReadGraphImpl graph) { try { procedure.finished(); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); InternalProcedure ip = new InternalProcedure() { AtomicBoolean first = new AtomicBoolean(true); @Override public void execute(final ReadGraphImpl graph, IntSet set) { try { if(first.compareAndSet(true, false)) { procedure.execute(graph, set); // 
impl.state.barrier.dec(this); } else { procedure.execute(impl.newRestart(graph), set); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { if(first.compareAndSet(true, false)) { procedure.exception(graph, t); // impl.state.barrier.dec(this); } else { procedure.exception(impl.newRestart(graph), t); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } }; int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId); // else impl.state.barrier.inc(null, null); Types.queryEach(impl, sId, this, impl.parent, listener, ip); } @Override final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable { assert(subject != null); return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent); } @Override final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure() { AtomicBoolean first = new AtomicBoolean(true); @Override public void execute(final ReadGraphImpl graph, RelationInfo set) { try { if(first.compareAndSet(true, false)) { procedure.execute(graph, set); // impl.state.barrier.dec(); } else { procedure.execute(impl.newRestart(graph), set); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { if(first.compareAndSet(true, false)) { procedure.exception(graph, t); // impl.state.barrier.dec("ReadGraphSupportImpl.1353"); } else { procedure.exception(impl.newRestart(graph), t); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } }); } @Override final public void forSupertypes(final ReadGraphImpl 
impl, final Resource subject, final AsyncProcedure> procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure() { AtomicBoolean first = new AtomicBoolean(true); @Override public void execute(final ReadGraphImpl graph, IntSet set) { // final HashSet result = new HashSet(); // set.forEach(new TIntProcedure() { // // @Override // public boolean execute(int type) { // result.add(querySupport.getResource(type)); // return true; // } // // }); try { if(first.compareAndSet(true, false)) { procedure.execute(graph, set); // impl.state.barrier.dec(); } else { procedure.execute(impl.newRestart(graph), set); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { if(first.compareAndSet(true, false)) { procedure.exception(graph, t); // impl.state.barrier.dec(); } else { procedure.exception(impl.newRestart(graph), t); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } }); } @Override final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); IntProcedure ip = new IntProcedureAdapter() { @Override public void execute(final ReadGraphImpl graph, int superRelation) { try { procedure.execute(graph, querySupport.getResource(superRelation)); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void finished(final ReadGraphImpl graph) { try { procedure.finished(graph); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { 
Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } }; int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId); // else impl.state.barrier.inc(null, null); DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip); } @Override final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); } @Override final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); InternalProcedure ip = new InternalProcedure() { @Override public void execute(final ReadGraphImpl graph, IntSet set) { // final HashSet result = new HashSet(); // set.forEach(new TIntProcedure() { // // @Override // public boolean execute(int type) { // result.add(querySupport.getResource(type)); // return true; // } // // }); try { procedure.execute(graph, set); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } }; int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId); // else impl.state.barrier.inc(null, null); SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip); } final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException { int sId = querySupport.getId(subject); 
return ValueQuery.queryEach(impl, sId, impl.parent); } final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException { return ValueQuery.queryEach(impl, subject, impl.parent); } @Override final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); int sId = querySupport.getId(subject); if(procedure != null) { final ListenerBase listener = getListenerBase(procedure); InternalProcedure ip = new InternalProcedure() { AtomicBoolean first = new AtomicBoolean(true); @Override public void execute(ReadGraphImpl graph, byte[] result) { try { if(first.compareAndSet(true, false)) { procedure.execute(graph, result); // impl.state.barrier.dec(this); } else { procedure.execute(impl.newRestart(graph), result); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { if(first.compareAndSet(true, false)) { procedure.exception(graph, t); // impl.state.barrier.dec(this); } else { procedure.exception(impl.newRestart(graph), t); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } }; // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); // else impl.state.barrier.inc(null, null); return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); } else { return ValueQuery.queryEach(impl, sId, impl.parent, null, null); } } @Override final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); if(impl.parent != null || listener != null) { InternalProcedure ip = new InternalProcedure() { AtomicBoolean first = new AtomicBoolean(true); @Override public void execute(ReadGraphImpl graph, byte[] result) { try { if(first.compareAndSet(true, false)) { procedure.execute(graph, result); // 
impl.state.barrier.dec(this); } else { procedure.execute(impl.newRestart(graph), result); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { if(first.compareAndSet(true, false)) { procedure.exception(graph, t); // impl.state.barrier.dec(this); } else { procedure.exception(impl.newRestart(graph), t); } } catch (Throwable t2) { Logger.defaultLogError(t2); } } }; int sId = querySupport.getId(subject); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); // else impl.state.barrier.inc(null, null); ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); } else { InternalProcedure ip = new InternalProcedure() { @Override public void execute(ReadGraphImpl graph, byte[] result) { procedure.execute(graph, result); } @Override public void exception(ReadGraphImpl graph, Throwable t) { procedure.exception(graph, t); } }; int sId = querySupport.getId(subject); ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); } } @Override final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure procedure) { assert(relation != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); IntProcedure ip = new IntProcedure() { private int result = 0; final AtomicBoolean found = new AtomicBoolean(false); final AtomicBoolean done = new AtomicBoolean(false); @Override public void finished(ReadGraphImpl graph) { // Shall fire exactly once! if(done.compareAndSet(false, true)) { try { if(result == 0) { procedure.exception(graph, new NoInverseException("")); // impl.state.barrier.dec(this); } else { procedure.execute(graph, querySupport.getResource(result)); // impl.state.barrier.dec(this); } } catch (Throwable t) { Logger.defaultLogError(t); } } } @Override public void execute(ReadGraphImpl graph, int i) { if(found.compareAndSet(false, true)) { this.result = i; } else { // Shall fire exactly once! 
if(done.compareAndSet(false, true)) { try { procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result)); // impl.state.barrier.dec(this); } catch (Throwable t) { Logger.defaultLogError(t); } } } } @Override public void exception(ReadGraphImpl graph, Throwable t) { // Shall fire exactly once! if(done.compareAndSet(false, true)) { try { procedure.exception(graph, t); // impl.state.barrier.dec(this); } catch (Throwable t2) { Logger.defaultLogError(t2); } } } }; int sId = querySupport.getId(relation); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId); // else impl.state.barrier.inc(null, null); Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip); } @Override final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { assert(id != null); assert(procedure != null); InternalProcedure ip = new InternalProcedure() { @Override public void execute(ReadGraphImpl graph, Integer result) { try { procedure.execute(graph, querySupport.getResource(result)); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } }; // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource"); // else impl.state.barrier.inc(null, null); forResource(impl, id, impl.parent, ip); } @Override final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { assert(id != null); assert(procedure != null); // impl.state.barrier.inc(); forBuiltin(impl, id, impl.parent, new InternalProcedure() { @Override public void execute(ReadGraphImpl graph, Integer result) { try { procedure.execute(graph, querySupport.getResource(result)); } catch (Throwable t2) { 
Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { boolean found = false; @Override public void execute(ReadGraphImpl graph, int object) { found = true; } @Override public void finished(ReadGraphImpl graph) { try { procedure.execute(graph, found); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) { assert(subject != null); assert(predicate != null); assert(procedure != null); AsyncMultiProcedure ip = new AsyncMultiProcedureAdapter() { boolean found = false; @Override synchronized public void execute(AsyncReadGraph graph, Resource resource) { found = true; } @Override synchronized public void finished(AsyncReadGraph graph) { try { procedure.execute(graph, found); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } @Override public void exception(AsyncReadGraph graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(this); } }; // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, 
"#ForEachObject#" + subject + "#" + predicate); // else impl.state.barrier.inc(null, null); forEachObject(impl, subject, predicate, ip); } @Override final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure procedure) { assert(subject != null); assert(predicate != null); assert(procedure != null); // impl.state.barrier.inc(); forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter() { boolean found = false; @Override synchronized public void execute(AsyncReadGraph graph, Resource resource) { if(resource.equals(object)) found = true; } @Override synchronized public void finished(AsyncReadGraph graph) { try { procedure.execute(graph, found); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(AsyncReadGraph graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { assert(subject != null); assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure() { @Override public void execute(ReadGraphImpl graph, byte[] object) { boolean result = object != null; try { procedure.execute(graph, result); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { assert(subject != null); 
assert(procedure != null); final ListenerBase listener = getListenerBase(procedure); // impl.state.barrier.inc(); OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { @Override public void exception(ReadGraphImpl graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public void execute(ReadGraphImpl graph, int i) { try { procedure.execute(graph, querySupport.getResource(i)); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void finished(ReadGraphImpl graph) { try { procedure.finished(graph); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void query(final ReadGraphImpl impl, final AsyncRead request, final CacheEntry parent, final AsyncProcedure procedure, ListenerBase listener) { assert(request != null); assert(procedure != null); // if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999"); // else impl.state.barrier.inc(null, null); runAsyncRead(impl, request, parent, listener, procedure); } @Override final public T tryQuery(final ReadGraphImpl graph, final Read request) throws DatabaseException { assert(graph != null); assert(request != null); final ReadEntry entry = readMap.get(request); if(entry != null && entry.isReady()) { return (T)entry.get(graph, this, null); } else { return request.perform(graph); } } final public T tryQuery(final ReadGraphImpl graph, final ExternalRead request) throws DatabaseException { assert(graph != null); assert(request != null); final ExternalReadEntry entry = externalReadMap.get(request); if(entry != null && entry.isReady()) { if(entry.isExcepted()) { Throwable t = (Throwable)entry.getResult(); if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException(t); } else { return (T)entry.getResult(); } } else { final 
DataContainer result = new DataContainer(); final DataContainer exception = new DataContainer(); request.register(graph, new Listener() { @Override public void exception(Throwable t) { exception.set(t); } @Override public void execute(T t) { result.set(t); } @Override public boolean isDisposed() { return true; } }); Throwable t = exception.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException(t); } return result.get(); } } @Override final public void tryQuery(final ReadGraphImpl graph, final AsyncRead request, AsyncProcedure procedure) { assert(graph != null); assert(request != null); final AsyncReadEntry entry = asyncReadMap.get(request); if(entry != null && entry.isReady()) { if(entry.isExcepted()) { procedure.exception(graph, (Throwable)entry.getResult()); } else { procedure.execute(graph, (T)entry.getResult()); } } else { request.perform(graph, procedure); } } @Override final public void query(final ReadGraphImpl impl, final MultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { assert(request != null); assert(procedure != null); // impl.state.barrier.inc(null, null); queryMultiRead(impl, request, parent, listener, procedure); } @Override final public void query(final ReadGraphImpl impl, final AsyncMultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { assert(request != null); assert(procedure != null); // impl.state.barrier.inc(); runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure() { public void execute(AsyncReadGraph graph, T result) { try { procedure.execute(graph, result); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public void finished(AsyncReadGraph graph) { try { procedure.finished(graph); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } @Override public String toString() { return procedure.toString(); } 
@Override public void exception(AsyncReadGraph graph, Throwable t) { try { procedure.exception(graph, t); } catch (Throwable t2) { Logger.defaultLogError(t2); } // impl.state.barrier.dec(); } }); } @Override final public void query(final ReadGraphImpl impl, final ExternalRead request, final CacheEntry parent, final Procedure procedure, ListenerBase listener) { assert(request != null); assert(procedure != null); queryPrimitiveRead(impl, request, parent, listener, new Procedure() { @Override public void execute(T result) { try { procedure.execute(result); } catch (Throwable t2) { Logger.defaultLogError(t2); } } @Override public String toString() { return procedure.toString(); } @Override public void exception(Throwable t) { try { procedure.exception(t); } catch (Throwable t2) { Logger.defaultLogError(t2); } } }); } @Override public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) { return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); } @Override public VirtualGraph getProvider(Resource subject, Resource predicate) { return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate)); } @Override public VirtualGraph getValueProvider(Resource subject) { return querySupport.getValueProvider(querySupport.getId(subject)); } public boolean resumeTasks(ReadGraphImpl graph) { return querySupport.resume(graph); } public boolean isImmutable(int resourceId) { return querySupport.isImmutable(resourceId); } public boolean isImmutable(Resource resource) { ResourceImpl impl = (ResourceImpl)resource; return isImmutable(impl.id); } private Layer0 L0; public Layer0 getL0(ReadGraph graph) { if(L0 == null) { L0 = Layer0.getInstance(graph); } return L0; } }