X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=55855ee3e823bed936c00c558848ceaf05a04915;hp=d5d307534930b28df0bd2d18057309866ec87da0;hb=cb12888f3415720276da57f2d162e13ea5b3f46c;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index d5d307534..55855ee3e 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -1,5412 +1,4497 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.impl.query; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.simantics.databoard.Bindings; -import org.simantics.db.AsyncReadGraph; -import org.simantics.db.DevelopmentKeys; -import org.simantics.db.DirectStatements; -import org.simantics.db.ReadGraph; -import org.simantics.db.RelationInfo; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.Statement; -import org.simantics.db.VirtualGraph; -import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.debug.ListenerReport; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; -import org.simantics.db.exception.NoInverseException; -import org.simantics.db.exception.ResourceNotFoundException; -import org.simantics.db.impl.DebugPolicy; -import org.simantics.db.impl.ResourceImpl; -import org.simantics.db.impl.graph.MultiIntProcedure; -import org.simantics.db.impl.graph.ReadGraphImpl; -import org.simantics.db.impl.graph.ReadGraphSupport; -import org.simantics.db.impl.graph.WriteGraphImpl; -import org.simantics.db.impl.procedure.IntProcedureAdapter; -import org.simantics.db.impl.procedure.InternalProcedure; -import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; -import org.simantics.db.impl.support.ResourceSupport; -import org.simantics.db.procedure.AsyncMultiListener; -import org.simantics.db.procedure.AsyncMultiProcedure; -import org.simantics.db.procedure.AsyncProcedure; -import org.simantics.db.procedure.AsyncSetListener; -import org.simantics.db.procedure.Listener; -import org.simantics.db.procedure.ListenerBase; -import org.simantics.db.procedure.MultiProcedure; -import org.simantics.db.procedure.Procedure; -import org.simantics.db.procedure.StatementProcedure; -import org.simantics.db.request.AsyncMultiRead; -import org.simantics.db.request.AsyncRead; -import org.simantics.db.request.ExternalRead; -import org.simantics.db.request.MultiRead; -import org.simantics.db.request.Read; -import org.simantics.db.request.RequestFlags; -import org.simantics.db.request.WriteTraits; -import org.simantics.layer0.Layer0; -import org.simantics.utils.DataContainer; -import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Pair; -import org.simantics.utils.datastructures.collections.CollectionUtils; -import org.simantics.utils.datastructures.disposable.AbstractDisposable; - -import gnu.trove.map.hash.THashMap; -import gnu.trove.procedure.TIntProcedure; -import gnu.trove.procedure.TLongProcedure; -import gnu.trove.procedure.TObjectProcedure; -import gnu.trove.set.hash.THashSet; -import gnu.trove.set.hash.TIntHashSet; - -@SuppressWarnings({"rawtypes", "unchecked"}) -final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport { - - final public UnaryQueryHashMap directPredicatesMap; - final public UnaryQueryHashMap principalTypesMap; - final public THashMap uriToResourceMap; - final public THashMap namespaceIndexMap22; - final public UnaryQueryHashMap projectsMap; - final public UnaryQueryHashMap> relationInfoMap; - final public UnaryQueryHashMap> superTypesMap; - final public UnaryQueryHashMap> typeHierarchyMap; - final public UnaryQueryHashMap> superRelationsMap; - final public UnaryQueryHashMap> typesMap; - final public UnaryQueryHashMap> valueMap; - final public DoubleKeyQueryHashMap directObjectsMap; - final public DoubleKeyQueryHashMap objectsMap; - final public UnaryQueryHashMap orderedSetMap; - final public UnaryQueryHashMap predicatesMap; - final public DoubleKeyQueryHashMap statementsMap; - final public UnaryQueryHashMap assertedPredicatesMap; - final public BinaryQueryHashMap assertedStatementsMap; - final public StableHashMap externalReadMap; - final public StableHashMap asyncReadMap; - final public StableHashMap readMap; - final public StableHashMap asyncMultiReadMap; - final public StableHashMap multiReadMap; - - final private THashMap> listeners; - - public static int indent = 0; - - public int size = 0; - - - // Garbage collection - - public int boundQueries = 0; - - // Statistics - private int hits = 0; - - private int misses = 0; - - private int updates = 0; - - final private int functionalRelation; - - final private int superrelationOf; - - final private int instanceOf; - - final private int inverseOf; - - final private int asserts; - - final private int hasPredicate; - - final private int hasPredicateInverse; - - final private int hasObject; - - final private int inherits; - - final private int subrelationOf; - - final private int rootLibrary; - - /** - * A cache for the root library resource. Initialized in - * {@link #getRootLibraryResource()}. - */ - private volatile ResourceImpl rootLibraryResource; - - final private int library; - - final private int consistsOf; - - final private int hasName; - - AtomicInteger sleepers = new AtomicInteger(0); - - private boolean updating = false; - - static public boolean collecting = false; - - private boolean firingListeners = false; - - final public QuerySupport querySupport; - final public Session session; - final public ResourceSupport resourceSupport; - - private THashSet scheduledListeners = new THashSet(); - - QueryThread[] executors; - - public ArrayList[] queues; - - enum ThreadState { - - INIT, RUN, SLEEP, DISPOSED - - } - - public ThreadState[] threadStates; - public ReentrantLock[] threadLocks; - public Condition[] threadConditions; - - public ArrayList[] ownTasks; - - public ArrayList[] ownSyncTasks; - - ArrayList[] delayQueues; - - public boolean synch = true; - - final Object querySupportLock; - - public Long modificationCounter = 0L; - - public void close() { - } - - final public void scheduleOwn(int caller, SessionTask request) { - ownTasks[caller].add(request); - } - - final public void scheduleAlways(int caller, SessionTask request) { - - int performer = request.thread; - if(caller == performer) { - ownTasks[caller].add(request); - } else { - schedule(caller, request); - } - - } - - final public void schedule(int caller, SessionTask request) { - - int performer = request.thread; - - if(DebugPolicy.SCHEDULE) - System.out.println("schedule " + request + " " + caller + " -> " + performer); - - assert(performer >= 0); - - assert(request != null); - - if(caller == performer) { - request.run(caller); - } else { - ReentrantLock queueLock = threadLocks[performer]; - queueLock.lock(); - queues[performer].add(request); - // This thread could have been sleeping - if(queues[performer].size() == 1) { - if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); - threadConditions[performer].signalAll(); - } - queueLock.unlock(); - } - - } - - - final int THREADS; - - final public int THREAD_MASK; - final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); - - public static abstract class SessionTask { - - final public int thread; - final public int syncCaller; - final public Object object; - - public SessionTask(WriteTraits object, int thread) { - this.thread = thread; - this.syncCaller = -1; - this.object = object; - } - - public SessionTask(Object object, int thread, int syncCaller) { - this.thread = thread; - this.syncCaller = syncCaller; - this.object = object; - } - - public abstract void run(int thread); - - @Override - public String toString() { - return "SessionTask[" + object + "]"; - } - - } - - public static abstract class SessionRead extends SessionTask { - - final public Semaphore notify; - final public DataContainer throwable; - - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread) { - super(object, thread, thread); - this.throwable = throwable; - this.notify = notify; - } - - public SessionRead(Object object, DataContainer throwable, Semaphore notify, int thread, int syncThread) { - super(object, thread, syncThread); - this.throwable = throwable; - this.notify = notify; - } - - } - - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - - public boolean resume(ReadGraphImpl graph) { - return executors[0].runSynchronized(); - } - - public QueryProcessor(final int threads, QuerySupport core, Set threadSet) - throws DatabaseException { - - THREADS = threads; - THREAD_MASK = threads - 1; - - querySupport = core; - session = querySupport.getSession(); - resourceSupport = querySupport.getSupport(); - querySupportLock = core.getLock(); - - executors = new QueryThread[THREADS]; - queues = new ArrayList[THREADS]; - threadLocks = new ReentrantLock[THREADS]; - threadConditions = new Condition[THREADS]; - threadStates = new ThreadState[THREADS]; - ownTasks = new ArrayList[THREADS]; - ownSyncTasks = new ArrayList[THREADS]; - delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - - for (int i = 0; i < THREADS * THREADS; i++) { - delayQueues[i] = new ArrayList(); - } - - for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); - ownTasks[i] = new ArrayList(); - ownSyncTasks[i] = new ArrayList(); - queues[i] = new ArrayList(); - threadLocks[i] = new ReentrantLock(); - threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; - threadStates[i] = ThreadState.INIT; - - } - - for (int i = 0; i < THREADS; i++) { - - final int index = i; - - executors[i] = new QueryThread(session, this, index, "Query Thread " + index); - - threadSet.add(executors[i]); - - } - - directPredicatesMap = new UnaryQueryHashMap(); - valueMap = new UnaryQueryHashMap(); - principalTypesMap = new UnaryQueryHashMap(); - uriToResourceMap = new THashMap(); - namespaceIndexMap22 = new THashMap(); - projectsMap = new UnaryQueryHashMap(); - relationInfoMap = new UnaryQueryHashMap(); - typeHierarchyMap = new UnaryQueryHashMap(); - superTypesMap = new UnaryQueryHashMap(); - superRelationsMap = new UnaryQueryHashMap(); - typesMap = new UnaryQueryHashMap(); - objectsMap = new DoubleKeyQueryHashMap(); - orderedSetMap = new UnaryQueryHashMap(); - predicatesMap = new UnaryQueryHashMap(); - statementsMap = new DoubleKeyQueryHashMap(); - directObjectsMap = new DoubleKeyQueryHashMap(); - assertedPredicatesMap = new UnaryQueryHashMap(); - assertedStatementsMap = new BinaryQueryHashMap(); - asyncReadMap = new StableHashMap(); - readMap = new StableHashMap(); - asyncMultiReadMap = new StableHashMap(); - multiReadMap = new StableHashMap(); - externalReadMap = new StableHashMap(); - listeners = new THashMap>(10, 0.75f); - - // Now start threads - for (int i = 0; i < THREADS; i++) { - executors[i].start(); - } - - // Make sure that query threads are up and running - while(sleepers.get() != THREADS) { - try { - Thread.sleep(5); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - rootLibrary = core.getBuiltin("http:/"); - boolean builtinsInstalled = rootLibrary != 0; - - if (builtinsInstalled) { - functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation); - assert (functionalRelation != 0); - } else - functionalRelation = 0; - - if (builtinsInstalled) { - instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf); - assert (instanceOf != 0); - } else - instanceOf = 0; - - if (builtinsInstalled) { - inverseOf = core.getBuiltin(Layer0.URIs.InverseOf); - assert (inverseOf != 0); - } else - inverseOf = 0; - - - if (builtinsInstalled) { - inherits = core.getBuiltin(Layer0.URIs.Inherits); - assert (inherits != 0); - } else - inherits = 0; - - if (builtinsInstalled) { - asserts = core.getBuiltin(Layer0.URIs.Asserts); - assert (asserts != 0); - } else - asserts = 0; - - if (builtinsInstalled) { - hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate); - assert (hasPredicate != 0); - } else - hasPredicate = 0; - - if (builtinsInstalled) { - hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse); - assert (hasPredicateInverse != 0); - } else - hasPredicateInverse = 0; - - if (builtinsInstalled) { - hasObject = core.getBuiltin(Layer0.URIs.HasObject); - assert (hasObject != 0); - } else - hasObject = 0; - - if (builtinsInstalled) { - subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf); - assert (subrelationOf != 0); - } else - subrelationOf = 0; - - if (builtinsInstalled) { - superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf); - assert (superrelationOf != 0); - } else - superrelationOf = 0; - - if (builtinsInstalled) { - library = core.getBuiltin(Layer0.URIs.Library); - assert (library != 0); - } else - library = 0; - - if (builtinsInstalled) { - consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf); - assert (consistsOf != 0); - } else - consistsOf = 0; - - if (builtinsInstalled) { - hasName = core.getBuiltin(Layer0.URIs.HasName); - assert (hasName != 0); - } else - hasName = 0; - - } - - final public void releaseWrite(ReadGraphImpl graph) { - performDirtyUpdates(graph); - modificationCounter++; - } - - final public int getId(final Resource r) { - return querySupport.getId(r); - } - - public QuerySupport getCore() { - return querySupport; - } - - public int getFunctionalRelation() { - return functionalRelation; - } - - public int getInherits() { - return inherits; - } - - public int getInstanceOf() { - return instanceOf; - } - - public int getInverseOf() { - return inverseOf; - } - - public int getSubrelationOf() { - return subrelationOf; - } - - public int getSuperrelationOf() { - return superrelationOf; - } - - public int getAsserts() { - return asserts; - } - - public int getHasPredicate() { - return hasPredicate; - } - - public int getHasPredicateInverse() { - return hasPredicateInverse; - } - - public int getHasObject() { - return hasObject; - } - - public int getRootLibrary() { - return rootLibrary; - } - - public Resource getRootLibraryResource() { - if (rootLibraryResource == null) { - // Synchronization is not needed here, it doesn't matter if multiple - // threads simultaneously set rootLibraryResource once. - int root = getRootLibrary(); - if (root == 0) - throw new UnsupportedOperationException("database is not initialized, cannot get root library resource"); - this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root); - } - return rootLibraryResource; - } - - public int getLibrary() { - return library; - } - - public int getConsistsOf() { - return consistsOf; - } - - public int getHasName() { - return hasName; - } - - public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) { - - URIToResource.queryEach(graph, id, parent, null, new InternalProcedure() { - - @Override - public void execute(ReadGraphImpl graph, Integer result) { - - if (result != null && result != 0) { - procedure.execute(graph, result); - return; - } - - // Fall back to using the fixed builtins. - result = querySupport.getBuiltin(id); - if (result != 0) { - procedure.execute(graph, result); - return; - } - - try { - result = querySupport.getRandomAccessReference(id); - } catch (ResourceNotFoundException e) { - procedure.exception(graph, e); - return; - } - - if (result != 0) { - procedure.execute(graph, result); - } else { - procedure.exception(graph, new ResourceNotFoundException(id)); - } - - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - procedure.exception(graph, t); - } - - }); - - } - - public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) { - - Integer result = querySupport.getBuiltin(id); - if (result != 0) { - procedure.execute(graph, result); - } else { - procedure.exception(graph, new ResourceNotFoundException(id)); - } - - } - - public final void runAsyncRead(final ReadGraphImpl graph, final AsyncRead query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure procedure) { - - int hash = requestHash(query); - - AsyncReadEntry entry = asyncReadMap.get(query, hash); - - if(parent == null && listener == null) { - if(entry != null && (entry.isReady() || entry.isExcepted())) { - System.out.println("ready " + query); - entry.performFromCache(graph, this, procedure); -// graph.state.barrier.dec(query); - return; - } else { - query.perform(graph, procedure); -// graph.state.barrier.dec(query); - return; - } - } - - if(entry == null) { - - entry = new AsyncReadEntry(query); - entry.setPending(); - entry.clearResult(querySupport); - asyncReadMap.put(query, entry, hash); - - performForEach(graph, query, entry, parent, listener, procedure, false); - - } else { - - if(entry.isPending()) { - synchronized(entry) { - if(entry.isPending()) { - throw new IllegalStateException(); - // final AsyncBarrierImpl parentBarrier = graph.state.barrier; - // if(entry.procs == null) entry.procs = new ArrayList>(); - // entry.procs.add(new AsyncProcedure() { - // - // @Override - // public void execute(AsyncReadGraph graph, T result) { - // procedure.execute(graph, result); - // parentBarrier.dec(query); - // } - // - // @Override - // public void exception(AsyncReadGraph graph, Throwable throwable) { - // procedure.exception(graph, throwable); - // parentBarrier.dec(query); - // } - // - // }); -// if(graph.parent != null || listener != null) { -// registerDependencies(graph, entry, parent, listener, procedure, false); -// } -// -// query.perform(graph, procedure); -// -// return; - - } - } - } - - if(entry.isReady()) { - entry.performFromCache(graph, this, procedure); - registerDependencies(graph, entry, parent, listener, procedure, false); - } else { - performForEach(graph, query, entry, parent, listener, procedure, false); - } - - } - - } - - - final static void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure procedure) { - - MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query); - if(entry == null) { - - entry = new MultiReadEntry(query); - entry.setPending(); - entry.clearResult(provider.querySupport); - - provider.multiReadMap.put(query, entry); - - provider.performForEach(graph, query, entry, parent, listener, procedure, false); - - } else { - - if(entry.isPending()) { - - synchronized(entry) { - - if(entry.isPending()) { - throw new IllegalStateException(); - -// if(entry.procs == null) entry.procs = new ArrayList, AsyncBarrier>>(); -// entry.procs.add(new Pair(procedure, parentBarrier)); -// if(graph.parent != null || listener != null) { -// provider.registerDependencies(graph, entry, parent, listener, procedure, false); -// } - - // If this was synchronized we must wait here until completion - // if(graph.state.synchronizedExecution) { - // while(entry.isPending()) { - // graph.resumeTasks(graph.callerThread, null, null); - // } - // } -// -// return; - - } - } - - entry.performFromCache(graph, provider, procedure); -// graph.state.barrier.dec(query); - return; - - } else { - - provider.performForEach(graph, query, entry, parent, listener, procedure, false); - - } - - } - - } - - public final void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) { - - int hash = requestHash(query); - - AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash); - - if(parent == null && listener == null) { - if(entry != null && (entry.isReady() || entry.isExcepted())) { - System.out.println("ready " + query); - entry.performFromCache(graph, this, procedure); - return; - } else { - query.perform(graph, procedure); - return; - } - } - - if(entry == null) { - - entry = new AsyncMultiReadEntry(query); - entry.setPending(); - entry.clearResult(querySupport); - - asyncMultiReadMap.put(query, entry, hash); - - performForEach(graph, query, entry, parent, listener, procedure, false); - - } else { - - if(entry.isPending()) { - - synchronized(entry) { - if(entry.isPending()) { - throw new IllegalStateException(); -// if(entry.procs == null) entry.procs = new ArrayList>(); -// entry.procs.add(procedure); -// if(graph.parent != null || listener != null) { -// registerDependencies(graph, entry, parent, listener, procedure, false); -// } -// return; - } - } - } - - performForEach(graph, query, entry, parent, listener, procedure, false); - - } - - } - - final static void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure procedure) { - - final ExternalReadEntry entry = cached != null ? cached : provider.externalReadMap.get(query); - if(entry == null) { - provider.performForEach(graph, query, new ExternalReadEntry(query), parent, listener, procedure, false); - } else { - if(entry.isPending()) { - synchronized(entry) { - if(entry.isPending()) { - throw new IllegalStateException(); -// if(entry.procs == null) entry.procs = new ArrayList>(); -// entry.procs.add(procedure); -// return; - } - } - } - provider.performForEach(graph, query, entry, parent, listener, procedure, false); - } - - } - - public int requestHash(Object object) { - try { - return object.hashCode(); - } catch (Throwable t) { - Logger.defaultLogError(t); - return 0; - } - } - - @Override - public T queryRead(final ReadGraphImpl graph, final Read query, final CacheEntry parent, final AsyncProcedure procedure, final ListenerBase listener) throws Throwable { - - assert(query != null); - - ReadEntry entry = readMap.get(query); - - if(entry != null) { - if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) { - return (T)entry.get(graph, this, procedure); - } else if (entry.isPending()) { - throw new IllegalStateException(); - } - } - - if(entry == null) { - - entry = new ReadEntry(query); - entry.setPending(); - entry.clearResult(querySupport); - - readMap.put(query, entry); - - return (T)performForEach(graph, query, entry, parent, listener, procedure, false); - - } else { - - if(entry.isPending()) { - throw new IllegalStateException(); - } else { - return (T)performForEach(graph, query, entry, parent, listener, procedure, false); - } - - } - - } - - public void queryMultiRead(final ReadGraphImpl graph, final MultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) { - - assert(query != null); - assert(procedure != null); - - final MultiReadEntry entry = multiReadMap.get(query); - - if(parent == null && !(listener != null)) { - if(entry != null && entry.isReady()) { - entry.performFromCache(graph, this, procedure); - return; - } - } - - runMultiRead(graph, entry, query, parent, this, listener, procedure); - - } - - public void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead query, final CacheEntry parent, final ListenerBase listener, final Procedure procedure) { - - assert(query != null); - assert(procedure != null); - - final ExternalReadEntry entry = externalReadMap.get(query); - - if(parent == null && !(listener != null)) { - if(entry != null && entry.isReady()) { - entry.performFromCache(procedure); - return; - } - } - - runPrimitiveRead(graph, entry, query, parent, this, listener, procedure); - - } - - public void performForEach(ReadGraphImpl parentGraph, final AsyncRead query, final AsyncReadEntry entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure procedure, - boolean inferredDependency) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - assert(!entry.isDiscarded()); - - final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency); - - // FRESH, REFUTED, EXCEPTED go here - if (!entry.isReady()) { - - entry.setPending(); - - size++; - - try { - - final ReadGraphImpl finalParentGraph = parentGraph; - - query.perform(parentGraph.withParent(entry), new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph returnGraph, T result) { - ReadGraphImpl impl = (ReadGraphImpl)returnGraph; - //AsyncReadGraph resumeGraph = finalParentGraph.newAsync(); - entry.addOrSet(finalParentGraph, result); - if(listenerEntry != null) { - primeListenerEntry(listenerEntry, result); - } - try { - procedure.execute(finalParentGraph, result); - } catch (Throwable t) { - t.printStackTrace(); - } -// parentBarrier.dec(query); - } - - @Override - public void exception(AsyncReadGraph returnGraph, Throwable t) { - ReadGraphImpl impl = (ReadGraphImpl)returnGraph; -// AsyncReadGraph resumeGraph = finalParentGraph.newAsync(); - entry.except(finalParentGraph, t); - try { - procedure.exception(finalParentGraph, t); - } catch (Throwable t2) { - t2.printStackTrace(); - } -// parentBarrier.dec(query); - } - - @Override - public String toString() { - return procedure.toString(); - } - - }); - - } catch (Throwable t) { - - entry.except(t); - try { - procedure.exception(parentGraph, t); - } catch (Throwable t2) { - t2.printStackTrace(); - } -// parentBarrier.dec(query); - - } - - misses++; - - } else { - - entry.performFromCache(parentGraph, this, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - procedure.exception(graph, throwable); - } - - @Override - public void execute(AsyncReadGraph graph, T result) { - procedure.execute(graph, result); - if(listenerEntry != null) { - primeListenerEntry(listenerEntry, result); - } - } - - }); - -// parentBarrier.dec(query); - - hits++; - - } - - assert (!entry.isDiscarded()); - - } - - public T performForEach(final ReadGraphImpl graph, final Read query, final ReadEntry entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure procedure, - boolean inferredDependency) throws Throwable { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - entry.assertNotDiscarded(); - - if(entry.isReady()) { - - // EXCEPTED goes here - -// if(procedure != null) entry.performFromCache(graph, this, procedure); -// parentBarrier.dec(query); - hits++; - - ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency); - - T result = (T)entry.get(graph, this, procedure); - - if(listenerEntry != null) primeListenerEntry(listenerEntry, result); - - return result; - - } else { - - // FRESH, REFUTED, PENDING go here - - entry.setPending(); - - size++; - misses++; - - ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency); - - final ReadGraphImpl performGraph = graph.newSync(entry); - - try { - - if(Development.DEVELOPMENT) - Development.recordHistogram("run " + query); - - T result = query.perform(performGraph); - entry.addOrSet(performGraph, result); - - if(listenerEntry != null) primeListenerEntry(listenerEntry, result); - - return (T)entry.get(graph, this, procedure); - - } catch (Throwable t) { - - entry.except(t); - return (T)entry.get(graph, this, procedure); - - } - - } - - } - - public void performForEach(final ReadGraphImpl graph, final MultiRead query, final MultiReadEntry entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure, - boolean inferredDependency) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - assert(!entry.isPending()); - assert(!entry.isDiscarded()); - - // FRESH, REFUTED, EXCEPTED go here - if (!entry.isReady()) { - - entry.setPending(); - entry.clearResult(querySupport); - - multiReadMap.put(query, entry); - size++; - - final ReadGraphImpl newGraph = graph.newSync(entry); -// newGraph.state.barrier.inc(); - - try { - - query.perform(newGraph, new AsyncMultiProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T result) { - entry.addOrSet(result); - try { - procedure.execute(graph, result); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - @Override - public void finished(AsyncReadGraph graph) { - entry.finish(graph); - try { - procedure.finished(graph); - } catch (Throwable t) { - t.printStackTrace(); - } -// newGraph.state.barrier.dec(); -// parentBarrier.dec(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - entry.except(t); - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - t2.printStackTrace(); - } -// newGraph.state.barrier.dec(); -// parentBarrier.dec(); - } - - }); - - } catch (DatabaseException e) { - - entry.except(e); - try { - procedure.exception(graph, e); - } catch (Throwable t2) { - t2.printStackTrace(); - } -// newGraph.state.barrier.dec(); -// parentBarrier.dec(); - - } catch (Throwable t) { - - DatabaseException e = new DatabaseException(t); - - entry.except(e); - try { - procedure.exception(graph, e); - } catch (Throwable t2) { - t2.printStackTrace(); - } -// newGraph.state.barrier.dec(); -// parentBarrier.dec(); - - } - - misses++; - - } else { - - entry.performFromCache(graph, this, procedure); - hits++; - - - } - - assert (!entry.isDiscarded()); - - registerDependencies(graph, entry, parent, listener, procedure, inferredDependency); - - } - - - public void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead query, final AsyncMultiReadEntry entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure, - boolean inferredDependency) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - try { - - assert(!entry.isDiscarded()); - - // FRESH, REFUTED, EXCEPTED go here - if (!entry.isReady()) { - - size++; - - try { - - ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry); - - query.perform(performGraph, new AsyncMultiProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T result) { - ReadGraphImpl impl = (ReadGraphImpl)graph; -// ReadGraphImpl executeGraph = callerGraph.newAsync(); - entry.addOrSet(result); - try { - procedure.execute(callerGraph, result); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - @Override - public void finished(AsyncReadGraph graph) { - ReadGraphImpl impl = (ReadGraphImpl)graph; -// ReadGraphImpl executeGraph = callerGraph.newAsync(); - entry.finish(callerGraph); - try { - procedure.finished(callerGraph); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - ReadGraphImpl impl = (ReadGraphImpl)graph; -// ReadGraphImpl executeGraph = callerGraph.newAsync(); - entry.except(callerGraph, t); - try { - procedure.exception(callerGraph, t); - } catch (Throwable t2) { - t2.printStackTrace(); - } - } - - }); - - } catch (Throwable t) { - - entry.except(t); - try { - procedure.exception(callerGraph, t); - } catch (Throwable t2) { - t2.printStackTrace(); - } - - } - - - misses++; - - } else { - - entry.performFromCache(callerGraph, this, procedure); - - hits++; - - } - - assert (!entry.isDiscarded()); - - registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency); - - } catch (Throwable t) { - - Logger.defaultLogError(t); - - } finally { - - } - - } - - public void performForEach(ReadGraphImpl graph, final ExternalRead query, final ExternalReadEntry entry, final CacheEntry parent, final ListenerBase base, final Procedure procedure, - boolean inferredDependency) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - assert(!entry.isPending()); - assert(!entry.isDiscarded()); - - registerDependencies(graph, entry, parent, base, procedure, inferredDependency); - - // FRESH, REFUTED, EXCEPTED go here - if (!entry.isReady()) { - - entry.setPending(); - entry.clearResult(querySupport); - - externalReadMap.put(query, entry); - size++; - - try { - - query.register(graph, new Listener() { - - AtomicBoolean used = new AtomicBoolean(false); - - @Override - public void execute(T result) { - - // Just for safety - if(entry.isDiscarded()) return; - if(entry.isExcepted()) entry.setPending(); - - if(used.compareAndSet(false, true)) { - entry.addOrSet(QueryProcessor.this, result); - procedure.execute(result); - } else { - entry.queue(result); - updatePrimitive(query); - } - - } - - @Override - public void exception(Throwable t) { - - entry.except(t); - - if(used.compareAndSet(false, true)) { - procedure.exception(t); - } else { -// entry.queue(result); - updatePrimitive(query); - } - - } - - @Override - public String toString() { - return procedure.toString(); - } - - @Override - public boolean isDisposed() { - return entry.isDiscarded() || !isBound(entry); - } - - }); - - } catch (Throwable t) { - - entry.except(t); - procedure.exception(t); - - } - - misses++; - - } else { - - entry.performFromCache(procedure); - - hits++; - - } - - assert (!entry.isDiscarded()); - - } - - private boolean isBound(ExternalReadEntry entry) { - if(entry.hasParents()) return true; - else if(hasListener(entry)) return true; - else return false; - } - - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent); - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - public void performForEach(ReadGraphImpl graph, BinaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - try { - - registerDependencies(graph, query, parent, listener, procedure, false); - - // FRESH, REFUTED, EXCEPTED go here - if (!query.isReady()) { - - boolean fresh = query.isFresh(); - - if(fresh) { - size++; - } - - query.computeForEach(graph, this, procedure, true); - - misses++; - - } else { - - query.performFromCache(graph, this, procedure); - - hits++; - - } - - } catch (Throwable t) { - - Logger.defaultLogError(t); - - } - } - - public Object performForEach(ReadGraphImpl graph, UnaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - try { - - assert(query.assertNotDiscarded()); - - registerDependencies(graph, query, parent, listener, procedure, false); - - // FRESH, REFUTED, EXCEPTED go here - if (!query.isReady()) { - - size++; - misses++; - - return query.computeForEach(graph, this, procedure, true); - - - } else { - - hits++; - - return query.performFromCache(graph, this, procedure); - - } - - } catch (Throwable t) { - - Logger.defaultLogError(t); - return null; - - } - - } - - static class Dummy implements InternalProcedure, IntProcedure { - - @Override - public void execute(ReadGraphImpl graph, int i) { - } - - @Override - public void finished(ReadGraphImpl graph) { - } - - @Override - public void execute(ReadGraphImpl graph, Object result) { - } - - @Override - public void exception(ReadGraphImpl graph, Throwable throwable) { - } - - } - - private static final Dummy dummy = new Dummy(); - - public Object performForEach2(ReadGraphImpl graph, UnaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - assert(query.assertNotDiscarded()); - - registerDependencies(graph, query, parent, listener, procedure, false); - - // FRESH, REFUTED, EXCEPTED go here - if (!query.isReady()) { - - size++; - misses++; - - query.computeForEach(graph, this, (Procedure)dummy, true); - return query.get(graph, this, null); - - } else { - - hits++; - - return query.get(graph, this, procedure); - - } - - } - - public void performForEach(ReadGraphImpl graph, StringQuery query, CacheEntry parent, final ListenerBase listener, Procedure procedure) { - - if (DebugPolicy.PERFORM) - System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); - - assert (!dirty); - assert (!collecting); - - try { - - if(query.isDiscarded()) { - System.err.println("aff"); - } - assert(!query.isDiscarded()); - - // FRESH, REFUTED, EXCEPTED go here - if (!query.isReady()) { - - query.computeForEach(graph.withAsyncParent(query), this, procedure); - - size++; - misses++; - - } else { - - query.performFromCache(graph, this, procedure); - - hits++; - - } - - assert (!query.isDiscarded()); - - registerDependencies(graph, query, parent, listener, procedure, false); - - } catch (Throwable t) { - - t.printStackTrace(); - Logger.defaultLogError(t); - - } - - } - - interface QueryCollectorSupport { - public CacheCollectionResult allCaches(); - public Collection getRootList(); - public int getCurrentSize(); - public int calculateCurrentSize(); - public CacheEntryBase iterate(int level); - public void remove(); - public void setLevel(CacheEntryBase entry, int level); - public boolean start(boolean flush); - } - - interface QueryCollector { - - public void collect(int youngTarget, int allowedTimeInMs); - - } - - class QueryCollectorSupportImpl implements QueryCollectorSupport { - - private static final boolean DEBUG = false; - private static final double ITERATION_RATIO = 0.2; - - private CacheCollectionResult iteration = new CacheCollectionResult(); - private boolean fresh = true; - private boolean needDataInStart = true; - - QueryCollectorSupportImpl() { - iteration.restart(); - } - - public CacheCollectionResult allCaches() { - CacheCollectionResult result = new CacheCollectionResult(); - QueryProcessor.this.allCaches(result); - result.restart(); - return result; - } - - public boolean start(boolean flush) { - // We need new data from query maps - fresh = true; - if(needDataInStart || flush) { - // Last run ended after processing all queries => refresh data - restart(flush ? 0.0 : ITERATION_RATIO); - } else { - // continue with previous big data - } - // Notify caller about iteration situation - return iteration.isAtStart(); - } - - private void restart(double targetRatio) { - - needDataInStart = true; - - long start = System.nanoTime(); - if(fresh) { - - // We need new data from query maps - - int iterationSize = iteration.size()+1; - int diff = calculateCurrentSize()-iterationSize; - - double ratio = (double)diff / (double)iterationSize; - boolean dirty = Math.abs(ratio) >= targetRatio; - - if(dirty) { - iteration = allCaches(); - if(DEBUG) { - System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") "); - for(int i=0;i getRootList() { - - ArrayList result = new ArrayList(); - - for (Object e : valueMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : directPredicatesMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : objectsMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : directObjectsMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : principalTypesMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : superRelationsMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : superTypesMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : typesMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : objectsMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : assertedStatementsMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : readMap.values()) { - if(e instanceof CacheEntry) { - result.add((CacheEntry) e); - } else { - System.err.println("e=" + e); - } - } - for (Object e : asyncReadMap.values()) { - if(e instanceof CacheEntry) { - result.add((CacheEntry) e); - } else { - System.err.println("e=" + e); - } - } - for (Object e : externalReadMap.values()) { - result.add((CacheEntry) e); - } - for (Object e : orderedSetMap.values()) { - result.add((CacheEntry) e); - } - - return result; - - } - - @Override - public int calculateCurrentSize() { - - int realSize = 0; - - realSize += directPredicatesMap.size(); - realSize += principalTypesMap.size(); - realSize += uriToResourceMap.size(); - realSize += namespaceIndexMap22.size(); - realSize += projectsMap.size(); - - realSize += relationInfoMap.size(); - realSize += superTypesMap.size(); - realSize += typeHierarchyMap.size(); - realSize += superRelationsMap.size(); - realSize += typesMap.size(); - - realSize += valueMap.size(); - realSize += directObjectsMap.size(); - realSize += objectsMap.size(); - realSize += orderedSetMap.size(); - realSize += predicatesMap.size(); - - realSize += statementsMap.size(); - realSize += assertedPredicatesMap.size(); - realSize += assertedStatementsMap.size(); - realSize += externalReadMap.size(); - realSize += asyncReadMap.size(); - - realSize += readMap.size(); - realSize += asyncMultiReadMap.size(); - realSize += multiReadMap.size(); - - size = realSize; - - return realSize; - - } - - @Override - public int getCurrentSize() { - return size; - } - - } - // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600); - - private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl(); - private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport); - - public int querySize() { - return size; - } - - public void gc(int youngTarget, int allowedTimeInMs) { - - collector.collect(youngTarget, allowedTimeInMs); - - } - - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - private void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if(DebugPolicy.LISTENER) { - new Exception().printStackTrace(); - System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - - return result; - - } - - private void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure); - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - listeners.remove(entry.entry); - } - - private boolean hasListener(CacheEntry entry) { - if(listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(listeners.get(entry) != null) { - ArrayList entries = listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(listeners.get(entry) != null) - return listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(this)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } - - void processParentReport(CacheEntry entry, Map> workarea) { - - if(entry.isDiscarded()) return; - if(workarea.containsKey(entry)) return; - - Iterable parents = entry.getParents(this); - HashSet ps = new HashSet(); - for(CacheEntry e : parents) { - if(e.isDiscarded()) continue; - ps.add(e); - processParentReport(e, workarea); - } - workarea.put(entry, ps); - - } - - public synchronized String reportQueryActivity(File file) throws IOException { - - System.err.println("reportQueries " + file.getAbsolutePath()); - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - - List> entries = CollectionUtils.valueSortedEntries(Development.histogram); - Collections.reverse(entries); - - for(Pair entry : entries) { - b.println(entry.first + ": " + entry.second); - } - - b.close(); - - Development.histogram.clear(); - - return "OK"; - - } - - public synchronized String reportQueries(File file) throws IOException { - - System.err.println("reportQueries " + file.getAbsolutePath()); - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - - long start = System.nanoTime(); - -// ArrayList all = ; - - Map> workarea = new HashMap>(); - Collection caches = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : caches) { - processParentReport(entry, workarea); - } - - // for(CacheEntry e : all) System.err.println("entry: " + e); - - long duration = System.nanoTime() - start; - System.err.println("Query root set in " + 1e-9*duration + "s."); - - start = System.nanoTime(); - - HashMap flagMap = new HashMap(); - - int listeners = 0; - - for(CacheEntry entry : workarea.keySet()) { - boolean listener = hasListenerAfterDisposing(entry); - boolean hasParents = entry.getParents(this).iterator().hasNext(); - if(listener) { - // Bound - flagMap.put(entry, 0); - } else if (!hasParents) { - // Unbound - flagMap.put(entry, 1); - } else { - // Unknown - flagMap.put(entry, 2); - } - // // Write leaf bit - // entry.flags |= 4; - } - - boolean done = true; - int loops = 0; - - do { - - done = true; - - long start2 = System.nanoTime(); - - int boundCounter = 0; - int unboundCounter = 0; - int unknownCounter = 0; - - for(CacheEntry entry : workarea.keySet()) { - - //System.err.println("process " + entry); - - int flags = flagMap.get(entry); - int bindStatus = flags & 3; - - if(bindStatus == 0) boundCounter++; - else if(bindStatus == 1) unboundCounter++; - else if(bindStatus == 2) unknownCounter++; - - if(bindStatus < 2) continue; - - int newStatus = 1; - for(CacheEntry parent : entry.getParents(this)) { - - if(parent.isDiscarded()) flagMap.put(parent, 1); - - int flags2 = flagMap.get(parent); - int bindStatus2 = flags2 & 3; - // Parent is bound => child is bound - if(bindStatus2 == 0) { - newStatus = 0; - break; - } - // Parent is unknown => child is unknown - else if (bindStatus2 == 2) { - newStatus = 2; - done = false; - break; - } - } - - flagMap.put(entry, newStatus); - - } - - duration = System.nanoTime() - start2; - System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s."); - b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s."); - - } while(!done && loops++ < 20); - - if(loops >= 20) { - - for(CacheEntry entry : workarea.keySet()) { - - int bindStatus = flagMap.get(entry); - if(bindStatus == 2) System.err.println("Undefined bind status for " + entry); - - } - - } - - duration = System.nanoTime() - start; - System.err.println("Query analysis in " + 1e-9*duration + "s."); - - Map, Integer> counts = new HashMap, Integer>(); - - for(CacheEntry entry : workarea.keySet()) { - Class clazz = entry.getClass(); - if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); - else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); - else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); - Integer c = counts.get(clazz); - if(c == null) counts.put(clazz, -1); - else counts.put(clazz, c-1); - } - - b.print("// Simantics DB client query report file\n"); - b.print("// This file contains the following information\n"); - b.print("// -The amount of cached query instances per query class\n"); - b.print("// -The sizes of retained child sets\n"); - b.print("// -List of parents for each query (search for 'P ')\n"); - b.print("// -Followed by status, where\n"); - b.print("// -0=bound\n"); - b.print("// -1=free\n"); - b.print("// -2=unknown\n"); - b.print("// -L=has listener\n"); - b.print("// -List of children for each query (search for 'C ')\n"); - - b.print("----------------------------------------\n"); - - b.print("// Queries by class\n"); - for(Pair, Integer> p : CollectionUtils.valueSortedEntries(counts)) { - b.print(-p.second + " " + p.first.getName() + "\n"); - } - - Map hist = new HashMap(); - for(CacheEntry e : workarea.keySet()) - hist.put(e, -1); - - boolean changed = true; - int iter = 0; - while(changed && iter++<50) { - - changed = false; - - Map newHist = new HashMap(); - for(CacheEntry e : workarea.keySet()) - newHist.put(e, -1); - - for(Map.Entry> e : workarea.entrySet()) { - Integer c = hist.get(e.getKey()); - for(CacheEntry p : e.getValue()) { - Integer i = newHist.get(p); - newHist.put(p, i+c); - } - } - for(CacheEntry e : workarea.keySet()) { - Integer value = newHist.get(e); - Integer old = hist.get(e); - if(!value.equals(old)) { - hist.put(e, value); -// System.err.println("hist " + e + ": " + old + " => " + value); - changed = true; - } - } - - System.err.println("Retained set iteration " + iter); - - } - - b.print("// Queries by retained set\n"); - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - HashMap> inverse = new HashMap>(); - - b.print("// Entry parent listing\n"); - for(CacheEntry entry : workarea.keySet()) { - int status = flagMap.get(entry); - boolean hasListener = hasListenerAfterDisposing(entry); - b.print("Q " + entry.toString()); - if(hasListener) { - b.print(" (L" + status + ")"); - listeners++; - } else { - b.print(" (" + status + ")"); - } - b.print("\n"); - for(CacheEntry parent : workarea.get(entry)) { - Collection inv = inverse.get(parent); - if(inv == null) { - inv = new ArrayList(); - inverse.put(parent, inv); - } - inv.add(entry); - b.print(" " + parent.toString()); - b.print("\n"); - } - } - - b.print("// Entry child listing\n"); - for(Map.Entry> entry : inverse.entrySet()) { - b.print("C " + entry.getKey().toString()); - b.print("\n"); - for(CacheEntry child : entry.getValue()) { - Integer h = hist.get(child); - if(h != null) { - b.print(" " + h); - } else { - b.print(" "); - } - b.print(" " + child.toString()); - b.print("\n"); - } - } - - b.print("#queries: " + workarea.keySet().size() + "\n"); - b.print("#listeners: " + listeners + "\n"); - - b.close(); - - return "Dumped " + workarea.keySet().size() + " queries."; - - } - - class UpdateEntry { - - public CacheEntry caller; - - public CacheEntry entry; - - public int indent; - - public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { - this.caller = caller; - this.entry = entry; - this.indent = indent; - } - - }; - - boolean removeQuery(CacheEntry entry) { - - // This entry has been removed before. No need to do anything here. - if(entry.isDiscarded()) return false; - - assert (!entry.isDiscarded()); - - Query query = entry.getQuery(); - - query.removeEntry(this); - - updates++; - size--; - - if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0) - boundQueries--; - - entry.discard(); - - return true; - - } - - /** - * - * @return true if this entry is being listened - */ - private boolean updateQuery(UpdateEntry e, LinkedList todo, IdentityHashMap immediates) throws DatabaseException { - - assert (e != null); - - CacheEntry entry = e.entry; - -// System.err.println("updateQuery " + entry); - - /* - * If the dependency graph forms a DAG, some entries are inserted in the - * todo list many times. They only need to be processed once though. - */ - if (entry.isDiscarded()) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.out.print("D"); - for (int i = 0; i < e.indent; i++) - System.out.print(" "); - System.out.println(entry.getQuery()); - } - } -// System.err.println(" => DISCARDED"); - return false; - } - - if (entry.isRefuted()) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.out.print("R"); - for (int i = 0; i < e.indent; i++) - System.out.print(" "); - System.out.println(entry.getQuery()); - } - } - return false; - } - - if (entry.isExcepted()) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.out.print("E"); - } - } - } - - if (entry.isPending()) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.out.print("P"); - } - } - } - - updates++; - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.out.print("U "); - for (int i = 0; i < e.indent; i++) - System.out.print(" "); - System.out.print(entry.getQuery()); - } - } - - Query query = entry.getQuery(); - int type = query.type(); - - boolean hasListener = hasListener(entry); - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - if(hasListener(entry)) { - System.out.println(" (L)"); - } else { - System.out.println(""); - } - } - } - - if(entry.isPending() || entry.isExcepted()) { - - // If updated - if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { - - immediates.put(entry, entry); - - } else { - - if(hasListener) { - entry.refute(); - } else { - removeQuery(entry); - } - - } - - } else { - - // If updated - if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { - - immediates.put(entry, entry); - - } else { - - if(hasListener) { - entry.refute(); - } else { - removeQuery(entry); - } - - } - - } - -// System.err.println(" => FOO " + type); - - if (hasListener) { - ArrayList entries = listeners.get(entry); - if(entries != null) { - for (ListenerEntry le : entries) { - scheduleListener(le); - } - } - } - - // If invalid, update parents - if (type == RequestFlags.INVALIDATE) { - updateParents(e.indent, entry, todo); - } - - return hasListener; - - } - - private void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(this); - for (CacheEntry parent : oldParents) { -// System.err.println("updateParents " + entry + " => " + parent); - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - - /** - * @param av1 an array (guaranteed) - * @param av2 any object - * @return true if the two arrays are equal - */ - private final boolean arrayEquals(Object av1, Object av2) { - if (av2 == null) - return false; - Class c1 = av1.getClass().getComponentType(); - Class c2 = av2.getClass().getComponentType(); - if (c2 == null || !c1.equals(c2)) - return false; - boolean p1 = c1.isPrimitive(); - boolean p2 = c2.isPrimitive(); - if (p1 != p2) - return false; - if (!p1) - return Arrays.equals((Object[]) av1, (Object[]) av2); - if (boolean.class.equals(c1)) - return Arrays.equals((boolean[]) av1, (boolean[]) av2); - else if (byte.class.equals(c1)) - return Arrays.equals((byte[]) av1, (byte[]) av2); - else if (int.class.equals(c1)) - return Arrays.equals((int[]) av1, (int[]) av2); - else if (long.class.equals(c1)) - return Arrays.equals((long[]) av1, (long[]) av2); - else if (float.class.equals(c1)) - return Arrays.equals((float[]) av1, (float[]) av2); - else if (double.class.equals(c1)) - return Arrays.equals((double[]) av1, (double[]) av2); - throw new RuntimeException("??? Contact application querySupport."); - } - - - - final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { - - try { - - Query query = entry.getQuery(); - - if(DebugPolicy.RECOMPUTE) System.out.println("R " + query); - - entry.prepareRecompute(querySupport); - - ReadGraphImpl parentGraph = graph.withParent(entry); - - query.recompute(parentGraph, this, entry); - - if(entry.isExcepted()) return ListenerEntry.NO_VALUE; - - Object newValue = entry.getResult(); - - if (ListenerEntry.NO_VALUE == oldValue) { - if(DebugPolicy.CHANGES) { - System.out.println("C " + query); - System.out.println("- " + oldValue); - System.out.println("- " + newValue); - } - return newValue; - } - - boolean changed = false; - - if (newValue != null) { - if (newValue.getClass().isArray()) { - changed = !arrayEquals(newValue, oldValue); - } else { - changed = !newValue.equals(oldValue); - } - } else - changed = (oldValue != null); - - if(DebugPolicy.CHANGES && changed) { - System.out.println("C " + query); - System.out.println("- " + oldValue); - System.out.println("- " + newValue); - } - - return changed ? newValue : ListenerEntry.NOT_CHANGED; - - } catch (Throwable t) { - - Logger.defaultLogError(t); - entry.except(t); - return ListenerEntry.NO_VALUE; - - } - - } - - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - public void performScheduledUpdates(WriteGraphImpl graph) { - - assert (!updating); - assert (!collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - -// graph.restart(); -// graph.state.barrier.inc(); - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure); - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if(DebugPolicy.LISTENER) - System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if(DebugPolicy.LISTENER) - System.out.println("Firing " + listenerEntry.procedure); - try { - if(DebugPolicy.LISTENER) - System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - entry.performFromCache(graph, this, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - -// graph.state.barrier.dec(); -// graph.waitAsync(null); -// graph.state.barrier.assertReady(); - - } - - } finally { - firingListeners = false; - } - - } - - /** - * - * @return true if this entry still has listeners - */ - public boolean update(final ReadGraphImpl graph, final CacheEntry entry) { - - assert (!collecting); - assert (!updating); - updating = true; - - boolean hadListeners = false; - boolean listenersUnknown = false; - - try { - - assert(entry != null); - LinkedList todo = new LinkedList(); - IdentityHashMap immediates = new IdentityHashMap(); - todo.add(new UpdateEntry(null, entry, 0)); - - while(true) { - - // Walk the tree and collect immediate updates - while (!todo.isEmpty()) { - UpdateEntry e = todo.pop(); - hadListeners |= updateQuery(e, todo, immediates); - } - - if(immediates.isEmpty()) break; - - // Evaluate all immediate updates and collect parents to update - for(CacheEntry immediate : immediates.values()) { - - if(immediate.isDiscarded()) { - continue; - } - - if(immediate.isExcepted()) { - - Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); - if (newValue != ListenerEntry.NOT_CHANGED) - updateParents(0, immediate, todo); - - } else { - - Object oldValue = immediate.getResult(); - Object newValue = compareTo(graph, immediate, oldValue); - - if (newValue != ListenerEntry.NOT_CHANGED) { - updateParents(0, immediate, todo); - } else { - // If not changed, keep the old value - immediate.setResult(oldValue); - listenersUnknown = true; - } - - } - - } - immediates.clear(); - - } - - } catch (Throwable t) { - Logger.defaultLogError(t); - } - - assert (updating); - updating = false; - - return hadListeners | listenersUnknown; - - } - - volatile public boolean dirty = false; - - private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet(); - private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet(); - private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet(); - // Maybe use a mutex from util.concurrent? - private Object primitiveUpdateLock = new Object(); - private THashSet scheduledPrimitiveUpdates = new THashSet(); - - public void performDirtyUpdates(final ReadGraphImpl graph) { - - dirty = false; - lastInvalidate = 0; - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.err.println("== Query update =="); - } - } - - // Special case - one statement - if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { - - long arg0 = scheduledObjectUpdates.getFirst(); - - final int subject = (int)(arg0 >>> 32); - final int predicate = (int)(arg0 & 0xffffffff); - - for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o); - for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o); - for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o); - - if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { - PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject); - if(principalTypes != null) update(graph, principalTypes); - Types types = Types.entry(QueryProcessor.this, subject); - if(types != null) update(graph, types); - } - - if(predicate == subrelationOf) { - SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); - if(superRelations != null) update(graph, superRelations); - } - - DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject); - if(dp != null) update(graph, dp); - OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate); - if(os != null) update(graph, os); - - scheduledObjectUpdates.clear(); - return; - - } - - // Special case - one value - if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { - - int arg0 = scheduledValueUpdates.getFirst(); - - ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0); - if(valueQuery != null) update(graph, valueQuery); - - scheduledValueUpdates.clear(); - return; - - } - - final TIntHashSet predicates = new TIntHashSet(); - final TIntHashSet orderedSets = new TIntHashSet(); - - THashSet primitiveUpdates; - synchronized (primitiveUpdateLock) { - primitiveUpdates = scheduledPrimitiveUpdates; - scheduledPrimitiveUpdates = new THashSet(); - } - - primitiveUpdates.forEach(new TObjectProcedure() { - - @Override - public boolean execute(Object arg0) { - - ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0); - if (query != null) { - boolean listening = update(graph, query); - if (!listening && !query.hasParents()) { - externalReadMap.remove(arg0); - query.discard(); - } - } - return true; - } - - }); - - scheduledValueUpdates.forEach(new TIntProcedure() { - - @Override - public boolean execute(int arg0) { - ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0); - if(valueQuery != null) update(graph, valueQuery); - return true; - } - - }); - - scheduledInvalidates.forEach(new TIntProcedure() { - - @Override - public boolean execute(int resource) { - - ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource); - if(valueQuery != null) update(graph, valueQuery); - - PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource); - if(principalTypes != null) update(graph, principalTypes); - Types types = Types.entry(QueryProcessor.this, resource); - if(types != null) update(graph, types); - - SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource); - if(superRelations != null) update(graph, superRelations); - - predicates.add(resource); - - return true; - } - - }); - - scheduledObjectUpdates.forEach(new TLongProcedure() { - - @Override - public boolean execute(long arg0) { - - final int subject = (int)(arg0 >>> 32); - final int predicate = (int)(arg0 & 0xffffffff); - - if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { - PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject); - if(principalTypes != null) update(graph, principalTypes); - Types types = Types.entry(QueryProcessor.this, subject); - if(types != null) update(graph, types); - } - - if(predicate == subrelationOf) { - SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); - if(superRelations != null) update(graph, superRelations); - } - - predicates.add(subject); - orderedSets.add(predicate); - - return true; - - } - - }); - - predicates.forEach(new TIntProcedure() { - - @Override - public boolean execute(final int subject) { - - for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o); - for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o); - for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o); - - DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject); - if(entry != null) update(graph, entry); - - return true; - - } - - }); - - orderedSets.forEach(new TIntProcedure() { - - @Override - public boolean execute(int orderedSet) { - - OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet); - if(entry != null) update(graph, entry); - - return true; - - } - - }); - - // for (Integer subject : predicates) { - // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject); - // if(entry != null) update(graph, entry); - // } - - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.err.println("== Query update ends =="); - } - } - - scheduledValueUpdates.clear(); - scheduledObjectUpdates.clear(); - scheduledInvalidates.clear(); - - } - - public void updateValue(final int resource) { - scheduledValueUpdates.add(resource); - dirty = true; - } - - public void updateStatements(final int resource, final int predicate) { - scheduledObjectUpdates.add((((long)resource) << 32) + predicate); - dirty = true; - } - - private int lastInvalidate = 0; - - public void invalidateResource(final int resource) { - if(lastInvalidate == resource) return; - scheduledValueUpdates.add(resource); - lastInvalidate = resource; - dirty = true; - } - - public void updatePrimitive(final ExternalRead primitive) { - - // External reads may be updated from arbitrary threads. - // Synchronize to prevent race-conditions. - synchronized (primitiveUpdateLock) { - scheduledPrimitiveUpdates.add(primitive); - } - querySupport.dirtyPrimitives(); - - } - - @Override - public synchronized String toString() { - return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]"; - } - - @Override - protected void doDispose() { - - for(int index = 0; index < THREADS; index++) { - executors[index].dispose(); - } - - // First just wait - for(int i=0;i<100;i++) { - - boolean alive = false; - for(int index = 0; index < THREADS; index++) { - alive |= executors[index].isAlive(); - } - if(!alive) return; - try { - Thread.sleep(5); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - - } - - // Then start interrupting - for(int i=0;i<100;i++) { - - boolean alive = false; - for(int index = 0; index < THREADS; index++) { - alive |= executors[index].isAlive(); - } - if(!alive) return; - for(int index = 0; index < THREADS; index++) { - executors[index].interrupt(); - } - } - - // // Then just destroy - // for(int index = 0; index < THREADS; index++) { - // executors[index].destroy(); - // } - - for(int index = 0; index < THREADS; index++) { - try { - executors[index].join(5000); - } catch (InterruptedException e) { - Logger.defaultLogError("QueryThread " + index + " will not die.", e); - } - executors[index] = null; - } - - } - - public int getHits() { - return hits; - } - - public int getMisses() { - return misses; - } - - public int getSize() { - return size; - } - - public Set getReferencedClusters() { - HashSet result = new HashSet(); - for (CacheEntry entry : objectsMap.values()) { - Objects query = (Objects) entry.getQuery(); - result.add(querySupport.getClusterId(query.r1())); - } - for (CacheEntry entry : directPredicatesMap.values()) { - DirectPredicates query = (DirectPredicates) entry.getQuery(); - result.add(querySupport.getClusterId(query.id)); - } - for (CacheEntry entry : valueMap.values()) { - ValueQuery query = (ValueQuery) entry.getQuery(); - result.add(querySupport.getClusterId(query.id)); - } - return result; - } - - public void assertDone() { - } - - CacheCollectionResult allCaches(CacheCollectionResult result) { - - int level = Integer.MAX_VALUE; - directPredicatesMap.values(level, result); - principalTypesMap.values(level, result); - for(CacheEntryBase e : uriToResourceMap.values()) - if(e.getLevel() <= level) - result.add(e); - for(CacheEntryBase e : namespaceIndexMap22.values()) - if(e.getLevel() <= level) - result.add(e); - projectsMap.values(level, result); - - relationInfoMap.values(level, result); - superTypesMap.values(level, result); - typeHierarchyMap.values(level, result); - superRelationsMap.values(level, result); - typesMap.values(level, result); - - valueMap.values(level, result); - directObjectsMap.values(level, result); - objectsMap.values(level, result); - orderedSetMap.values(level, result); - predicatesMap.values(level, result); - - statementsMap.values(level, result); - assertedPredicatesMap.values(level, result); - assertedStatementsMap.values(level, result); - externalReadMap.values(level, result); - asyncReadMap.values(level, result); - - readMap.values(level, result); - asyncMultiReadMap.values(level, result); - multiReadMap.values(level, result); - - return result; - - } - - public void printDiagnostics() { - } - - public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) { - querySupport.requestCluster(graph, clusterId, runnable); - } - - public int clean() { - collector.collect(0, Integer.MAX_VALUE); - return size; - } - - public void clean(final Collection> requests) { - QueryCollectorSupport collectorSupport = new QueryCollectorSupport() { - Iterator> iterator = requests.iterator(); - @Override - public CacheCollectionResult allCaches() { - throw new UnsupportedOperationException(); - } - @Override - public CacheEntryBase iterate(int level) { - if(iterator.hasNext()) { - ExternalRead request = iterator.next(); - ExternalReadEntry entry = externalReadMap.get(request); - if (entry != null) return entry; - else return iterate(level); - } else { - iterator = requests.iterator(); - return null; - } - } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - @Override - public void setLevel(CacheEntryBase entry, int level) { - throw new UnsupportedOperationException(); - } - @Override - public Collection getRootList() { - ArrayList result = new ArrayList(requests.size()); - for (ExternalRead request : requests) { - ExternalReadEntry entry = externalReadMap.get(request); - if (entry != null) - result.add(entry); - } - return result; - } - @Override - public int getCurrentSize() { - return size; - } - @Override - public int calculateCurrentSize() { - // This tells the collector to attempt collecting everything. - return Integer.MAX_VALUE; - } - @Override - public boolean start(boolean flush) { - return true; - } - }; - new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE); - } - - public void scanPending() { - - ArrayList entries = new ArrayList(); - - entries.addAll(directPredicatesMap.values()); - entries.addAll(principalTypesMap.values()); - entries.addAll(uriToResourceMap.values()); - entries.addAll(namespaceIndexMap22.values()); - entries.addAll(projectsMap.values()); - entries.addAll(relationInfoMap.values()); - entries.addAll(superTypesMap.values()); - entries.addAll(superRelationsMap.values()); - entries.addAll(typesMap.values()); - entries.addAll(valueMap.values()); - entries.addAll(directObjectsMap.values()); - entries.addAll(objectsMap.values()); - entries.addAll(orderedSetMap.values()); - entries.addAll(predicatesMap.values()); - entries.addAll(orderedSetMap.values()); - entries.addAll(statementsMap.values()); - // entries.addAll(assertedObjectsMap.values()); - entries.addAll(assertedPredicatesMap.values()); - entries.addAll(assertedStatementsMap.values()); - entries.addAll(externalReadMap.values()); - entries.addAll(asyncReadMap.values()); - entries.addAll(externalReadMap.values()); - entries.addAll(readMap.values()); - entries.addAll(asyncMultiReadMap.values()); - entries.addAll(multiReadMap.values()); - entries.addAll(readMap.values()); - System.out.println(entries.size() + " entries."); - for(Object e : entries) { - if(e instanceof CacheEntry) { - CacheEntry en = (CacheEntry)e; - if(en.isPending()) System.out.println("pending " + e); - if(en.isExcepted()) System.out.println("excepted " + e); - if(en.isDiscarded()) System.out.println("discarded " + e); - if(en.isRefuted()) System.out.println("refuted " + e); - if(en.isFresh()) System.out.println("fresh " + e); - } else { - //System.out.println("Unknown object " + e); - } - } - - } - - public ReadGraphImpl graphForVirtualRequest() { - return ReadGraphImpl.createAsync(this); - } - - - private HashMap> builtinValues; - - public Class getBuiltinValue(Resource r) { - if(builtinValues == null) initBuiltinValues(); - return builtinValues.get(r); - } - - Exception callerException = null; - - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } - -// final public QueryProcessor processor; -// final public QuerySupport support; - - // boolean disposed = false; - - private void initBuiltinValues() { - - Layer0 b = getSession().peekService(Layer0.class); - if(b == null) return; - - builtinValues = new HashMap>(); - - builtinValues.put(b.String, String.class); - builtinValues.put(b.Double, Double.class); - builtinValues.put(b.Float, Float.class); - builtinValues.put(b.Long, Long.class); - builtinValues.put(b.Integer, Integer.class); - builtinValues.put(b.Byte, Byte.class); - builtinValues.put(b.Boolean, Boolean.class); - - builtinValues.put(b.StringArray, String[].class); - builtinValues.put(b.DoubleArray, double[].class); - builtinValues.put(b.FloatArray, float[].class); - builtinValues.put(b.LongArray, long[].class); - builtinValues.put(b.IntegerArray, int[].class); - builtinValues.put(b.ByteArray, byte[].class); - builtinValues.put(b.BooleanArray, boolean[].class); - - } - -// public ReadGraphSupportImpl(final QueryProcessor provider2) { -// -// if (null == provider2) { -// this.processor = null; -// support = null; -// return; -// } -// this.processor = provider2; -// support = provider2.getCore(); -// initBuiltinValues(); -// -// } - -// final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) { -// return new ReadGraphSupportImpl(impl.processor); -// } - - @Override - final public Session getSession() { - return session; - } - - final public ResourceSupport getResourceSupport() { - return resourceSupport; - } - - @Override - final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - IntProcedure ip = new IntProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - if(first.get()) { - procedure.execute(graph, querySupport.getResource(i)); - } else { - procedure.execute(impl.newRestart(graph), querySupport.getResource(i)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - if(first.compareAndSet(true, false)) { - procedure.finished(graph); -// impl.state.barrier.dec(this); - } else { - procedure.finished(impl.newRestart(graph)); - } - - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId); -// else impl.state.barrier.inc(null, null); - - Predicates.queryEach(impl, sId, this, impl.parent, listener, ip); - - } - - @Override - final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - procedure.execute(querySupport.getResource(i)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable { - - assert(subject != null); - - return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent); - - } - - - @Override - final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, - final Resource predicate, final MultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() { - - @Override - public void execute(ReadGraphImpl graph, int s, int p, int o) { - try { - procedure.execute(querySupport.getStatement(s, p, o)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, - final Resource predicate, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() { - - boolean first = true; - - @Override - public void execute(ReadGraphImpl graph, int s, int p, int o) { - try { - if(first) { - procedure.execute(graph, querySupport.getStatement(s, p, o)); - } else { - procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - - try { - if(first) { - first = false; - procedure.finished(graph); -// impl.state.barrier.dec(this); - } else { - procedure.finished(impl.newRestart(graph)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - - try { - if(first) { - first = false; - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - - } - - }; - - int sId = querySupport.getId(subject); - int pId = querySupport.getId(predicate); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId); -// else impl.state.barrier.inc(null, null); - - Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc); - - } - - @Override - final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, - final Resource predicate, final StatementProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() { - - boolean first = true; - - @Override - public void execute(ReadGraphImpl graph, int s, int p, int o) { - try { - if(first) { - procedure.execute(graph, s, p, o); - } else { - procedure.execute(impl.newRestart(graph), s, p, o); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - - try { - if(first) { - first = false; - procedure.finished(graph); -// impl.state.barrier.dec(this); - } else { - procedure.finished(impl.newRestart(graph)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - - try { - if(first) { - first = false; - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - - } - - }; - - int sId = querySupport.getId(subject); - int pId = querySupport.getId(predicate); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId); -// else impl.state.barrier.inc(null, null); - - Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc); - - } - - @Override - final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - forEachStatement(impl, subject, predicate, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Statement result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Statement r : current) procedure.remove(graph, r); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - }); - - } - - @Override - final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject, - final Resource predicate, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() { - - @Override - public void execute(ReadGraphImpl graph, int s, int p, int o) { - try { - procedure.execute(graph, querySupport.getStatement(s, p, o)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - private static ListenerBase getListenerBase(Object procedure) { - if(procedure instanceof ListenerBase) return (ListenerBase)procedure; - else return null; - } - - @Override - final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - procedure.execute(querySupport.getResource(i)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - System.out.println("forEachObject exception " + t); - try { - procedure.exception(t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - -// @Override -// final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) { -// -// assert(subject != null); -// assert(predicate != null); -// assert(procedure != null); -// -// final ListenerBase listener = getListenerBase(procedure); -// -// int sId = querySupport.getId(subject); -// int pId = querySupport.getId(predicate); -// -// MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support); -// -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId); -// else impl.state.barrier.inc(null, null); -// -// // final Exception caller = new Exception(); -// -// // final Pair exceptions = Pair.make(callerException, new Exception()); -// -// DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc); -// -// } - - @Override - final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport); - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId); -// else impl.state.barrier.inc(null, null); - - DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc); - - } - - @Override - final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); - - } - - @Override - final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure, boolean ignoreVirtual) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual); - - } - - private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE); - - @Override - final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) { - - forEachObject(impl, subject, predicate, new AsyncMultiProcedure() { - - private Resource single = null; - - @Override - public synchronized void execute(AsyncReadGraph graph, Resource result) { - if(single == null) { - single = result; - } else { - single = INVALID_RESOURCE; - } - } - - @Override - public synchronized void finished(AsyncReadGraph graph) { - if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null); - else procedure.execute(graph, single); - } - - @Override - public synchronized void exception(AsyncReadGraph graph, Throwable throwable) { - procedure.exception(graph, throwable); - } - - }); - - } - - final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) { - - final int sId = querySupport.getId(subject); - final int pId = querySupport.getId(predicate); - - Objects.runner(impl, sId, pId, impl.parent, listener, procedure); - - } - - final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException { - - final int sId = querySupport.getId(subject); - final int pId = querySupport.getId(predicate); - - return Objects.runner2(impl, sId, pId, impl.parent); - - } - - final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - - final ListenerBase listener = getListenerBase(procedure); - - if(impl.parent != null || listener != null) { - - IntProcedure ip = new IntProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - if(first.get()) { - procedure.execute(impl, querySupport.getResource(i)); - } else { - procedure.execute(impl.newRestart(graph), querySupport.getResource(i)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - if(first.compareAndSet(true, false)) { - procedure.finished(impl); -// impl.state.barrier.dec(this); - } else { - procedure.finished(impl.newRestart(graph)); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - @Override - public String toString() { - return "forEachObject with " + procedure; - } - - }; - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate); -// else impl.state.barrier.inc(null, null); - - forEachObject(impl, subject, predicate, listener, ip); - - } else { - - IntProcedure ip = new IntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int i) { - procedure.execute(graph, querySupport.getResource(i)); - } - - @Override - public void finished(ReadGraphImpl graph) { - procedure.finished(graph); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forEachObject with " + procedure; - } - - }; - - forEachObject(impl, subject, predicate, listener, ip); - - } - - } - - @Override - final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - forEachObject(impl, subject, predicate, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Resource result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Resource r : current) procedure.remove(graph, r); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forObjectSet " + procedure; - } - - }); - - } - - @Override - final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) { - - assert(subject != null); - assert(procedure != null); - - forEachPredicate(impl, subject, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Resource result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Resource r : current) procedure.remove(graph, r); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forPredicateSet " + procedure; - } - - }); - - } - - @Override - final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) { - - assert(subject != null); - assert(procedure != null); - - forEachPrincipalType(impl, subject, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Resource result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Resource r : current) procedure.remove(graph, r); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forPrincipalTypeSet " + procedure; - } - - }); - - } - - @Override - final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Resource result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Resource r : current) procedure.remove(graph, r); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forObjectSet " + procedure; - } - - }); - - } - - @Override - final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener() { - - private Set current = null; - private Set run = new HashSet(); - - @Override - public void execute(AsyncReadGraph graph, Statement result) { - - boolean found = false; - - if(current != null) { - - found = current.remove(result); - - } - - if(!found) procedure.add(graph, result); - - run.add(result); - - } - - @Override - public void finished(AsyncReadGraph graph) { - - if(current != null) { - for(Statement s : current) procedure.remove(graph, s); - } - - current = run; - - run = new HashSet(); - - } - - @Override - public boolean isDisposed() { - return procedure.isDisposed(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - } - - @Override - public String toString() { - return "forStatementSet " + procedure; - } - - }); - - } - - @Override - final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject, - final Resource predicate, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int s, int p, int o) { - try { - procedure.execute(graph, querySupport.getResource(o)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - IntProcedure ip = new IntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - procedure.execute(graph, querySupport.getResource(i)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId); -// else impl.state.barrier.inc(null, null); - - PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip); - - } - - @Override - final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - procedure.execute(querySupport.getResource(i)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - InternalProcedure ip = new InternalProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(final ReadGraphImpl graph, IntSet set) { - try { - if(first.compareAndSet(true, false)) { - procedure.execute(graph, set); -// impl.state.barrier.dec(this); - } else { - procedure.execute(impl.newRestart(graph), set); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId); -// else impl.state.barrier.inc(null, null); - - Types.queryEach(impl, sId, this, impl.parent, listener, ip); - - } - - @Override - final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable { - - assert(subject != null); - - return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent); - - } - - @Override - final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(final ReadGraphImpl graph, RelationInfo set) { - try { - if(first.compareAndSet(true, false)) { - procedure.execute(graph, set); -// impl.state.barrier.dec(); - } else { - procedure.execute(impl.newRestart(graph), set); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec("ReadGraphSupportImpl.1353"); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }); - - } - - @Override - final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(final ReadGraphImpl graph, IntSet set) { -// final HashSet result = new HashSet(); -// set.forEach(new TIntProcedure() { -// -// @Override -// public boolean execute(int type) { -// result.add(querySupport.getResource(type)); -// return true; -// } -// -// }); - try { - if(first.compareAndSet(true, false)) { - procedure.execute(graph, set); -// impl.state.barrier.dec(); - } else { - procedure.execute(impl.newRestart(graph), set); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec(); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }); - - } - - @Override - final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - IntProcedure ip = new IntProcedureAdapter() { - - @Override - public void execute(final ReadGraphImpl graph, int superRelation) { - try { - procedure.execute(graph, querySupport.getResource(superRelation)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(final ReadGraphImpl graph) { - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId); -// else impl.state.barrier.inc(null, null); - - DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip); - - } - - @Override - final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); - - } - - @Override - final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - InternalProcedure ip = new InternalProcedure() { - - @Override - public void execute(final ReadGraphImpl graph, IntSet set) { -// final HashSet result = new HashSet(); -// set.forEach(new TIntProcedure() { -// -// @Override -// public boolean execute(int type) { -// result.add(querySupport.getResource(type)); -// return true; -// } -// -// }); - try { - procedure.execute(graph, set); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId); -// else impl.state.barrier.inc(null, null); - - SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip); - - } - - final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException { - - int sId = querySupport.getId(subject); - return ValueQuery.queryEach(impl, sId, impl.parent); - - } - - final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException { - - return ValueQuery.queryEach(impl, subject, impl.parent); - - } - - @Override - final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - - int sId = querySupport.getId(subject); - - if(procedure != null) { - - final ListenerBase listener = getListenerBase(procedure); - - InternalProcedure ip = new InternalProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(ReadGraphImpl graph, byte[] result) { - try { - if(first.compareAndSet(true, false)) { - procedure.execute(graph, result); -// impl.state.barrier.dec(this); - } else { - procedure.execute(impl.newRestart(graph), result); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }; - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); -// else impl.state.barrier.inc(null, null); - - return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); - - } else { - - return ValueQuery.queryEach(impl, sId, impl.parent, null, null); - - } - - } - - @Override - final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - if(impl.parent != null || listener != null) { - - InternalProcedure ip = new InternalProcedure() { - - AtomicBoolean first = new AtomicBoolean(true); - - @Override - public void execute(ReadGraphImpl graph, byte[] result) { - try { - if(first.compareAndSet(true, false)) { - procedure.execute(graph, result); -// impl.state.barrier.dec(this); - } else { - procedure.execute(impl.newRestart(graph), result); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - if(first.compareAndSet(true, false)) { - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } else { - procedure.exception(impl.newRestart(graph), t); - } - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }; - - int sId = querySupport.getId(subject); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); -// else impl.state.barrier.inc(null, null); - - ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); - - } else { - - InternalProcedure ip = new InternalProcedure() { - - @Override - public void execute(ReadGraphImpl graph, byte[] result) { - - procedure.execute(graph, result); - - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - - procedure.exception(graph, t); - - } - - }; - - int sId = querySupport.getId(subject); - - ValueQuery.queryEach(impl, sId, impl.parent, listener, ip); - - } - - } - - @Override - final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure procedure) { - - assert(relation != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - - IntProcedure ip = new IntProcedure() { - - private int result = 0; - - final AtomicBoolean found = new AtomicBoolean(false); - final AtomicBoolean done = new AtomicBoolean(false); - - @Override - public void finished(ReadGraphImpl graph) { - - // Shall fire exactly once! - if(done.compareAndSet(false, true)) { - try { - if(result == 0) { - procedure.exception(graph, new NoInverseException("")); -// impl.state.barrier.dec(this); - } else { - procedure.execute(graph, querySupport.getResource(result)); -// impl.state.barrier.dec(this); - } - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - - } - - @Override - public void execute(ReadGraphImpl graph, int i) { - - if(found.compareAndSet(false, true)) { - this.result = i; - } else { - // Shall fire exactly once! - if(done.compareAndSet(false, true)) { - try { - procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result)); -// impl.state.barrier.dec(this); - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - } - - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - // Shall fire exactly once! - if(done.compareAndSet(false, true)) { - try { - procedure.exception(graph, t); -// impl.state.barrier.dec(this); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - } - - }; - - int sId = querySupport.getId(relation); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId); -// else impl.state.barrier.inc(null, null); - - Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip); - - } - - @Override - final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { - - assert(id != null); - assert(procedure != null); - - InternalProcedure ip = new InternalProcedure() { - - @Override - public void execute(ReadGraphImpl graph, Integer result) { - try { - procedure.execute(graph, querySupport.getResource(result)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - }; - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource"); -// else impl.state.barrier.inc(null, null); - - forResource(impl, id, impl.parent, ip); - - } - - @Override - final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { - - assert(id != null); - assert(procedure != null); - -// impl.state.barrier.inc(); - - forBuiltin(impl, id, impl.parent, new InternalProcedure() { - - @Override - public void execute(ReadGraphImpl graph, Integer result) { - try { - procedure.execute(graph, querySupport.getResource(result)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { - - boolean found = false; - - @Override - public void execute(ReadGraphImpl graph, int object) { - found = true; - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.execute(graph, found); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - - AsyncMultiProcedure ip = new AsyncMultiProcedureAdapter() { - - boolean found = false; - - @Override - synchronized public void execute(AsyncReadGraph graph, Resource resource) { - found = true; - } - - @Override - synchronized public void finished(AsyncReadGraph graph) { - try { - procedure.execute(graph, found); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(this); - } - - }; - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate); -// else impl.state.barrier.inc(null, null); - - forEachObject(impl, subject, predicate, ip); - - } - - @Override - final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure procedure) { - - assert(subject != null); - assert(predicate != null); - assert(procedure != null); - -// impl.state.barrier.inc(); - - forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter() { - - boolean found = false; - - @Override - synchronized public void execute(AsyncReadGraph graph, Resource resource) { - if(resource.equals(object)) found = true; - } - - @Override - synchronized public void finished(AsyncReadGraph graph) { - try { - procedure.execute(graph, found); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure() { - - @Override - public void execute(ReadGraphImpl graph, byte[] object) { - boolean result = object != null; - try { - procedure.execute(graph, result); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - - assert(subject != null); - assert(procedure != null); - - final ListenerBase listener = getListenerBase(procedure); - -// impl.state.barrier.inc(); - - OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() { - - @Override - public void exception(ReadGraphImpl graph, Throwable t) { - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - @Override - public void execute(ReadGraphImpl graph, int i) { - try { - procedure.execute(graph, querySupport.getResource(i)); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(ReadGraphImpl graph) { - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } -// impl.state.barrier.dec(); - } - - }); - - } - - @Override - final public void query(final ReadGraphImpl impl, final AsyncRead request, final CacheEntry parent, final AsyncProcedure procedure, ListenerBase listener) { - - assert(request != null); - assert(procedure != null); - -// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999"); -// else impl.state.barrier.inc(null, null); - - runAsyncRead(impl, request, parent, listener, procedure); - - } - - @Override - final public T tryQuery(final ReadGraphImpl graph, final Read request) throws DatabaseException { - - assert(graph != null); - assert(request != null); - - final ReadEntry entry = readMap.get(request); - if(entry != null && entry.isReady()) { - return (T)entry.get(graph, this, null); - } else { - return request.perform(graph); - } - - } - - final public T tryQuery(final ReadGraphImpl graph, final ExternalRead request) throws DatabaseException { - - assert(graph != null); - assert(request != null); - - final ExternalReadEntry entry = externalReadMap.get(request); - if(entry != null && entry.isReady()) { - if(entry.isExcepted()) { - Throwable t = (Throwable)entry.getResult(); - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException(t); - } else { - return (T)entry.getResult(); - } - } else { - - final DataContainer result = new DataContainer(); - final DataContainer exception = new DataContainer(); - - request.register(graph, new Listener() { - - @Override - public void exception(Throwable t) { - exception.set(t); - } - - @Override - public void execute(T t) { - result.set(t); - } - - @Override - public boolean isDisposed() { - return true; - } - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException(t); - } - - return result.get(); - - } - - } - - @Override - final public void tryQuery(final ReadGraphImpl graph, final AsyncRead request, AsyncProcedure procedure) { - - assert(graph != null); - assert(request != null); - - final AsyncReadEntry entry = asyncReadMap.get(request); - if(entry != null && entry.isReady()) { - if(entry.isExcepted()) { - procedure.exception(graph, (Throwable)entry.getResult()); - } else { - procedure.execute(graph, (T)entry.getResult()); - } - } else { - request.perform(graph, procedure); - } - - } - - @Override - final public void query(final ReadGraphImpl impl, final MultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { - - assert(request != null); - assert(procedure != null); - -// impl.state.barrier.inc(null, null); - - queryMultiRead(impl, request, parent, listener, procedure); - - } - - @Override - final public void query(final ReadGraphImpl impl, final AsyncMultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { - - assert(request != null); - assert(procedure != null); - -// impl.state.barrier.inc(); - - runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure() { - - public void execute(AsyncReadGraph graph, T result) { - - try { - procedure.execute(graph, result); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public void finished(AsyncReadGraph graph) { - - try { - procedure.finished(graph); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - -// impl.state.barrier.dec(); - - } - - @Override - public String toString() { - return procedure.toString(); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - - try { - procedure.exception(graph, t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - -// impl.state.barrier.dec(); - - } - - }); - - } - - @Override - final public void query(final ReadGraphImpl impl, final ExternalRead request, final CacheEntry parent, final Procedure procedure, ListenerBase listener) { - - assert(request != null); - assert(procedure != null); - - queryPrimitiveRead(impl, request, parent, listener, new Procedure() { - - @Override - public void execute(T result) { - try { - procedure.execute(result); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - @Override - public String toString() { - return procedure.toString(); - } - - @Override - public void exception(Throwable t) { - try { - procedure.exception(t); - } catch (Throwable t2) { - Logger.defaultLogError(t2); - } - } - - }); - - } - - @Override - public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) { - - return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); - - } - - @Override - public VirtualGraph getProvider(Resource subject, Resource predicate) { - - return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate)); - - } - - @Override - public VirtualGraph getValueProvider(Resource subject) { - - return querySupport.getValueProvider(querySupport.getId(subject)); - - } - - public boolean resumeTasks(ReadGraphImpl graph) { - - return querySupport.resume(graph); - - } - - public boolean isImmutable(int resourceId) { - return querySupport.isImmutable(resourceId); - } - - public boolean isImmutable(Resource resource) { - ResourceImpl impl = (ResourceImpl)resource; - return isImmutable(impl.id); - } - - private Layer0 L0; - - public Layer0 getL0(ReadGraph graph) { - if(L0 == null) { - L0 = Layer0.getInstance(graph); - } - return L0; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.impl.query; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.simantics.databoard.Bindings; +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.DevelopmentKeys; +import org.simantics.db.DirectStatements; +import org.simantics.db.ReadGraph; +import org.simantics.db.RelationInfo; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.Statement; +import org.simantics.db.VirtualGraph; +import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.debug.ListenerReport; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; +import org.simantics.db.exception.NoInverseException; +import org.simantics.db.exception.ResourceNotFoundException; +import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.impl.graph.BarrierTracing; +import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.graph.ReadGraphSupport; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.impl.procedure.IntProcedureAdapter; +import org.simantics.db.impl.procedure.InternalProcedure; +import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; +import org.simantics.db.impl.support.ResourceSupport; +import org.simantics.db.procedure.AsyncMultiListener; +import org.simantics.db.procedure.AsyncMultiProcedure; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.procedure.AsyncSetListener; +import org.simantics.db.procedure.ListenerBase; +import org.simantics.db.procedure.MultiProcedure; +import org.simantics.db.procedure.StatementProcedure; +import org.simantics.db.procedure.SyncMultiProcedure; +import org.simantics.db.request.AsyncMultiRead; +import org.simantics.db.request.ExternalRead; +import org.simantics.db.request.MultiRead; +import org.simantics.db.request.RequestFlags; +import org.simantics.layer0.Layer0; +import org.simantics.utils.DataContainer; +import org.simantics.utils.Development; +import org.simantics.utils.datastructures.Pair; +import org.simantics.utils.datastructures.collections.CollectionUtils; +import org.simantics.utils.datastructures.disposable.AbstractDisposable; + +import gnu.trove.procedure.TIntProcedure; +import gnu.trove.procedure.TLongProcedure; +import gnu.trove.procedure.TObjectProcedure; +import gnu.trove.set.hash.THashSet; +import gnu.trove.set.hash.TIntHashSet; + +@SuppressWarnings({"rawtypes", "unchecked"}) +final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport { + + public static int indent = 0; + + + // Garbage collection + + public int boundQueries = 0; + + + final private int functionalRelation; + + final private int superrelationOf; + + final private int instanceOf; + + final private int inverseOf; + + final private int asserts; + + final private int hasPredicate; + + final private int hasPredicateInverse; + + final private int hasObject; + + final private int inherits; + + final private int subrelationOf; + + final private int rootLibrary; + + /** + * A cache for the root library resource. Initialized in + * {@link #getRootLibraryResource()}. + */ + private volatile ResourceImpl rootLibraryResource; + + final private int library; + + final private int consistsOf; + + final private int hasName; + + AtomicInteger sleepers = new AtomicInteger(0); + + private boolean updating = false; + + + private boolean firingListeners = false; + + final public QueryCache cache; + final public QuerySupport querySupport; + final public Session session; + final public ResourceSupport resourceSupport; + + private THashSet scheduledListeners = new THashSet(); + + QueryThread[] executors; + +// public ArrayList[] queues; + + public LinkedList freeScheduling = new LinkedList(); + + enum ThreadState { + + INIT, RUN, SLEEP, DISPOSED + + } + + public ThreadState[] threadStates; +// public ReentrantLock[] threadLocks; +// public Condition[] threadConditions; + + //public ArrayList[] ownTasks; + + //public ArrayList[] ownSyncTasks; + + //ArrayList[] delayQueues; + + final Object querySupportLock; + + public Long modificationCounter = 0L; + + public void close() { + } + + public SessionTask getOwnTask(ReadGraphImpl impl) { + Set ancestors = impl.ancestorSet(); + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.hasCommonParent(ancestors)) { + return freeScheduling.remove(index); + } + index++; + } + } + return null; + } + + public SessionTask getSubTask(ReadGraphImpl impl) { + Set onlyThis = Collections.singleton(impl); + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.hasCommonParent(onlyThis)) { + return freeScheduling.remove(index); + } + index++; + } + } + return null; + } + + public boolean performPending(ReadGraphImpl graph) { + SessionTask task = getOwnTask(graph); + if(task != null) { + task.run(QueryProcessor.thread.get()); + return true; + } else { + return false; + } + } + +// final public void scheduleOwn(int caller, SessionTask request) { +// ownTasks[caller].add(request); +// } + + final public void schedule(SessionTask request) { + + //int performer = request.thread; + +// if(DebugPolicy.SCHEDULE) +// System.out.println("schedule " + request + " " + " -> " + performer); + + //assert(performer >= 0); + + assert(request != null); + +// if(caller == performer) { +// request.run(caller); +// } else { + +// if(performer == THREADS) { + + synchronized(querySupportLock) { + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + + freeScheduling.add(request); + + querySupportLock.notifyAll(); + + } + + return; + +// } +// +// ReentrantLock queueLock = threadLocks[performer]; +// queueLock.lock(); +// queues[performer].add(request); +// // This thread could have been sleeping +// if(queues[performer].size() == 1) { +// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); +// threadConditions[performer].signalAll(); +// } +// queueLock.unlock(); +// } + + } + + + final int THREADS; + final public int THREAD_MASK; + + final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); + + public static abstract class SessionTask { + + public final ReadGraphImpl graph; + private Set ancestors; + private int counter = 0; + private Exception trace; + + public SessionTask(ReadGraphImpl graph) { + this.graph = graph; + if(graph != null) graph.asyncBarrier.inc(); + } + + public boolean hasCommonParent(Set otherAncestors) { + if(graph == null) return false; + if(ancestors == null) ancestors = graph.ancestorSet(); + return !Collections.disjoint(ancestors, otherAncestors); + } + + public abstract void run0(int thread); + + public final void run(int thread) { + if(counter++ > 0) { + if(BarrierTracing.BOOKKEEPING) { + trace.printStackTrace(); + new Exception().printStackTrace(); + } + throw new IllegalStateException("Multiple invocations of SessionTask!"); + } + if(BarrierTracing.BOOKKEEPING) { + trace = new Exception(); + } + run0(thread); + if(graph != null) graph.asyncBarrier.dec(); + } + + @Override + public String toString() { + return "SessionTask[" + graph.parent + "]"; + } + + } + + public static abstract class SessionRead extends SessionTask { + + final public Semaphore notify; + final public DataContainer throwable; + + public SessionRead(DataContainer throwable, Semaphore notify) { + super(null); + this.throwable = throwable; + this.notify = notify; + } + + } + + long waitingTime = 0; + + static int koss = 0; + static int koss2 = 0; + + public boolean resume(ReadGraphImpl graph) { + return executors[0].runSynchronized(); + } + + //private WeakReference garbageTracker; + + private class GarbageTracker { + + @Override + protected void finalize() throws Throwable { + +// System.err.println("GarbageTracker"); +// +// garbageTracker = new WeakReference(new GarbageTracker()); + + super.finalize(); + + } + + } + + public QueryProcessor(final int threads, QuerySupport core, Set threadSet) + throws DatabaseException { + + //garbageTracker = new WeakReference(new GarbageTracker()); + + THREADS = threads; + THREAD_MASK = threads - 1; + + querySupport = core; + cache = new QueryCache(core, threads); + session = querySupport.getSession(); + resourceSupport = querySupport.getSupport(); + querySupportLock = core.getLock(); + + executors = new QueryThread[THREADS]; +// queues = new ArrayList[THREADS]; +// threadLocks = new ReentrantLock[THREADS]; +// threadConditions = new Condition[THREADS]; + threadStates = new ThreadState[THREADS]; +// ownTasks = new ArrayList[THREADS]; +// ownSyncTasks = new ArrayList[THREADS]; +// delayQueues = new ArrayList[THREADS * THREADS]; + + // freeSchedule = new AtomicInteger(0); + +// for (int i = 0; i < THREADS * THREADS; i++) { +// delayQueues[i] = new ArrayList(); +// } + + for (int i = 0; i < THREADS; i++) { + + // tasks[i] = new ArrayList(); +// ownTasks[i] = new ArrayList(); +// ownSyncTasks[i] = new ArrayList(); +// queues[i] = new ArrayList(); +// threadLocks[i] = new ReentrantLock(); +// threadConditions[i] = threadLocks[i].newCondition(); + // limits[i] = false; + threadStates[i] = ThreadState.INIT; + + } + + for (int i = 0; i < THREADS; i++) { + + final int index = i; + + executors[i] = new QueryThread(session, this, index, "Query Thread " + index); + + threadSet.add(executors[i]); + + } + + // Now start threads + for (int i = 0; i < THREADS; i++) { + executors[i].start(); + } + + // Make sure that query threads are up and running + while(sleepers.get() != THREADS) { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + rootLibrary = core.getBuiltin("http:/"); + boolean builtinsInstalled = rootLibrary != 0; + + if (builtinsInstalled) { + functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation); + assert (functionalRelation != 0); + } else + functionalRelation = 0; + + if (builtinsInstalled) { + instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf); + assert (instanceOf != 0); + } else + instanceOf = 0; + + if (builtinsInstalled) { + inverseOf = core.getBuiltin(Layer0.URIs.InverseOf); + assert (inverseOf != 0); + } else + inverseOf = 0; + + + if (builtinsInstalled) { + inherits = core.getBuiltin(Layer0.URIs.Inherits); + assert (inherits != 0); + } else + inherits = 0; + + if (builtinsInstalled) { + asserts = core.getBuiltin(Layer0.URIs.Asserts); + assert (asserts != 0); + } else + asserts = 0; + + if (builtinsInstalled) { + hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate); + assert (hasPredicate != 0); + } else + hasPredicate = 0; + + if (builtinsInstalled) { + hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse); + assert (hasPredicateInverse != 0); + } else + hasPredicateInverse = 0; + + if (builtinsInstalled) { + hasObject = core.getBuiltin(Layer0.URIs.HasObject); + assert (hasObject != 0); + } else + hasObject = 0; + + if (builtinsInstalled) { + subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf); + assert (subrelationOf != 0); + } else + subrelationOf = 0; + + if (builtinsInstalled) { + superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf); + assert (superrelationOf != 0); + } else + superrelationOf = 0; + + if (builtinsInstalled) { + library = core.getBuiltin(Layer0.URIs.Library); + assert (library != 0); + } else + library = 0; + + if (builtinsInstalled) { + consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf); + assert (consistsOf != 0); + } else + consistsOf = 0; + + if (builtinsInstalled) { + hasName = core.getBuiltin(Layer0.URIs.HasName); + assert (hasName != 0); + } else + hasName = 0; + + } + + final public void releaseWrite(ReadGraphImpl graph) { + performDirtyUpdates(graph); + modificationCounter++; + } + + final public int getId(final Resource r) { + return querySupport.getId(r); + } + + public QuerySupport getCore() { + return querySupport; + } + + public int getFunctionalRelation() { + return functionalRelation; + } + + public int getInherits() { + return inherits; + } + + public int getInstanceOf() { + return instanceOf; + } + + public int getInverseOf() { + return inverseOf; + } + + public int getSubrelationOf() { + return subrelationOf; + } + + public int getSuperrelationOf() { + return superrelationOf; + } + + public int getAsserts() { + return asserts; + } + + public int getHasPredicate() { + return hasPredicate; + } + + public int getHasPredicateInverse() { + return hasPredicateInverse; + } + + public int getHasObject() { + return hasObject; + } + + public int getRootLibrary() { + return rootLibrary; + } + + public Resource getRootLibraryResource() { + if (rootLibraryResource == null) { + // Synchronization is not needed here, it doesn't matter if multiple + // threads simultaneously set rootLibraryResource once. + int root = getRootLibrary(); + if (root == 0) + throw new UnsupportedOperationException("database is not initialized, cannot get root library resource"); + this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root); + } + return rootLibraryResource; + } + + public int getLibrary() { + return library; + } + + public int getConsistsOf() { + return consistsOf; + } + + public int getHasName() { + return hasName; + } + + public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) { + + try { + + QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException { + + if (result != null && result != 0) { + procedure.execute(graph, result); + return; + } + + // Fall back to using the fixed builtins. +// result = querySupport.getBuiltin(id); +// if (result != 0) { +// procedure.execute(graph, result); +// return; +// } + +// try { +// result = querySupport.getRandomAccessReference(id); +// } catch (ResourceNotFoundException e) { +// procedure.exception(graph, e); +// return; +// } + + if (result != 0) { + procedure.execute(graph, result); + } else { + procedure.exception(graph, new ResourceNotFoundException(id)); + } + + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException { + procedure.exception(graph, t); + } + + }); + } catch (DatabaseException e) { + + try { + + procedure.exception(graph, e); + + } catch (DatabaseException e1) { + + Logger.defaultLogError(e1); + + } + + } + + } + + public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure procedure) throws DatabaseException { + + Integer result = querySupport.getBuiltin(id); + if (result != 0) { + procedure.execute(graph, result); + } else { + procedure.exception(graph, new ResourceNotFoundException(id)); + } + + } + + final void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure procedure) { + + try { + QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure); + } catch (DatabaseException e) { + throw new IllegalStateException(e); + } + + } + + public final void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure procedure) { + + + try { + QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure); + } catch (DatabaseException e) { + throw new IllegalStateException(e); + } + + } + + final void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { + QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure); + } + +// @Override +// public T query(final ReadGraphImpl graph, final Read query, final CacheEntry parent, final AsyncProcedure procedure, final ListenerBase listener) throws DatabaseException { +// +// return QueryCache.resultReadEntry(graph, query, parent, listener, procedure); +// +// } + + public void queryMultiRead(final ReadGraphImpl graph, final MultiRead query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure procedure) throws DatabaseException { + + QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure); + + } + + public void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure procedure) throws DatabaseException { + + QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure); + + } + + boolean isBound(ExternalReadEntry entry) { + if(entry.hasParents()) return true; + else if(hasListener(entry)) return true; + else return false; + } + + synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + + if (parent != null && !inferred) { + try { + if(!child.isImmutable(graph)) + child.addParent(parent); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { + System.out.println(child + " -> " + parent); + } + } + } + + if (listener != null) { + return registerListener(child, listener, procedure); + } else { + return null; + } + + } + + + static class Dummy implements InternalProcedure, IntProcedure { + + @Override + public void execute(ReadGraphImpl graph, int i) { + } + + @Override + public void finished(ReadGraphImpl graph) { + } + + @Override + public void execute(ReadGraphImpl graph, Object result) { + } + + @Override + public void exception(ReadGraphImpl graph, Throwable throwable) { + } + + } + + private static final Dummy dummy = new Dummy(); + + /* + public Object performForEach2(ReadGraphImpl graph, UnaryQuery query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable { + + if (DebugPolicy.PERFORM) + System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query); + + assert (!dirty); + assert (!collecting); + + assert(query.assertNotDiscarded()); + + registerDependencies(graph, query, parent, listener, procedure, false); + + // FRESH, REFUTED, EXCEPTED go here + if (!query.isReady()) { + + size++; + misses++; + + query.computeForEach(graph, this, (Procedure)dummy, true); + return query.get(graph, this, null); + + } else { + + hits++; + + return query.get(graph, this, procedure); + + } + + } + */ + + + interface QueryCollectorSupport { + public CacheCollectionResult allCaches(); + public Collection getRootList(); + public int getCurrentSize(); + public int calculateCurrentSize(); + public CacheEntryBase iterate(int level); + public void remove(); + public void setLevel(CacheEntryBase entry, int level); + public boolean start(boolean flush); + } + + interface QueryCollector { + + public void collect(int youngTarget, int allowedTimeInMs); + + } + + class QueryCollectorSupportImpl implements QueryCollectorSupport { + + private static final boolean DEBUG = false; + private static final double ITERATION_RATIO = 0.2; + + private CacheCollectionResult iteration = new CacheCollectionResult(); + private boolean fresh = true; + private boolean needDataInStart = true; + + QueryCollectorSupportImpl() { + iteration.restart(); + } + + public CacheCollectionResult allCaches() { + CacheCollectionResult result = new CacheCollectionResult(); + QueryProcessor.this.allCaches(result); + result.restart(); + return result; + } + + public boolean start(boolean flush) { + // We need new data from query maps + fresh = true; + if(needDataInStart || flush) { + // Last run ended after processing all queries => refresh data + restart(flush ? 0.0 : ITERATION_RATIO); + } else { + // continue with previous big data + } + // Notify caller about iteration situation + return iteration.isAtStart(); + } + + private void restart(double targetRatio) { + + needDataInStart = true; + + long start = System.nanoTime(); + if(fresh) { + + // We need new data from query maps + + int iterationSize = iteration.size()+1; + int diff = calculateCurrentSize()-iterationSize; + + double ratio = (double)diff / (double)iterationSize; + boolean dirty = Math.abs(ratio) >= targetRatio; + + if(dirty) { + iteration = allCaches(); + if(DEBUG) { + System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") "); + for(int i=0;i getRootList() { + return cache.getRootList(); + } + + @Override + public int calculateCurrentSize() { + return cache.calculateCurrentSize(); + } + + @Override + public int getCurrentSize() { + return cache.size; + } + + } + // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600); + + private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl(); + private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport); + + public int querySize() { + return cache.size; + } + + public void gc(int youngTarget, int allowedTimeInMs) { + + collector.collect(youngTarget, allowedTimeInMs); + + } + + public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { + + assert (entry != null); + + if (base.isDisposed()) + return null; + + return addListener(entry, base, procedure); + + } + + private void primeListenerEntry(final ListenerEntry entry, final Object result) { + entry.setLastKnown(result); + } + + private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { + + assert (entry != null); + assert (procedure != null); + + ArrayList list = cache.listeners.get(entry); + if (list == null) { + list = new ArrayList(1); + cache.listeners.put(entry, list); + } + + ListenerEntry result = new ListenerEntry(entry, base, procedure); + int currentIndex = list.indexOf(result); + // There was already a listener + if(currentIndex > -1) { + ListenerEntry current = list.get(currentIndex); + if(!current.base.isDisposed()) return null; + list.set(currentIndex, result); + } else { + list.add(result); + } + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); + } + } + + return result; + + } + + private void scheduleListener(ListenerEntry entry) { + assert (entry != null); + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Scheduled " + entry.procedure); + } + } + scheduledListeners.add(entry); + } + + private void removeListener(ListenerEntry entry) { + assert (entry != null); + ArrayList list = cache.listeners.get(entry.entry); + if(list == null) return; + boolean success = list.remove(entry); + assert (success); + if (list.isEmpty()) + cache.listeners.remove(entry.entry); + } + + private boolean hasListener(CacheEntry entry) { + if(cache.listeners.get(entry) != null) return true; + return false; + } + + boolean hasListenerAfterDisposing(CacheEntry entry) { + if(cache.listeners.get(entry) != null) { + ArrayList entries = cache.listeners.get(entry); + ArrayList list = null; + for (ListenerEntry e : entries) { + if (e.base.isDisposed()) { + if(list == null) list = new ArrayList(); + list.add(e); + } + } + if(list != null) { + for (ListenerEntry e : list) { + entries.remove(e); + } + } + if (entries.isEmpty()) { + cache.listeners.remove(entry); + return false; + } + return true; + } + return false; + } + + List getListenerEntries(CacheEntry entry) { + hasListenerAfterDisposing(entry); + if(cache.listeners.get(entry) != null) + return cache.listeners.get(entry); + else + return Collections.emptyList(); + } + + void processListenerReport(CacheEntry entry, Map> workarea) { + + if(!workarea.containsKey(entry)) { + + HashSet ls = new HashSet(); + for(ListenerEntry e : getListenerEntries(entry)) + ls.add(e.base); + + workarea.put(entry, ls); + + for(CacheEntry parent : entry.getParents(this)) { + processListenerReport(parent, workarea); + ls.addAll(workarea.get(parent)); + } + + } + + } + + public synchronized ListenerReport getListenerReport() throws IOException { + + class ListenerReportImpl implements ListenerReport { + + Map> workarea = new HashMap>(); + + @Override + public void print(PrintStream b) { + Map hist = new HashMap(); + for(Map.Entry> e : workarea.entrySet()) { + for(ListenerBase l : e.getValue()) { + Integer i = hist.get(l); + hist.put(l, i != null ? i-1 : -1); + } + } + + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + b.flush(); + } + + } + + ListenerReportImpl result = new ListenerReportImpl(); + + Collection all = allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : all) { + hasListenerAfterDisposing(entry); + } + for(CacheEntryBase entry : all) { + processListenerReport(entry, result.workarea); + } + + return result; + + } + + public synchronized String reportListeners(File file) throws IOException { + + if (!isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + ListenerReport report = getListenerReport(); + report.print(b); + + return "Done reporting listeners."; + + } + + void processParentReport(CacheEntry entry, Map> workarea) { + + if(entry.isDiscarded()) return; + if(workarea.containsKey(entry)) return; + + Iterable parents = entry.getParents(this); + HashSet ps = new HashSet(); + for(CacheEntry e : parents) { + if(e.isDiscarded()) continue; + ps.add(e); + processParentReport(e, workarea); + } + workarea.put(entry, ps); + + } + + public synchronized String reportQueryActivity(File file) throws IOException { + + System.err.println("reportQueries " + file.getAbsolutePath()); + + if (!isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + + List> entries = CollectionUtils.valueSortedEntries(Development.histogram); + Collections.reverse(entries); + + for(Pair entry : entries) { + b.println(entry.first + ": " + entry.second); + } + + b.close(); + + Development.histogram.clear(); + + return "OK"; + + } + + public synchronized String reportQueries(File file) throws IOException { + + System.err.println("reportQueries " + file.getAbsolutePath()); + + if (!isAlive()) + return "Disposed!"; + + PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); + + long start = System.nanoTime(); + +// ArrayList all = ; + + Map> workarea = new HashMap>(); + Collection caches = allCaches(new CacheCollectionResult()).toCollection(); + for(CacheEntryBase entry : caches) { + processParentReport(entry, workarea); + } + + // for(CacheEntry e : all) System.err.println("entry: " + e); + + long duration = System.nanoTime() - start; + System.err.println("Query root set in " + 1e-9*duration + "s."); + + start = System.nanoTime(); + + HashMap flagMap = new HashMap(); + + int listeners = 0; + + for(CacheEntry entry : workarea.keySet()) { + boolean listener = hasListenerAfterDisposing(entry); + boolean hasParents = entry.getParents(this).iterator().hasNext(); + if(listener) { + // Bound + flagMap.put(entry, 0); + } else if (!hasParents) { + // Unbound + flagMap.put(entry, 1); + } else { + // Unknown + flagMap.put(entry, 2); + } + // // Write leaf bit + // entry.flags |= 4; + } + + boolean done = true; + int loops = 0; + + do { + + done = true; + + long start2 = System.nanoTime(); + + int boundCounter = 0; + int unboundCounter = 0; + int unknownCounter = 0; + + for(CacheEntry entry : workarea.keySet()) { + + //System.err.println("process " + entry); + + int flags = flagMap.get(entry); + int bindStatus = flags & 3; + + if(bindStatus == 0) boundCounter++; + else if(bindStatus == 1) unboundCounter++; + else if(bindStatus == 2) unknownCounter++; + + if(bindStatus < 2) continue; + + int newStatus = 1; + for(CacheEntry parent : entry.getParents(this)) { + + if(parent.isDiscarded()) flagMap.put(parent, 1); + + int flags2 = flagMap.get(parent); + int bindStatus2 = flags2 & 3; + // Parent is bound => child is bound + if(bindStatus2 == 0) { + newStatus = 0; + break; + } + // Parent is unknown => child is unknown + else if (bindStatus2 == 2) { + newStatus = 2; + done = false; + break; + } + } + + flagMap.put(entry, newStatus); + + } + + duration = System.nanoTime() - start2; + System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s."); + b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s."); + + } while(!done && loops++ < 20); + + if(loops >= 20) { + + for(CacheEntry entry : workarea.keySet()) { + + int bindStatus = flagMap.get(entry); + if(bindStatus == 2) System.err.println("Undefined bind status for " + entry); + + } + + } + + duration = System.nanoTime() - start; + System.err.println("Query analysis in " + 1e-9*duration + "s."); + + Map, Integer> counts = new HashMap, Integer>(); + + for(CacheEntry entry : workarea.keySet()) { + Class clazz = entry.getClass(); + if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); + else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); + else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); + else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); + else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); + Integer c = counts.get(clazz); + if(c == null) counts.put(clazz, -1); + else counts.put(clazz, c-1); + } + + b.print("// Simantics DB client query report file\n"); + b.print("// This file contains the following information\n"); + b.print("// -The amount of cached query instances per query class\n"); + b.print("// -The sizes of retained child sets\n"); + b.print("// -List of parents for each query (search for 'P ')\n"); + b.print("// -Followed by status, where\n"); + b.print("// -0=bound\n"); + b.print("// -1=free\n"); + b.print("// -2=unknown\n"); + b.print("// -L=has listener\n"); + b.print("// -List of children for each query (search for 'C ')\n"); + + b.print("----------------------------------------\n"); + + b.print("// Queries by class\n"); + for(Pair, Integer> p : CollectionUtils.valueSortedEntries(counts)) { + b.print(-p.second + " " + p.first.getName() + "\n"); + } + + Map hist = new HashMap(); + for(CacheEntry e : workarea.keySet()) + hist.put(e, -1); + + boolean changed = true; + int iter = 0; + while(changed && iter++<50) { + + changed = false; + + Map newHist = new HashMap(); + for(CacheEntry e : workarea.keySet()) + newHist.put(e, -1); + + for(Map.Entry> e : workarea.entrySet()) { + Integer c = hist.get(e.getKey()); + for(CacheEntry p : e.getValue()) { + Integer i = newHist.get(p); + newHist.put(p, i+c); + } + } + for(CacheEntry e : workarea.keySet()) { + Integer value = newHist.get(e); + Integer old = hist.get(e); + if(!value.equals(old)) { + hist.put(e, value); +// System.err.println("hist " + e + ": " + old + " => " + value); + changed = true; + } + } + + System.err.println("Retained set iteration " + iter); + + } + + b.print("// Queries by retained set\n"); + for(Pair p : CollectionUtils.valueSortedEntries(hist)) { + b.print("" + -p.second + " " + p.first + "\n"); + } + + HashMap> inverse = new HashMap>(); + + b.print("// Entry parent listing\n"); + for(CacheEntry entry : workarea.keySet()) { + int status = flagMap.get(entry); + boolean hasListener = hasListenerAfterDisposing(entry); + b.print("Q " + entry.toString()); + if(hasListener) { + b.print(" (L" + status + ")"); + listeners++; + } else { + b.print(" (" + status + ")"); + } + b.print("\n"); + for(CacheEntry parent : workarea.get(entry)) { + Collection inv = inverse.get(parent); + if(inv == null) { + inv = new ArrayList(); + inverse.put(parent, inv); + } + inv.add(entry); + b.print(" " + parent.toString()); + b.print("\n"); + } + } + + b.print("// Entry child listing\n"); + for(Map.Entry> entry : inverse.entrySet()) { + b.print("C " + entry.getKey().toString()); + b.print("\n"); + for(CacheEntry child : entry.getValue()) { + Integer h = hist.get(child); + if(h != null) { + b.print(" " + h); + } else { + b.print(" "); + } + b.print(" " + child.toString()); + b.print("\n"); + } + } + + b.print("#queries: " + workarea.keySet().size() + "\n"); + b.print("#listeners: " + listeners + "\n"); + + b.close(); + + return "Dumped " + workarea.keySet().size() + " queries."; + + } + + class UpdateEntry { + + public CacheEntry caller; + + public CacheEntry entry; + + public int indent; + + public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { + this.caller = caller; + this.entry = entry; + this.indent = indent; + } + + }; + + boolean removeQuery(CacheEntry entry) { + + // This entry has been removed before. No need to do anything here. + if(entry.isDiscarded()) return false; + + assert (!entry.isDiscarded()); + + Query query = entry.getQuery(); + + query.removeEntry(this); + + cache.updates++; + cache.size--; + + if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0) + boundQueries--; + + entry.discard(); + + return true; + + } + + /** + * + * @return true if this entry is being listened + */ + private boolean updateQuery(UpdateEntry e, LinkedList todo, IdentityHashMap immediates) throws DatabaseException { + + assert (e != null); + + CacheEntry entry = e.entry; + + /* + * If the dependency graph forms a DAG, some entries are inserted in the + * todo list many times. They only need to be processed once though. + */ + if (entry.isDiscarded()) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.print("D"); + for (int i = 0; i < e.indent; i++) + System.err.print(" "); + System.err.println(entry.getQuery()); + } + } +// System.err.println(" => DISCARDED"); + return false; + } + +// if (entry.isRefuted()) { +// if (Development.DEVELOPMENT) { +// if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { +// System.err.print("R"); +// for (int i = 0; i < e.indent; i++) +// System.err.print(" "); +// System.err.println(entry.getQuery()); +// } +// } +// return false; +// } + + if (entry.isExcepted()) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.print("E"); + } + } + } + + if (entry.isPending()) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.print("P"); + } + } + } + + cache.updates++; + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.print("U "); + for (int i = 0; i < e.indent; i++) + System.err.print(" "); + System.err.print(entry.getQuery()); + } + } + + Query query = entry.getQuery(); + int type = query.type(); + + boolean hasListener = hasListener(entry); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + if(hasListener(entry)) { + System.err.println(" (L)"); + } else { + System.err.println(""); + } + } + } + + if(entry.isPending() || entry.isExcepted()) { + + // If updated + if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { + + immediates.put(entry, entry); + + } else { + + if(hasListener) { + entry.refute(); + } else { + removeQuery(entry); + } + + } + + } else { + + // If updated + if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) { + + immediates.put(entry, entry); + + } else { + + if(hasListener) { + entry.refute(); + } else { + removeQuery(entry); + } + + } + + } + +// System.err.println(" => FOO " + type); + + if (hasListener) { + ArrayList entries = cache.listeners.get(entry); + if(entries != null) { + for (ListenerEntry le : entries) { + scheduleListener(le); + } + } + } + + // If invalid, update parents + if (type == RequestFlags.INVALIDATE) { + updateParents(e.indent, entry, todo); + } + + return hasListener; + + } + + private void updateParents(int indent, CacheEntry entry, LinkedList todo) { + + Iterable oldParents = entry.getParents(this); + for (CacheEntry parent : oldParents) { +// System.err.println("updateParents " + entry + " => " + parent); + if(!parent.isDiscarded()) + todo.push(new UpdateEntry(entry, parent, indent + 2)); + } + + } + + private boolean pruneListener(ListenerEntry entry) { + if (entry.base.isDisposed()) { + removeListener(entry); + return true; + } else { + return false; + } + } + + /** + * @param av1 an array (guaranteed) + * @param av2 any object + * @return true if the two arrays are equal + */ + private final boolean arrayEquals(Object av1, Object av2) { + if (av2 == null) + return false; + Class c1 = av1.getClass().getComponentType(); + Class c2 = av2.getClass().getComponentType(); + if (c2 == null || !c1.equals(c2)) + return false; + boolean p1 = c1.isPrimitive(); + boolean p2 = c2.isPrimitive(); + if (p1 != p2) + return false; + if (!p1) + return Arrays.equals((Object[]) av1, (Object[]) av2); + if (boolean.class.equals(c1)) + return Arrays.equals((boolean[]) av1, (boolean[]) av2); + else if (byte.class.equals(c1)) + return Arrays.equals((byte[]) av1, (byte[]) av2); + else if (int.class.equals(c1)) + return Arrays.equals((int[]) av1, (int[]) av2); + else if (long.class.equals(c1)) + return Arrays.equals((long[]) av1, (long[]) av2); + else if (float.class.equals(c1)) + return Arrays.equals((float[]) av1, (float[]) av2); + else if (double.class.equals(c1)) + return Arrays.equals((double[]) av1, (double[]) av2); + throw new RuntimeException("??? Contact application querySupport."); + } + + + + final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { + + try { + + Query query = entry.getQuery(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) { + System.err.println("R " + query); + } + } + + entry.prepareRecompute(querySupport); + + ReadGraphImpl parentGraph = graph.forRecompute(entry); + + query.recompute(parentGraph); + + if(entry.isExcepted()) return ListenerEntry.NO_VALUE; + + Object newValue = entry.getResult(); + + if (ListenerEntry.NO_VALUE == oldValue) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) { + System.out.println("C " + query); + System.out.println("- " + oldValue); + System.out.println("- " + newValue); + } + } + return newValue; + } + + boolean changed = false; + + if (newValue != null) { + if (newValue.getClass().isArray()) { + changed = !arrayEquals(newValue, oldValue); + } else { + changed = !newValue.equals(oldValue); + } + } else + changed = (oldValue != null); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) { + System.err.println("C " + query); + System.err.println("- " + oldValue); + System.err.println("- " + newValue); + } + } + + return changed ? newValue : ListenerEntry.NOT_CHANGED; + + } catch (Throwable t) { + + Logger.defaultLogError(t); + entry.except(t); + return ListenerEntry.NO_VALUE; + + } + + } + + public boolean hasScheduledUpdates() { + return !scheduledListeners.isEmpty(); + } + + public void performScheduledUpdates(WriteGraphImpl graph) { + + assert (!updating); + assert (!cache.collecting); + assert (!firingListeners); + + firingListeners = true; + + try { + + // Performing may cause further events to be scheduled. + while (!scheduledListeners.isEmpty()) { + +// graph.restart(); +// graph.state.barrier.inc(); + + // Clone current events to make new entries possible during + // firing. + THashSet entries = scheduledListeners; + scheduledListeners = new THashSet(); + + ArrayList schedule = new ArrayList(); + + for (ListenerEntry listenerEntry : entries) { + + if (pruneListener(listenerEntry)) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Pruned " + listenerEntry.procedure); + } + } + continue; + } + + final CacheEntry entry = listenerEntry.entry; + assert (entry != null); + + Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); + + if (newValue != ListenerEntry.NOT_CHANGED) { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + new Exception().printStackTrace(); + System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); + } + } + schedule.add(listenerEntry); + listenerEntry.setLastKnown(entry.getResult()); + } + + } + + for(ListenerEntry listenerEntry : schedule) { + final CacheEntry entry = listenerEntry.entry; + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure); + } + } + try { + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { + System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); + } + } + entry.performFromCache(graph, listenerEntry.procedure); + } catch (Throwable t) { + t.printStackTrace(); + } + } + +// graph.state.barrier.dec(); +// graph.waitAsync(null); +// graph.state.barrier.assertReady(); + + } + + } finally { + firingListeners = false; + } + + } + + /** + * + * @return true if this entry still has listeners + */ + public boolean update(final ReadGraphImpl graph, final CacheEntry entry) { + + assert (!cache.collecting); + assert (!updating); + updating = true; + + boolean hadListeners = false; + boolean listenersUnknown = false; + + try { + + assert(entry != null); + LinkedList todo = new LinkedList(); + IdentityHashMap immediates = new IdentityHashMap(); + todo.add(new UpdateEntry(null, entry, 0)); + + while(true) { + + // Walk the tree and collect immediate updates + while (!todo.isEmpty()) { + UpdateEntry e = todo.pop(); + hadListeners |= updateQuery(e, todo, immediates); + } + + if(immediates.isEmpty()) break; + + // Evaluate all immediate updates and collect parents to update + for(CacheEntry immediate : immediates.values()) { + + if(immediate.isDiscarded()) { + continue; + } + + if(immediate.isExcepted()) { + + Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); + if (newValue != ListenerEntry.NOT_CHANGED) + updateParents(0, immediate, todo); + + } else { + + Object oldValue = immediate.getResult(); + Object newValue = compareTo(graph, immediate, oldValue); + + if (newValue != ListenerEntry.NOT_CHANGED) { + updateParents(0, immediate, todo); + } else { + // If not changed, keep the old value + immediate.setResult(oldValue); + immediate.setReady(); + listenersUnknown = true; + } + + } + + } + immediates.clear(); + + } + + } catch (Throwable t) { + Logger.defaultLogError(t); + } + + assert (updating); + updating = false; + + return hadListeners | listenersUnknown; + + } + + private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet(); + private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet(); + private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet(); + // Maybe use a mutex from util.concurrent? + private Object primitiveUpdateLock = new Object(); + private THashSet scheduledPrimitiveUpdates = new THashSet(); + + private ArrayList refutations = new ArrayList<>(); + + private void markForUpdate(ReadGraphImpl graph, CacheEntry e) { + e.refute(); + refutations.add(e); + } + + private void updateRefutations(ReadGraphImpl graph) { + + for(CacheEntry e : refutations) + update(graph, e); + + refutations.clear(); + + } + + public void performDirtyUpdates(final ReadGraphImpl graph) { + + cache.dirty = false; + lastInvalidate = 0; + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update =="); + } + } + + // Special case - one statement + if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { + + long arg0 = scheduledObjectUpdates.getFirst(); + + final int subject = (int)(arg0 >>> 32); + final int predicate = (int)(arg0 & 0xffffffff); + + for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o); + + if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { + PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject); + if(principalTypes != null) markForUpdate(graph, principalTypes); + Types types = QueryCache.entryTypes(QueryProcessor.this, subject); + if(types != null) markForUpdate(graph, types); + } + + if(predicate == subrelationOf) { + SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); + if(superRelations != null) markForUpdate(graph, superRelations); + } + + DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject); + if(dp != null) markForUpdate(graph, dp); + OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate); + if(os != null) markForUpdate(graph, os); + + updateRefutations(graph); + + scheduledObjectUpdates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } + + return; + + } + + // Special case - one value + if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) { + + int arg0 = scheduledValueUpdates.getFirst(); + + ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0); + if(valueQuery != null) markForUpdate(graph, valueQuery); + + updateRefutations(graph); + + scheduledValueUpdates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } + + return; + + } + + final TIntHashSet predicates = new TIntHashSet(); + final TIntHashSet orderedSets = new TIntHashSet(); + + THashSet primitiveUpdates; + synchronized (primitiveUpdateLock) { + primitiveUpdates = scheduledPrimitiveUpdates; + scheduledPrimitiveUpdates = new THashSet(); + } + + scheduledValueUpdates.forEach(new TIntProcedure() { + + @Override + public boolean execute(int arg0) { + ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0); + if(valueQuery != null) markForUpdate(graph, valueQuery); + return true; + } + + }); + + scheduledInvalidates.forEach(new TIntProcedure() { + + @Override + public boolean execute(int resource) { + + ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource); + if(valueQuery != null) markForUpdate(graph, valueQuery); + + PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource); + if(principalTypes != null) markForUpdate(graph, principalTypes); + Types types = QueryCache.entryTypes(QueryProcessor.this, resource); + if(types != null) markForUpdate(graph, types); + + SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource); + if(superRelations != null) markForUpdate(graph, superRelations); + + predicates.add(resource); + + return true; + } + + }); + + scheduledObjectUpdates.forEach(new TLongProcedure() { + + @Override + public boolean execute(long arg0) { + + final int subject = (int)(arg0 >>> 32); + final int predicate = (int)(arg0 & 0xffffffff); + + if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { + PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject); + if(principalTypes != null) markForUpdate(graph, principalTypes); + Types types = QueryCache.entryTypes(QueryProcessor.this, subject); + if(types != null) markForUpdate(graph, types); + } + + if(predicate == subrelationOf) { + SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); + if(superRelations != null) markForUpdate(graph, superRelations); + } + + predicates.add(subject); + orderedSets.add(predicate); + + return true; + + } + + }); + + predicates.forEach(new TIntProcedure() { + + @Override + public boolean execute(final int subject) { + + for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o); + + DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject); + if(entry != null) markForUpdate(graph, entry); + + return true; + + } + + }); + + orderedSets.forEach(new TIntProcedure() { + + @Override + public boolean execute(int orderedSet) { + + OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet); + if(entry != null) markForUpdate(graph, entry); + + return true; + + } + + }); + + updateRefutations(graph); + + primitiveUpdates.forEach(new TObjectProcedure() { + + @Override + public boolean execute(Object arg0) { + + ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0); + if (query != null) { + boolean listening = update(graph, query); + if (!listening && !query.hasParents()) { + cache.externalReadEntryMap.remove(arg0); + query.discard(); + } + } + return true; + } + + }); + + scheduledValueUpdates.clear(); + scheduledObjectUpdates.clear(); + scheduledInvalidates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } + + } + + public void updateValue(final int resource) { + scheduledValueUpdates.add(resource); + cache.dirty = true; + } + + public void updateStatements(final int resource, final int predicate) { + scheduledObjectUpdates.add((((long)resource) << 32) + predicate); + cache.dirty = true; + } + + private int lastInvalidate = 0; + + public void invalidateResource(final int resource) { + if(lastInvalidate == resource) return; + scheduledValueUpdates.add(resource); + lastInvalidate = resource; + cache.dirty = true; + } + + public void updatePrimitive(final ExternalRead primitive) { + + // External reads may be updated from arbitrary threads. + // Synchronize to prevent race-conditions. + synchronized (primitiveUpdateLock) { + scheduledPrimitiveUpdates.add(primitive); + } + querySupport.dirtyPrimitives(); + + } + + @Override + public synchronized String toString() { + return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]"; + } + + @Override + protected void doDispose() { + + for(int index = 0; index < THREADS; index++) { + executors[index].dispose(); + } + + // First just wait + for(int i=0;i<100;i++) { + + boolean alive = false; + for(int index = 0; index < THREADS; index++) { + alive |= executors[index].isAlive(); + } + if(!alive) return; + try { + Thread.sleep(5); + } catch (InterruptedException e) { + Logger.defaultLogError(e); + } + + } + + // Then start interrupting + for(int i=0;i<100;i++) { + + boolean alive = false; + for(int index = 0; index < THREADS; index++) { + alive |= executors[index].isAlive(); + } + if(!alive) return; + for(int index = 0; index < THREADS; index++) { + executors[index].interrupt(); + } + } + + // // Then just destroy + // for(int index = 0; index < THREADS; index++) { + // executors[index].destroy(); + // } + + for(int index = 0; index < THREADS; index++) { + try { + executors[index].join(5000); + } catch (InterruptedException e) { + Logger.defaultLogError("QueryThread " + index + " will not die.", e); + } + executors[index] = null; + } + + } + + public int getHits() { + return cache.hits; + } + + public int getMisses() { + return cache.misses; + } + + public int getSize() { + return cache.size; + } + + public Set getReferencedClusters() { + HashSet result = new HashSet(); + for (CacheEntry entry : QueryCache.entriesObjects(this)) { + Objects query = (Objects) entry.getQuery(); + result.add(querySupport.getClusterId(query.r1())); + } + for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) { + DirectPredicates query = (DirectPredicates) entry.getQuery(); + result.add(querySupport.getClusterId(query.id)); + } + for (CacheEntry entry : cache.valueQueryMap.values()) { + ValueQuery query = (ValueQuery) entry.getQuery(); + result.add(querySupport.getClusterId(query.id)); + } + return result; + } + + public void assertDone() { + } + + CacheCollectionResult allCaches(CacheCollectionResult result) { + + return cache.allCaches(result); + + } + + public void printDiagnostics() { + } + + public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) { + querySupport.requestCluster(graph, clusterId, runnable); + } + + public int clean() { + collector.collect(0, Integer.MAX_VALUE); + return cache.size; + } + + public void clean(final Collection> requests) { + QueryCollectorSupport collectorSupport = new QueryCollectorSupport() { + Iterator> iterator = requests.iterator(); + @Override + public CacheCollectionResult allCaches() { + throw new UnsupportedOperationException(); + } + @Override + public CacheEntryBase iterate(int level) { + if(iterator.hasNext()) { + ExternalRead request = iterator.next(); + ExternalReadEntry entry = cache.externalReadEntryMap.get(request); + if (entry != null) return entry; + else return iterate(level); + } else { + iterator = requests.iterator(); + return null; + } + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + @Override + public void setLevel(CacheEntryBase entry, int level) { + throw new UnsupportedOperationException(); + } + @Override + public Collection getRootList() { + ArrayList result = new ArrayList(requests.size()); + for (ExternalRead request : requests) { + ExternalReadEntry entry = cache.externalReadEntryMap.get(request); + if (entry != null) + result.add(entry); + } + return result; + } + @Override + public int getCurrentSize() { + return cache.size; + } + @Override + public int calculateCurrentSize() { + // This tells the collector to attempt collecting everything. + return Integer.MAX_VALUE; + } + @Override + public boolean start(boolean flush) { + return true; + } + }; + new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE); + } + + public void scanPending() { + + cache.scanPending(); + + } + + public ReadGraphImpl graphForVirtualRequest() { + return ReadGraphImpl.createAsync(this); + } + + + private HashMap> builtinValues; + + public Class getBuiltinValue(Resource r) { + if(builtinValues == null) initBuiltinValues(); + return builtinValues.get(r); + } + + Exception callerException = null; + + public interface AsyncBarrier { + public void inc(); + public void dec(); + // public void inc(String debug); + // public void dec(String debug); + } + +// final public QueryProcessor processor; +// final public QuerySupport support; + + // boolean disposed = false; + + private void initBuiltinValues() { + + Layer0 b = getSession().peekService(Layer0.class); + if(b == null) return; + + builtinValues = new HashMap>(); + + builtinValues.put(b.String, String.class); + builtinValues.put(b.Double, Double.class); + builtinValues.put(b.Float, Float.class); + builtinValues.put(b.Long, Long.class); + builtinValues.put(b.Integer, Integer.class); + builtinValues.put(b.Byte, Byte.class); + builtinValues.put(b.Boolean, Boolean.class); + + builtinValues.put(b.StringArray, String[].class); + builtinValues.put(b.DoubleArray, double[].class); + builtinValues.put(b.FloatArray, float[].class); + builtinValues.put(b.LongArray, long[].class); + builtinValues.put(b.IntegerArray, int[].class); + builtinValues.put(b.ByteArray, byte[].class); + builtinValues.put(b.BooleanArray, boolean[].class); + + } + +// public ReadGraphSupportImpl(final QueryProcessor provider2) { +// +// if (null == provider2) { +// this.processor = null; +// support = null; +// return; +// } +// this.processor = provider2; +// support = provider2.getCore(); +// initBuiltinValues(); +// +// } + +// final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) { +// return new ReadGraphSupportImpl(impl.processor); +// } + + @Override + final public Session getSession() { + return session; + } + + final public ResourceSupport getResourceSupport() { + return resourceSupport; + } + + @Override + final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { + + try { + + for(Resource predicate : getPredicates(impl, subject)) + procedure.execute(impl, predicate); + + procedure.finished(impl); + + } catch (Throwable e) { + procedure.exception(impl, e); + } + + } + + @Override + final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) { + + throw new UnsupportedOperationException(); + +// assert(subject != null); +// assert(procedure != null); +// +// final ListenerBase listener = getListenerBase(procedure); +// +// try { +// QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() { +// +// @Override +// public void execute(ReadGraphImpl graph, int i) { +// try { +// procedure.execute(querySupport.getResource(i)); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +// } +// +// @Override +// public void finished(ReadGraphImpl graph) { +// try { +// procedure.finished(); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +//// impl.state.barrier.dec(); +// } +// +// @Override +// public void exception(ReadGraphImpl graph, Throwable t) { +// try { +// procedure.exception(t); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +//// impl.state.barrier.dec(); +// } +// +// }); +// } catch (DatabaseException e) { +// Logger.defaultLogError(e); +// } + + } + + @Override + final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable { + return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); + } + + @Override + final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, + final Resource predicate, final MultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + try { + Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() { + + @Override + public void execute(ReadGraphImpl graph, int s, int p, int o) { + try { + procedure.execute(querySupport.getStatement(s, p, o)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, + final Resource predicate, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() { + + boolean first = true; + + @Override + public void execute(ReadGraphImpl graph, int s, int p, int o) { + try { + if(first) { + procedure.execute(graph, querySupport.getStatement(s, p, o)); + } else { + procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o)); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + + try { + if(first) { + first = false; + procedure.finished(graph); +// impl.state.barrier.dec(this); + } else { + procedure.finished(impl.newRestart(graph)); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + + try { + if(first) { + first = false; + procedure.exception(graph, t); +// impl.state.barrier.dec(this); + } else { + procedure.exception(impl.newRestart(graph), t); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + + } + + }; + + int sId = querySupport.getId(subject); + int pId = querySupport.getId(predicate); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId); +// else impl.state.barrier.inc(null, null); + + try { + Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forEachStatement(final ReadGraphImpl impl, final Resource subject, + final Resource predicate, final StatementProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() { + + boolean first = true; + + @Override + public void execute(ReadGraphImpl graph, int s, int p, int o) { + try { + if(first) { + procedure.execute(graph, s, p, o); + } else { + procedure.execute(impl.newRestart(graph), s, p, o); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + + try { + if(first) { + first = false; + procedure.finished(graph); +// impl.state.barrier.dec(this); + } else { + procedure.finished(impl.newRestart(graph)); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + + try { + if(first) { + first = false; + procedure.exception(graph, t); +// impl.state.barrier.dec(this); + } else { + procedure.exception(impl.newRestart(graph), t); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + + } + + }; + + int sId = querySupport.getId(subject); + int pId = querySupport.getId(predicate); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId); +// else impl.state.barrier.inc(null, null); + + try { + Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + forEachStatement(impl, subject, predicate, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Statement result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Statement r : current) procedure.remove(graph, r); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + }); + + } + + @Override + final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject, + final Resource predicate, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + try { + QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() { + + @Override + public void execute(ReadGraphImpl graph, int s, int p, int o) { + try { + procedure.execute(graph, querySupport.getStatement(s, p, o)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + private static ListenerBase getListenerBase(Object procedure) { + if(procedure instanceof ListenerBase) return (ListenerBase)procedure; + else return null; + } + + @Override + final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + try { + QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() { + + @Override + public void execute(ReadGraphImpl graph, int i) { + try { + procedure.execute(querySupport.getResource(i)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + System.out.println("forEachObject exception " + t); + try { + procedure.exception(t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + int sId = querySupport.getId(subject); + + try { + QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException { + procedure.execute(graph, result); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException { + procedure.exception(graph, throwable); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) { + +// assert(subject != null); +// assert(procedure != null); +// +// final ListenerBase listener = getListenerBase(procedure); +// +// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); + + return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual); + + } + +// @Override +// final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure procedure, boolean ignoreVirtual) { +// +// assert(subject != null); +// assert(procedure != null); +// +// final ListenerBase listener = getListenerBase(procedure); +// +// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual); +// +// } + + private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE); + + @Override + final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) { + + forEachObject(impl, subject, predicate, new AsyncMultiProcedure() { + + private Resource single = null; + + @Override + public synchronized void execute(AsyncReadGraph graph, Resource result) { + if(single == null) { + single = result; + } else { + single = INVALID_RESOURCE; + } + } + + @Override + public synchronized void finished(AsyncReadGraph graph) { + if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null); + else procedure.execute(graph, single); + } + + @Override + public synchronized void exception(AsyncReadGraph graph, Throwable throwable) { + procedure.exception(graph, throwable); + } + + }); + + } + + final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) { + + final int sId = querySupport.getId(subject); + final int pId = querySupport.getId(predicate); + + try { + QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + static class Runner2Procedure implements IntProcedure { + + public int single = 0; + public Throwable t = null; + + public void clear() { + single = 0; + t = null; + } + + @Override + public void execute(ReadGraphImpl graph, int i) { + if(single == 0) single = i; + else single = -1; + } + + @Override + public void finished(ReadGraphImpl graph) { + if(single == -1) single = 0; + } + + @Override + public void exception(ReadGraphImpl graph, Throwable throwable) { + single = 0; + this.t = throwable; + } + + public int get() throws DatabaseException { + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException(t); + } + return single; + } + + } + + final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException { + + final int sId = querySupport.getId(subject); + final int pId = querySupport.getId(predicate); + + Runner2Procedure proc = new Runner2Procedure(); + QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc); + return proc.get(); + + } + + final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + + final ListenerBase listener = getListenerBase(procedure); + + if(impl.parent != null || listener != null) { + + IntProcedure ip = new IntProcedure() { + + AtomicBoolean first = new AtomicBoolean(true); + + @Override + public void execute(ReadGraphImpl graph, int i) { + try { + if(first.get()) { + procedure.execute(impl, querySupport.getResource(i)); + } else { + procedure.execute(impl.newRestart(graph), querySupport.getResource(i)); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + if(first.compareAndSet(true, false)) { + procedure.finished(impl); +// impl.state.barrier.dec(this); + } else { + procedure.finished(impl.newRestart(graph)); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + @Override + public String toString() { + return "forEachObject with " + procedure; + } + + }; + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate); +// else impl.state.barrier.inc(null, null); + + forEachObject(impl, subject, predicate, listener, ip); + + } else { + + IntProcedure ip = new IntProcedure() { + + @Override + public void execute(ReadGraphImpl graph, int i) { + procedure.execute(graph, querySupport.getResource(i)); + } + + @Override + public void finished(ReadGraphImpl graph) { + procedure.finished(graph); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forEachObject with " + procedure; + } + + }; + + forEachObject(impl, subject, predicate, listener, ip); + + } + + } + + @Override + final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + forEachObject(impl, subject, predicate, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Resource result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Resource r : current) procedure.remove(graph, r); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forObjectSet " + procedure; + } + + }); + + } + + @Override + final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) { + + assert(subject != null); + assert(procedure != null); + + forEachPredicate(impl, subject, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Resource result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Resource r : current) procedure.remove(graph, r); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forPredicateSet " + procedure; + } + + }); + + } + + @Override + final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener procedure) { + + assert(subject != null); + assert(procedure != null); + + forEachPrincipalType(impl, subject, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Resource result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Resource r : current) procedure.remove(graph, r); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forPrincipalTypeSet " + procedure; + } + + }); + + } + + @Override + final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Resource result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Resource r : current) procedure.remove(graph, r); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forObjectSet " + procedure; + } + + }); + + } + + @Override + final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener() { + + private Set current = null; + private Set run = new HashSet(); + + @Override + public void execute(AsyncReadGraph graph, Statement result) { + + boolean found = false; + + if(current != null) { + + found = current.remove(result); + + } + + if(!found) procedure.add(graph, result); + + run.add(result); + + } + + @Override + public void finished(AsyncReadGraph graph) { + + if(current != null) { + for(Statement s : current) procedure.remove(graph, s); + } + + current = run; + + run = new HashSet(); + + } + + @Override + public boolean isDisposed() { + return procedure.isDisposed(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + } + + @Override + public String toString() { + return "forStatementSet " + procedure; + } + + }); + + } + + @Override + final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject, + final Resource predicate, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + try { + QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() { + + @Override + public void execute(ReadGraphImpl graph, int s, int p, int o) { + try { + procedure.execute(graph, querySupport.getResource(o)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + IntProcedure ip = new IntProcedure() { + + @Override + public void execute(ReadGraphImpl graph, int i) { + try { + procedure.execute(graph, querySupport.getResource(i)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + }; + + int sId = querySupport.getId(subject); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + try { + QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() { + + @Override + public void execute(ReadGraphImpl graph, int i) { + try { + procedure.execute(querySupport.getResource(i)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + + final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + assert(listener == null); + + InternalProcedure ip = new InternalProcedure() { + + @Override + public void execute(final ReadGraphImpl graph, IntSet set) { + procedure.execute(graph, set); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + procedure.exception(graph, t); + } + + }; + + int sId = querySupport.getId(subject); + + try { + QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable { + + assert(subject != null); + + return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null); + + } + + @Override + final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException { + + assert(subject != null); + + return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null); + + } + + @Override + final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + try { + QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure() { + + AtomicBoolean first = new AtomicBoolean(true); + + @Override + public void execute(final ReadGraphImpl graph, IntSet set) { +// final HashSet result = new HashSet(); +// set.forEach(new TIntProcedure() { +// +// @Override +// public boolean execute(int type) { +// result.add(querySupport.getResource(type)); +// return true; +// } +// +// }); + try { + if(first.compareAndSet(true, false)) { + procedure.execute(graph, set); +// impl.state.barrier.dec(); + } else { + procedure.execute(impl.newRestart(graph), set); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + if(first.compareAndSet(true, false)) { + procedure.exception(graph, t); +// impl.state.barrier.dec(); + } else { + procedure.exception(impl.newRestart(graph), t); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + IntProcedure ip = new IntProcedureAdapter() { + + @Override + public void execute(final ReadGraphImpl graph, int superRelation) { + try { + procedure.execute(graph, querySupport.getResource(superRelation)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(final ReadGraphImpl graph) { + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + }; + + int sId = querySupport.getId(subject); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + +// DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip); + + } + + @Override + final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure); + + } + + @Override + final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure> procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + InternalProcedure ip = new InternalProcedure() { + + @Override + public void execute(final ReadGraphImpl graph, IntSet set) { +// final HashSet result = new HashSet(); +// set.forEach(new TIntProcedure() { +// +// @Override +// public boolean execute(int type) { +// result.add(querySupport.getResource(type)); +// return true; +// } +// +// }); + try { + procedure.execute(graph, set); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + }; + + int sId = querySupport.getId(subject); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException { + return getValue(impl, querySupport.getId(subject)); + } + + final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException { + return QueryCache.resultValueQuery(impl, subject, impl.parent, null); + } + + @Override + final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + int sId = querySupport.getId(subject); + +// if(procedure != null) { + + final ListenerBase listener = getListenerBase(procedure); + + InternalProcedure ip = new InternalProcedure() { + + AtomicBoolean first = new AtomicBoolean(true); + + @Override + public void execute(ReadGraphImpl graph, byte[] result) { + try { + if(first.compareAndSet(true, false)) { + procedure.execute(graph, result); +// impl.state.barrier.dec(this); + } else { + procedure.execute(impl.newRestart(graph), result); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + if(first.compareAndSet(true, false)) { + procedure.exception(graph, t); +// impl.state.barrier.dec(this); + } else { + procedure.exception(impl.newRestart(graph), t); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + }; + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + throw new IllegalStateException("Internal error"); + } + +// } else { +// +// return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null); +// +// } +// +// throw new IllegalStateException("Internal error"); + + } + + @Override + final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + if(impl.parent != null || listener != null) { + + InternalProcedure ip = new InternalProcedure() { + + AtomicBoolean first = new AtomicBoolean(true); + + @Override + public void execute(ReadGraphImpl graph, byte[] result) { + try { + if(first.compareAndSet(true, false)) { + procedure.execute(graph, result); +// impl.state.barrier.dec(this); + } else { + procedure.execute(impl.newRestart(graph), result); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + if(first.compareAndSet(true, false)) { + procedure.exception(graph, t); +// impl.state.barrier.dec(this); + } else { + procedure.exception(impl.newRestart(graph), t); + } + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + }; + + int sId = querySupport.getId(subject); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } else { + + InternalProcedure ip = new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, byte[] result) { + + procedure.execute(graph, result); + + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + + procedure.exception(graph, t); + + } + + }; + + int sId = querySupport.getId(subject); + + try { + QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + } + + @Override + final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure procedure) { + + assert(relation != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + IntProcedure ip = new IntProcedure() { + + private int result = 0; + + final AtomicBoolean found = new AtomicBoolean(false); + final AtomicBoolean done = new AtomicBoolean(false); + + @Override + public void finished(ReadGraphImpl graph) { + + // Shall fire exactly once! + if(done.compareAndSet(false, true)) { + try { + if(result == 0) { + procedure.exception(graph, new NoInverseException("")); +// impl.state.barrier.dec(this); + } else { + procedure.execute(graph, querySupport.getResource(result)); +// impl.state.barrier.dec(this); + } + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + + } + + @Override + public void execute(ReadGraphImpl graph, int i) { + + if(found.compareAndSet(false, true)) { + this.result = i; + } else { + // Shall fire exactly once! + if(done.compareAndSet(false, true)) { + try { + procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result)); +// impl.state.barrier.dec(this); + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + } + + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + // Shall fire exactly once! + if(done.compareAndSet(false, true)) { + try { + procedure.exception(graph, t); +// impl.state.barrier.dec(this); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + } + + }; + + int sId = querySupport.getId(relation); + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId); +// else impl.state.barrier.inc(null, null); + + try { + QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { + + assert(id != null); + assert(procedure != null); + + InternalProcedure ip = new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, Integer result) { + try { + procedure.execute(graph, querySupport.getResource(result)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + }; + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource"); +// else impl.state.barrier.inc(null, null); + + forResource(impl, id, impl.parent, ip); + + } + + @Override + final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure procedure) { + + assert(id != null); + assert(procedure != null); + +// impl.state.barrier.inc(); + + try { + forBuiltin(impl, id, impl.parent, new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, Integer result) { + try { + procedure.execute(graph, querySupport.getResource(result)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + try { + IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener); + procedure.execute(impl, !result.isEmpty()); + } catch (DatabaseException e) { + procedure.exception(impl, e); + } + + } + + @Override + final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + + AsyncMultiProcedure ip = new AsyncMultiProcedureAdapter() { + + boolean found = false; + + @Override + synchronized public void execute(AsyncReadGraph graph, Resource resource) { + found = true; + } + + @Override + synchronized public void finished(AsyncReadGraph graph) { + try { + procedure.execute(graph, found); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(this); + } + + }; + +// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate); +// else impl.state.barrier.inc(null, null); + + forEachObject(impl, subject, predicate, ip); + + } + + @Override + final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure procedure) { + + assert(subject != null); + assert(predicate != null); + assert(procedure != null); + +// impl.state.barrier.inc(); + + forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter() { + + boolean found = false; + + @Override + synchronized public void execute(AsyncReadGraph graph, Resource resource) { + if(resource.equals(object)) found = true; + } + + @Override + synchronized public void finished(AsyncReadGraph graph) { + try { + procedure.execute(graph, found); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + + } + + @Override + final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + +// impl.state.barrier.inc(); + + try { + QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure() { + + @Override + public void execute(ReadGraphImpl graph, byte[] object) { + boolean result = object != null; + try { + procedure.execute(graph, result); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + + @Override + final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { + + assert(subject != null); + assert(procedure != null); + + final ListenerBase listener = getListenerBase(procedure); + + try { + + QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() { + + @Override + public void exception(ReadGraphImpl graph, Throwable t) { + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + @Override + public void execute(ReadGraphImpl graph, int i) { + try { + procedure.execute(graph, querySupport.getResource(i)); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(ReadGraphImpl graph) { + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } +// impl.state.barrier.dec(); + } + + }); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } + +// @Override +// final public void query(final ReadGraphImpl impl, final AsyncRead request, final CacheEntry parent, final AsyncProcedure procedure, ListenerBase listener) throws DatabaseException { +// +// assert(request != null); +// assert(procedure != null); +// +// QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure); +// +// } + +// @Override +// final public T tryQuery(final ReadGraphImpl graph, final Read request) throws DatabaseException { +// +// assert(graph != null); +// assert(request != null); +// +// final ReadEntry entry = (ReadEntry)cache.getCached(request); +// if(entry != null && entry.isReady()) { +// return (T)entry.get(graph, this, null); +// } else { +// return request.perform(graph); +// } +// +// } + +// final public T tryQuery(final ReadGraphImpl graph, final ExternalRead request) throws DatabaseException { +// +// assert(graph != null); +// assert(request != null); +// +// final ExternalReadEntry entry = cache.externalReadMap.get(request); +// if(entry != null && entry.isReady()) { +// if(entry.isExcepted()) { +// Throwable t = (Throwable)entry.getResult(); +// if(t instanceof DatabaseException) throw (DatabaseException)t; +// else throw new DatabaseException(t); +// } else { +// return (T)entry.getResult(); +// } +// } else { +// +// final DataContainer result = new DataContainer(); +// final DataContainer exception = new DataContainer(); +// +// request.register(graph, new Listener() { +// +// @Override +// public void exception(Throwable t) { +// exception.set(t); +// } +// +// @Override +// public void execute(T t) { +// result.set(t); +// } +// +// @Override +// public boolean isDisposed() { +// return true; +// } +// +// }); +// +// Throwable t = exception.get(); +// if(t != null) { +// if(t instanceof DatabaseException) throw (DatabaseException)t; +// else throw new DatabaseException(t); +// } +// +// return result.get(); +// +// } +// +// } + +// @Override +// final public void tryQuery(final ReadGraphImpl graph, final AsyncRead request, AsyncProcedure procedure) { +// +// assert(graph != null); +// assert(request != null); +// +// final AsyncReadEntry entry = cache.asyncReadMap.get(request); +// if(entry != null && entry.isReady()) { +// if(entry.isExcepted()) { +// procedure.exception(graph, (Throwable)entry.getResult()); +// } else { +// procedure.execute(graph, (T)entry.getResult()); +// } +// } else { +// request.perform(graph, procedure); +// } +// +// } + + @Override + final public void query(final ReadGraphImpl impl, final MultiRead request, final CacheEntry parent, final SyncMultiProcedure procedure, ListenerBase listener) { + + assert(request != null); + assert(procedure != null); + + try { + + queryMultiRead(impl, request, parent, listener, procedure); + + } catch (DatabaseException e) { + + throw new IllegalStateException(e); + + } + + } + + @Override + final public void query(final ReadGraphImpl impl, final AsyncMultiRead request, final CacheEntry parent, final AsyncMultiProcedure procedure, ListenerBase listener) { + + assert(request != null); + assert(procedure != null); + +// impl.state.barrier.inc(); + + runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure() { + + public void execute(AsyncReadGraph graph, T result) { + + try { + procedure.execute(graph, result); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + } + + @Override + public void finished(AsyncReadGraph graph) { + + try { + procedure.finished(graph); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + +// impl.state.barrier.dec(); + + } + + @Override + public String toString() { + return procedure.toString(); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + + try { + procedure.exception(graph, t); + } catch (Throwable t2) { + Logger.defaultLogError(t2); + } + +// impl.state.barrier.dec(); + + } + + }); + + } + +// @Override +// final public void query(final ReadGraphImpl impl, final ExternalRead request, final CacheEntry parent, final Procedure procedure, ListenerBase listener) throws DatabaseException { +// +// assert(request != null); +// assert(procedure != null); +// +// try { +// +// queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure() { +// +// @Override +// public String toString() { +// return procedure.toString(); +// } +// +// @Override +// public void execute(AsyncReadGraph graph, T result) { +// try { +// procedure.execute(result); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +// } +// +// @Override +// public void exception(AsyncReadGraph graph, Throwable throwable) { +// try { +// procedure.exception(throwable); +// } catch (Throwable t2) { +// Logger.defaultLogError(t2); +// } +// } +// +// }); +// +// } catch (DatabaseException e) { +// +// throw new IllegalStateException(e); +// +// } +// +// } + + @Override + public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) { + + return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); + + } + + @Override + public VirtualGraph getProvider(Resource subject, Resource predicate) { + + return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate)); + + } + + @Override + public VirtualGraph getValueProvider(Resource subject) { + + return querySupport.getValueProvider(querySupport.getId(subject)); + + } + + public boolean resumeTasks(ReadGraphImpl graph) { + + return querySupport.resume(graph); + + } + + public boolean isImmutable(int resourceId) { + return querySupport.isImmutable(resourceId); + } + + public boolean isImmutable(Resource resource) { + ResourceImpl impl = (ResourceImpl)resource; + return isImmutable(impl.id); + } + + private Layer0 L0; + + public Layer0 getL0(ReadGraph graph) { + if(L0 == null) { + L0 = Layer0.getInstance(graph); + } + return L0; + } + + public static ThreadLocal thread = new ThreadLocal() { + protected Integer initialValue() { + return -1; + } + }; + +}