-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.impl.query;\r
-\r
-import java.io.BufferedOutputStream;\r
-import java.io.File;\r
-import java.io.FileOutputStream;\r
-import java.io.IOException;\r
-import java.io.PrintStream;\r
-import java.util.ArrayList;\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.IdentityHashMap;\r
-import java.util.Iterator;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.atomic.AtomicBoolean;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-import java.util.concurrent.locks.Condition;\r
-import java.util.concurrent.locks.ReentrantLock;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.db.AsyncReadGraph;\r
-import org.simantics.db.DevelopmentKeys;\r
-import org.simantics.db.DirectStatements;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.RelationInfo;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.Statement;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.debug.ListenerReport;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;\r
-import org.simantics.db.exception.NoInverseException;\r
-import org.simantics.db.exception.ResourceNotFoundException;\r
-import org.simantics.db.impl.DebugPolicy;\r
-import org.simantics.db.impl.ResourceImpl;\r
-import org.simantics.db.impl.graph.MultiIntProcedure;\r
-import org.simantics.db.impl.graph.ReadGraphImpl;\r
-import org.simantics.db.impl.graph.ReadGraphSupport;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.procedure.IntProcedureAdapter;\r
-import org.simantics.db.impl.procedure.InternalProcedure;\r
-import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;\r
-import org.simantics.db.impl.support.ResourceSupport;\r
-import org.simantics.db.procedure.AsyncMultiListener;\r
-import org.simantics.db.procedure.AsyncMultiProcedure;\r
-import org.simantics.db.procedure.AsyncProcedure;\r
-import org.simantics.db.procedure.AsyncSetListener;\r
-import org.simantics.db.procedure.Listener;\r
-import org.simantics.db.procedure.ListenerBase;\r
-import org.simantics.db.procedure.MultiProcedure;\r
-import org.simantics.db.procedure.Procedure;\r
-import org.simantics.db.procedure.StatementProcedure;\r
-import org.simantics.db.request.AsyncMultiRead;\r
-import org.simantics.db.request.AsyncRead;\r
-import org.simantics.db.request.ExternalRead;\r
-import org.simantics.db.request.MultiRead;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.db.request.RequestFlags;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.utils.DataContainer;\r
-import org.simantics.utils.Development;\r
-import org.simantics.utils.datastructures.Pair;\r
-import org.simantics.utils.datastructures.collections.CollectionUtils;\r
-import org.simantics.utils.datastructures.disposable.AbstractDisposable;\r
-\r
-import gnu.trove.map.hash.THashMap;\r
-import gnu.trove.procedure.TIntProcedure;\r
-import gnu.trove.procedure.TLongProcedure;\r
-import gnu.trove.procedure.TObjectProcedure;\r
-import gnu.trove.set.hash.THashSet;\r
-import gnu.trove.set.hash.TIntHashSet;\r
-\r
-@SuppressWarnings({"rawtypes", "unchecked"})\r
-final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {\r
-\r
- final public UnaryQueryHashMap<IntProcedure> directPredicatesMap;\r
- final public UnaryQueryHashMap<IntProcedure> principalTypesMap;\r
- final public THashMap<String, URIToResource> uriToResourceMap;\r
- final public THashMap<String, NamespaceIndex> namespaceIndexMap22;\r
- final public UnaryQueryHashMap<IntProcedure> projectsMap;\r
- final public UnaryQueryHashMap<InternalProcedure<RelationInfo>> relationInfoMap;\r
- final public UnaryQueryHashMap<InternalProcedure<IntSet>> superTypesMap;\r
- final public UnaryQueryHashMap<InternalProcedure<IntSet>> typeHierarchyMap;\r
- final public UnaryQueryHashMap<InternalProcedure<IntSet>> superRelationsMap;\r
- final public UnaryQueryHashMap<InternalProcedure<IntSet>> typesMap;\r
- final public UnaryQueryHashMap<InternalProcedure<byte[]>> valueMap;\r
- final public DoubleKeyQueryHashMap<IntProcedure> directObjectsMap;\r
- final public DoubleKeyQueryHashMap<IntProcedure> objectsMap;\r
- final public UnaryQueryHashMap<IntProcedure> orderedSetMap;\r
- final public UnaryQueryHashMap<IntProcedure> predicatesMap;\r
- final public DoubleKeyQueryHashMap<TripleIntProcedure> statementsMap;\r
- final public UnaryQueryHashMap<IntProcedure> assertedPredicatesMap;\r
- final public BinaryQueryHashMap<TripleIntProcedure> assertedStatementsMap;\r
- final public StableHashMap<ExternalRead, ExternalReadEntry> externalReadMap; \r
- final public StableHashMap<AsyncRead, AsyncReadEntry> asyncReadMap; \r
- final public StableHashMap<Read, ReadEntry> readMap;\r
- final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadMap; \r
- final public StableHashMap<MultiRead, MultiReadEntry> multiReadMap; \r
-\r
- final private THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;\r
-\r
- public static int indent = 0;\r
-\r
- public int size = 0;\r
- \r
- \r
- // Garbage collection\r
- \r
- public int boundQueries = 0;\r
-\r
- // Statistics\r
- private int hits = 0;\r
-\r
- private int misses = 0;\r
-\r
- private int updates = 0;\r
-\r
- final private int functionalRelation;\r
-\r
- final private int superrelationOf;\r
-\r
- final private int instanceOf;\r
-\r
- final private int inverseOf;\r
-\r
- final private int asserts;\r
-\r
- final private int hasPredicate;\r
-\r
- final private int hasPredicateInverse;\r
-\r
- final private int hasObject;\r
-\r
- final private int inherits;\r
-\r
- final private int subrelationOf;\r
-\r
- final private int rootLibrary;\r
-\r
- /**\r
- * A cache for the root library resource. Initialized in\r
- * {@link #getRootLibraryResource()}.\r
- */\r
- private volatile ResourceImpl rootLibraryResource;\r
-\r
- final private int library;\r
-\r
- final private int consistsOf;\r
-\r
- final private int hasName;\r
-\r
- AtomicInteger sleepers = new AtomicInteger(0);\r
-\r
- private boolean updating = false;\r
-\r
- static public boolean collecting = false;\r
-\r
- private boolean firingListeners = false;\r
-\r
- final public QuerySupport querySupport;\r
- final public Session session;\r
- final public ResourceSupport resourceSupport;\r
-\r
- private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();\r
-\r
- QueryThread[] executors;\r
-\r
- public ArrayList<SessionTask>[] queues;\r
-\r
- enum ThreadState {\r
-\r
- INIT, RUN, SLEEP, DISPOSED\r
-\r
- }\r
-\r
- public ThreadState[] threadStates;\r
- public ReentrantLock[] threadLocks;\r
- public Condition[] threadConditions;\r
-\r
- public ArrayList<SessionTask>[] ownTasks;\r
-\r
- public ArrayList<SessionTask>[] ownSyncTasks;\r
-\r
- ArrayList<SessionTask>[] delayQueues;\r
- \r
- public boolean synch = true;\r
-\r
- final Object querySupportLock;\r
- \r
- public Long modificationCounter = 0L;\r
-\r
- public void close() {\r
- }\r
-\r
- final public void scheduleOwn(int caller, SessionTask request) {\r
- ownTasks[caller].add(request);\r
- }\r
-\r
- final public void scheduleAlways(int caller, SessionTask request) {\r
-\r
- int performer = request.thread;\r
- if(caller == performer) {\r
- ownTasks[caller].add(request);\r
- } else {\r
- schedule(caller, request);\r
- }\r
-\r
- }\r
-\r
- final public void schedule(int caller, SessionTask request) {\r
-\r
- int performer = request.thread;\r
-\r
- if(DebugPolicy.SCHEDULE)\r
- System.out.println("schedule " + request + " " + caller + " -> " + performer);\r
-\r
- assert(performer >= 0);\r
-\r
- assert(request != null);\r
-\r
- if(caller == performer) {\r
- request.run(caller);\r
- } else {\r
- ReentrantLock queueLock = threadLocks[performer];\r
- queueLock.lock();\r
- queues[performer].add(request);\r
- // This thread could have been sleeping\r
- if(queues[performer].size() == 1) {\r
- if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();\r
- threadConditions[performer].signalAll();\r
- }\r
- queueLock.unlock();\r
- }\r
-\r
- }\r
-\r
-\r
- final int THREADS;\r
-\r
- final public int THREAD_MASK;\r
- final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); \r
-\r
- public static abstract class SessionTask {\r
-\r
- final public int thread;\r
- final public int syncCaller;\r
- final public Object object;\r
-\r
- public SessionTask(WriteTraits object, int thread) {\r
- this.thread = thread;\r
- this.syncCaller = -1;\r
- this.object = object;\r
- }\r
-\r
- public SessionTask(Object object, int thread, int syncCaller) {\r
- this.thread = thread;\r
- this.syncCaller = syncCaller;\r
- this.object = object;\r
- }\r
-\r
- public abstract void run(int thread);\r
-\r
- @Override\r
- public String toString() {\r
- return "SessionTask[" + object + "]";\r
- }\r
-\r
- }\r
-\r
- public static abstract class SessionRead extends SessionTask {\r
-\r
- final public Semaphore notify;\r
- final public DataContainer<Throwable> throwable; \r
-\r
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {\r
- super(object, thread, thread);\r
- this.throwable = throwable;\r
- this.notify = notify;\r
- }\r
-\r
- public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {\r
- super(object, thread, syncThread);\r
- this.throwable = throwable;\r
- this.notify = notify;\r
- }\r
-\r
- }\r
-\r
- long waitingTime = 0;\r
-\r
- static int koss = 0;\r
- static int koss2 = 0;\r
-\r
- public boolean resume(ReadGraphImpl graph) {\r
- return executors[0].runSynchronized();\r
- }\r
-\r
- public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)\r
- throws DatabaseException {\r
-\r
- THREADS = threads;\r
- THREAD_MASK = threads - 1;\r
-\r
- querySupport = core;\r
- session = querySupport.getSession();\r
- resourceSupport = querySupport.getSupport();\r
- querySupportLock = core.getLock();\r
-\r
- executors = new QueryThread[THREADS];\r
- queues = new ArrayList[THREADS];\r
- threadLocks = new ReentrantLock[THREADS];\r
- threadConditions = new Condition[THREADS];\r
- threadStates = new ThreadState[THREADS];\r
- ownTasks = new ArrayList[THREADS];\r
- ownSyncTasks = new ArrayList[THREADS];\r
- delayQueues = new ArrayList[THREADS * THREADS];\r
-\r
- // freeSchedule = new AtomicInteger(0);\r
-\r
- for (int i = 0; i < THREADS * THREADS; i++) {\r
- delayQueues[i] = new ArrayList<SessionTask>();\r
- }\r
-\r
- for (int i = 0; i < THREADS; i++) {\r
-\r
- // tasks[i] = new ArrayList<Runnable>();\r
- ownTasks[i] = new ArrayList<SessionTask>();\r
- ownSyncTasks[i] = new ArrayList<SessionTask>();\r
- queues[i] = new ArrayList<SessionTask>();\r
- threadLocks[i] = new ReentrantLock();\r
- threadConditions[i] = threadLocks[i].newCondition();\r
- // limits[i] = false;\r
- threadStates[i] = ThreadState.INIT;\r
-\r
- }\r
-\r
- for (int i = 0; i < THREADS; i++) {\r
-\r
- final int index = i;\r
-\r
- executors[i] = new QueryThread(session, this, index, "Query Thread " + index);\r
-\r
- threadSet.add(executors[i]);\r
-\r
- }\r
-\r
- directPredicatesMap = new UnaryQueryHashMap();\r
- valueMap = new UnaryQueryHashMap();\r
- principalTypesMap = new UnaryQueryHashMap();\r
- uriToResourceMap = new THashMap<String, URIToResource>();\r
- namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();\r
- projectsMap = new UnaryQueryHashMap();\r
- relationInfoMap = new UnaryQueryHashMap();\r
- typeHierarchyMap = new UnaryQueryHashMap();\r
- superTypesMap = new UnaryQueryHashMap();\r
- superRelationsMap = new UnaryQueryHashMap();\r
- typesMap = new UnaryQueryHashMap();\r
- objectsMap = new DoubleKeyQueryHashMap();\r
- orderedSetMap = new UnaryQueryHashMap();\r
- predicatesMap = new UnaryQueryHashMap();\r
- statementsMap = new DoubleKeyQueryHashMap();\r
- directObjectsMap = new DoubleKeyQueryHashMap();\r
- assertedPredicatesMap = new UnaryQueryHashMap();\r
- assertedStatementsMap = new BinaryQueryHashMap();\r
- asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>(); \r
- readMap = new StableHashMap<Read, ReadEntry>();\r
- asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>(); \r
- multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>(); \r
- externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>(); \r
- listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);\r
-\r
- // Now start threads\r
- for (int i = 0; i < THREADS; i++) {\r
- executors[i].start();\r
- }\r
-\r
- // Make sure that query threads are up and running\r
- while(sleepers.get() != THREADS) {\r
- try {\r
- Thread.sleep(5);\r
- } catch (InterruptedException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- rootLibrary = core.getBuiltin("http:/");\r
- boolean builtinsInstalled = rootLibrary != 0;\r
-\r
- if (builtinsInstalled) {\r
- functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);\r
- assert (functionalRelation != 0);\r
- } else\r
- functionalRelation = 0;\r
-\r
- if (builtinsInstalled) {\r
- instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);\r
- assert (instanceOf != 0);\r
- } else\r
- instanceOf = 0;\r
-\r
- if (builtinsInstalled) {\r
- inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);\r
- assert (inverseOf != 0);\r
- } else\r
- inverseOf = 0;\r
-\r
-\r
- if (builtinsInstalled) {\r
- inherits = core.getBuiltin(Layer0.URIs.Inherits);\r
- assert (inherits != 0);\r
- } else\r
- inherits = 0;\r
-\r
- if (builtinsInstalled) {\r
- asserts = core.getBuiltin(Layer0.URIs.Asserts);\r
- assert (asserts != 0);\r
- } else\r
- asserts = 0;\r
-\r
- if (builtinsInstalled) {\r
- hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);\r
- assert (hasPredicate != 0);\r
- } else\r
- hasPredicate = 0;\r
-\r
- if (builtinsInstalled) {\r
- hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);\r
- assert (hasPredicateInverse != 0);\r
- } else\r
- hasPredicateInverse = 0;\r
-\r
- if (builtinsInstalled) {\r
- hasObject = core.getBuiltin(Layer0.URIs.HasObject);\r
- assert (hasObject != 0);\r
- } else\r
- hasObject = 0;\r
-\r
- if (builtinsInstalled) {\r
- subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);\r
- assert (subrelationOf != 0);\r
- } else\r
- subrelationOf = 0;\r
-\r
- if (builtinsInstalled) {\r
- superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);\r
- assert (superrelationOf != 0);\r
- } else\r
- superrelationOf = 0;\r
-\r
- if (builtinsInstalled) {\r
- library = core.getBuiltin(Layer0.URIs.Library);\r
- assert (library != 0);\r
- } else\r
- library = 0;\r
-\r
- if (builtinsInstalled) {\r
- consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);\r
- assert (consistsOf != 0);\r
- } else\r
- consistsOf = 0;\r
-\r
- if (builtinsInstalled) {\r
- hasName = core.getBuiltin(Layer0.URIs.HasName);\r
- assert (hasName != 0);\r
- } else\r
- hasName = 0;\r
-\r
- }\r
-\r
- final public void releaseWrite(ReadGraphImpl graph) {\r
- performDirtyUpdates(graph);\r
- modificationCounter++;\r
- }\r
-\r
- final public int getId(final Resource r) {\r
- return querySupport.getId(r);\r
- }\r
-\r
- public QuerySupport getCore() {\r
- return querySupport;\r
- }\r
-\r
- public int getFunctionalRelation() {\r
- return functionalRelation;\r
- }\r
-\r
- public int getInherits() {\r
- return inherits;\r
- }\r
-\r
- public int getInstanceOf() {\r
- return instanceOf;\r
- }\r
-\r
- public int getInverseOf() {\r
- return inverseOf;\r
- }\r
-\r
- public int getSubrelationOf() {\r
- return subrelationOf;\r
- }\r
-\r
- public int getSuperrelationOf() {\r
- return superrelationOf;\r
- }\r
-\r
- public int getAsserts() {\r
- return asserts;\r
- }\r
-\r
- public int getHasPredicate() {\r
- return hasPredicate;\r
- }\r
-\r
- public int getHasPredicateInverse() {\r
- return hasPredicateInverse;\r
- }\r
-\r
- public int getHasObject() {\r
- return hasObject;\r
- }\r
-\r
- public int getRootLibrary() {\r
- return rootLibrary;\r
- }\r
-\r
- public Resource getRootLibraryResource() {\r
- if (rootLibraryResource == null) {\r
- // Synchronization is not needed here, it doesn't matter if multiple\r
- // threads simultaneously set rootLibraryResource once.\r
- int root = getRootLibrary();\r
- if (root == 0)\r
- throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");\r
- this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);\r
- }\r
- return rootLibraryResource;\r
- }\r
-\r
- public int getLibrary() {\r
- return library;\r
- }\r
-\r
- public int getConsistsOf() {\r
- return consistsOf;\r
- }\r
-\r
- public int getHasName() {\r
- return hasName;\r
- }\r
-\r
- public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
-\r
- URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, Integer result) {\r
-\r
- if (result != null && result != 0) {\r
- procedure.execute(graph, result);\r
- return;\r
- }\r
-\r
- // Fall back to using the fixed builtins.\r
- result = querySupport.getBuiltin(id);\r
- if (result != 0) {\r
- procedure.execute(graph, result);\r
- return;\r
- } \r
-\r
- try {\r
- result = querySupport.getRandomAccessReference(id);\r
- } catch (ResourceNotFoundException e) {\r
- procedure.exception(graph, e);\r
- return;\r
- }\r
-\r
- if (result != 0) {\r
- procedure.execute(graph, result);\r
- } else {\r
- procedure.exception(graph, new ResourceNotFoundException(id));\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
-\r
- Integer result = querySupport.getBuiltin(id);\r
- if (result != 0) {\r
- procedure.execute(graph, result);\r
- } else {\r
- procedure.exception(graph, new ResourceNotFoundException(id));\r
- }\r
-\r
- }\r
-\r
- public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {\r
-\r
- int hash = requestHash(query);\r
-\r
- AsyncReadEntry<T> entry = asyncReadMap.get(query, hash); \r
-\r
- if(parent == null && listener == null) {\r
- if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
- System.out.println("ready " + query);\r
- entry.performFromCache(graph, this, procedure);\r
-// graph.state.barrier.dec(query);\r
- return;\r
- } else {\r
- query.perform(graph, procedure);\r
-// graph.state.barrier.dec(query);\r
- return;\r
- }\r
- }\r
-\r
- if(entry == null) {\r
-\r
- entry = new AsyncReadEntry<T>(query);\r
- entry.setPending();\r
- entry.clearResult(querySupport);\r
- asyncReadMap.put(query, entry, hash);\r
-\r
- performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- } else {\r
-\r
- if(entry.isPending()) {\r
- synchronized(entry) {\r
- if(entry.isPending()) {\r
- throw new IllegalStateException();\r
- // final AsyncBarrierImpl parentBarrier = graph.state.barrier;\r
- // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();\r
- // entry.procs.add(new AsyncProcedure<T>() {\r
- //\r
- // @Override\r
- // public void execute(AsyncReadGraph graph, T result) {\r
- // procedure.execute(graph, result);\r
- // parentBarrier.dec(query);\r
- // }\r
- //\r
- // @Override\r
- // public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- // procedure.exception(graph, throwable);\r
- // parentBarrier.dec(query);\r
- // }\r
- // \r
- // });\r
-// if(graph.parent != null || listener != null) {\r
-// registerDependencies(graph, entry, parent, listener, procedure, false);\r
-// }\r
-//\r
-// query.perform(graph, procedure);\r
-//\r
-// return;\r
-\r
- }\r
- }\r
- }\r
-\r
- if(entry.isReady()) { \r
- entry.performFromCache(graph, this, procedure);\r
- registerDependencies(graph, entry, parent, listener, procedure, false);\r
- } else {\r
- performForEach(graph, query, entry, parent, listener, procedure, false);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-\r
- final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
-\r
- MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query); \r
- if(entry == null) {\r
-\r
- entry = new MultiReadEntry(query);\r
- entry.setPending();\r
- entry.clearResult(provider.querySupport);\r
-\r
- provider.multiReadMap.put(query, entry);\r
-\r
- provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- } else {\r
-\r
- if(entry.isPending()) {\r
-\r
- synchronized(entry) {\r
-\r
- if(entry.isPending()) {\r
- throw new IllegalStateException();\r
-\r
-// if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();\r
-// entry.procs.add(new Pair(procedure, parentBarrier));\r
-// if(graph.parent != null || listener != null) {\r
-// provider.registerDependencies(graph, entry, parent, listener, procedure, false);\r
-// }\r
-\r
- // If this was synchronized we must wait here until completion\r
- // if(graph.state.synchronizedExecution) {\r
- // while(entry.isPending()) {\r
- // graph.resumeTasks(graph.callerThread, null, null);\r
- // }\r
- // }\r
-//\r
-// return;\r
-\r
- }\r
- }\r
-\r
- entry.performFromCache(graph, provider, procedure);\r
-// graph.state.barrier.dec(query);\r
- return;\r
-\r
- } else {\r
-\r
- provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
-\r
- int hash = requestHash(query);\r
-\r
- AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);\r
-\r
- if(parent == null && listener == null) {\r
- if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
- System.out.println("ready " + query);\r
- entry.performFromCache(graph, this, procedure);\r
- return;\r
- } else {\r
- query.perform(graph, procedure);\r
- return;\r
- }\r
- }\r
-\r
- if(entry == null) {\r
-\r
- entry = new AsyncMultiReadEntry<T>(query); \r
- entry.setPending();\r
- entry.clearResult(querySupport);\r
-\r
- asyncMultiReadMap.put(query, entry, hash);\r
-\r
- performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- } else {\r
-\r
- if(entry.isPending()) {\r
- \r
- synchronized(entry) {\r
- if(entry.isPending()) {\r
- throw new IllegalStateException();\r
-// if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();\r
-// entry.procs.add(procedure);\r
-// if(graph.parent != null || listener != null) {\r
-// registerDependencies(graph, entry, parent, listener, procedure, false);\r
-// }\r
-// return;\r
- }\r
- }\r
- }\r
-\r
- performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- }\r
-\r
- }\r
-\r
- final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {\r
-\r
- final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query); \r
- if(entry == null) {\r
- provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);\r
- } else {\r
- if(entry.isPending()) {\r
- synchronized(entry) {\r
- if(entry.isPending()) {\r
- throw new IllegalStateException();\r
-// if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();\r
-// entry.procs.add(procedure);\r
-// return;\r
- }\r
- }\r
- }\r
- provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
- }\r
-\r
- }\r
-\r
- public int requestHash(Object object) {\r
- try {\r
- return object.hashCode();\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- return 0;\r
- }\r
- }\r
- \r
- @Override\r
- public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {\r
-\r
- assert(query != null);\r
-\r
- ReadEntry entry = readMap.get(query);\r
-\r
- if(entry != null) {\r
- if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {\r
- return (T)entry.get(graph, this, procedure);\r
- } else if (entry.isPending()) {\r
- throw new IllegalStateException();\r
- }\r
- }\r
-\r
- if(entry == null) {\r
-\r
- entry = new ReadEntry(query);\r
- entry.setPending();\r
- entry.clearResult(querySupport);\r
-\r
- readMap.put(query, entry);\r
-\r
- return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
-\r
- } else {\r
-\r
- if(entry.isPending()) {\r
- throw new IllegalStateException();\r
- } else {\r
- return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
-\r
- assert(query != null);\r
- assert(procedure != null);\r
-\r
- final MultiReadEntry entry = multiReadMap.get(query);\r
-\r
- if(parent == null && !(listener != null)) {\r
- if(entry != null && entry.isReady()) { \r
- entry.performFromCache(graph, this, procedure);\r
- return;\r
- }\r
- }\r
-\r
- runMultiRead(graph, entry, query, parent, this, listener, procedure);\r
-\r
- }\r
-\r
- public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {\r
-\r
- assert(query != null);\r
- assert(procedure != null);\r
-\r
- final ExternalReadEntry entry = externalReadMap.get(query);\r
-\r
- if(parent == null && !(listener != null)) {\r
- if(entry != null && entry.isReady()) { \r
- entry.performFromCache(procedure);\r
- return;\r
- }\r
- }\r
-\r
- runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);\r
-\r
- }\r
-\r
- public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,\r
- boolean inferredDependency) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- assert(!entry.isDiscarded());\r
-\r
- final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!entry.isReady()) {\r
-\r
- entry.setPending();\r
-\r
- size++;\r
-\r
- try {\r
-\r
- final ReadGraphImpl finalParentGraph = parentGraph;\r
-\r
- query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph returnGraph, T result) {\r
- ReadGraphImpl impl = (ReadGraphImpl)returnGraph;\r
- //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();\r
- entry.addOrSet(finalParentGraph, result);\r
- if(listenerEntry != null) {\r
- primeListenerEntry(listenerEntry, result);\r
- }\r
- try {\r
- procedure.execute(finalParentGraph, result);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
-// parentBarrier.dec(query);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph returnGraph, Throwable t) {\r
- ReadGraphImpl impl = (ReadGraphImpl)returnGraph;\r
-// AsyncReadGraph resumeGraph = finalParentGraph.newAsync();\r
- entry.except(finalParentGraph, t);\r
- try {\r
- procedure.exception(finalParentGraph, t);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-// parentBarrier.dec(query);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- });\r
-\r
- } catch (Throwable t) {\r
-\r
- entry.except(t);\r
- try {\r
- procedure.exception(parentGraph, t);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-// parentBarrier.dec(query);\r
-\r
- }\r
-\r
- misses++;\r
-\r
- } else {\r
-\r
- entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- procedure.exception(graph, throwable);\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T result) {\r
- procedure.execute(graph, result);\r
- if(listenerEntry != null) {\r
- primeListenerEntry(listenerEntry, result);\r
- }\r
- }\r
-\r
- });\r
-\r
-// parentBarrier.dec(query);\r
-\r
- hits++;\r
-\r
- }\r
-\r
- assert (!entry.isDiscarded());\r
-\r
- }\r
-\r
- public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,\r
- boolean inferredDependency) throws Throwable {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- entry.assertNotDiscarded();\r
-\r
- if(entry.isReady()) {\r
-\r
- // EXCEPTED goes here\r
-\r
-// if(procedure != null) entry.performFromCache(graph, this, procedure);\r
-// parentBarrier.dec(query);\r
- hits++;\r
-\r
- ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
- \r
- T result = (T)entry.get(graph, this, procedure); \r
-\r
- if(listenerEntry != null) primeListenerEntry(listenerEntry, result);\r
-\r
- return result;\r
-\r
- } else {\r
-\r
- // FRESH, REFUTED, PENDING go here\r
-\r
- entry.setPending();\r
-\r
- size++;\r
- misses++;\r
-\r
- ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
-\r
- final ReadGraphImpl performGraph = graph.newSync(entry);\r
-\r
- try {\r
-\r
- if(Development.DEVELOPMENT)\r
- Development.recordHistogram("run " + query);\r
-\r
- T result = query.perform(performGraph);\r
- entry.addOrSet(performGraph, result);\r
- \r
- if(listenerEntry != null) primeListenerEntry(listenerEntry, result);\r
-\r
- return (T)entry.get(graph, this, procedure);\r
-\r
- } catch (Throwable t) {\r
-\r
- entry.except(t);\r
- return (T)entry.get(graph, this, procedure);\r
-\r
- }\r
-\r
- } \r
-\r
- }\r
-\r
- public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,\r
- boolean inferredDependency) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- assert(!entry.isPending());\r
- assert(!entry.isDiscarded());\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!entry.isReady()) {\r
-\r
- entry.setPending();\r
- entry.clearResult(querySupport);\r
-\r
- multiReadMap.put(query, entry);\r
- size++;\r
-\r
- final ReadGraphImpl newGraph = graph.newSync(entry);\r
-// newGraph.state.barrier.inc();\r
-\r
- try {\r
-\r
- query.perform(newGraph, new AsyncMultiProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T result) {\r
- entry.addOrSet(result);\r
- try {\r
- procedure.execute(graph, result);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
- entry.finish(graph);\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
-// newGraph.state.barrier.dec();\r
-// parentBarrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- entry.except(t);\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-// newGraph.state.barrier.dec();\r
-// parentBarrier.dec();\r
- }\r
-\r
- });\r
-\r
- } catch (DatabaseException e) {\r
-\r
- entry.except(e);\r
- try {\r
- procedure.exception(graph, e);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-// newGraph.state.barrier.dec();\r
-// parentBarrier.dec();\r
-\r
- } catch (Throwable t) {\r
-\r
- DatabaseException e = new DatabaseException(t);\r
-\r
- entry.except(e);\r
- try {\r
- procedure.exception(graph, e);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-// newGraph.state.barrier.dec();\r
-// parentBarrier.dec();\r
-\r
- }\r
-\r
- misses++;\r
-\r
- } else {\r
-\r
- entry.performFromCache(graph, this, procedure);\r
- hits++;\r
-\r
-\r
- }\r
-\r
- assert (!entry.isDiscarded());\r
-\r
- registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
-\r
- }\r
-\r
-\r
- public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,\r
- boolean inferredDependency) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- try {\r
-\r
- assert(!entry.isDiscarded());\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!entry.isReady()) {\r
-\r
- size++;\r
-\r
- try {\r
-\r
- ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);\r
-\r
- query.perform(performGraph, new AsyncMultiProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T result) {\r
- ReadGraphImpl impl = (ReadGraphImpl)graph;\r
-// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
- entry.addOrSet(result);\r
- try {\r
- procedure.execute(callerGraph, result);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
- ReadGraphImpl impl = (ReadGraphImpl)graph;\r
-// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
- entry.finish(callerGraph);\r
- try {\r
- procedure.finished(callerGraph);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- ReadGraphImpl impl = (ReadGraphImpl)graph;\r
-// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
- entry.except(callerGraph, t);\r
- try {\r
- procedure.exception(callerGraph, t);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
- }\r
-\r
- });\r
-\r
- } catch (Throwable t) {\r
-\r
- entry.except(t);\r
- try {\r
- procedure.exception(callerGraph, t);\r
- } catch (Throwable t2) {\r
- t2.printStackTrace();\r
- }\r
-\r
- }\r
-\r
-\r
- misses++;\r
-\r
- } else {\r
-\r
- entry.performFromCache(callerGraph, this, procedure);\r
-\r
- hits++;\r
-\r
- }\r
-\r
- assert (!entry.isDiscarded());\r
-\r
- registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);\r
-\r
- } catch (Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
-\r
- } finally {\r
-\r
- }\r
-\r
- }\r
-\r
- public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,\r
- boolean inferredDependency) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- assert(!entry.isPending());\r
- assert(!entry.isDiscarded());\r
-\r
- registerDependencies(graph, entry, parent, base, procedure, inferredDependency);\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!entry.isReady()) {\r
-\r
- entry.setPending();\r
- entry.clearResult(querySupport);\r
-\r
- externalReadMap.put(query, entry);\r
- size++;\r
-\r
- try {\r
-\r
- query.register(graph, new Listener<T>() {\r
-\r
- AtomicBoolean used = new AtomicBoolean(false);\r
-\r
- @Override\r
- public void execute(T result) {\r
- \r
- // Just for safety\r
- if(entry.isDiscarded()) return;\r
- if(entry.isExcepted()) entry.setPending();\r
- \r
- if(used.compareAndSet(false, true)) {\r
- entry.addOrSet(QueryProcessor.this, result);\r
- procedure.execute(result);\r
- } else {\r
- entry.queue(result);\r
- updatePrimitive(query);\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- \r
- entry.except(t);\r
-\r
- if(used.compareAndSet(false, true)) {\r
- procedure.exception(t);\r
- } else {\r
-// entry.queue(result);\r
- updatePrimitive(query);\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return entry.isDiscarded() || !isBound(entry);\r
- }\r
-\r
- });\r
-\r
- } catch (Throwable t) {\r
-\r
- entry.except(t);\r
- procedure.exception(t);\r
-\r
- }\r
-\r
- misses++;\r
-\r
- } else {\r
-\r
- entry.performFromCache(procedure);\r
-\r
- hits++;\r
-\r
- }\r
-\r
- assert (!entry.isDiscarded());\r
-\r
- }\r
-\r
- private boolean isBound(ExternalReadEntry<?> entry) {\r
- if(entry.hasParents()) return true;\r
- else if(hasListener(entry)) return true;\r
- else return false;\r
- }\r
-\r
- synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {\r
-\r
- if (parent != null && !inferred) {\r
- try {\r
- if(!child.isImmutable(graph))\r
- child.addParent(parent);\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);\r
- }\r
-\r
- if (listener != null) {\r
- return registerListener(child, listener, procedure);\r
- } else {\r
- return null;\r
- }\r
-\r
- }\r
-\r
- public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- try {\r
-\r
- registerDependencies(graph, query, parent, listener, procedure, false);\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!query.isReady()) {\r
-\r
- boolean fresh = query.isFresh();\r
-\r
- if(fresh) {\r
- size++;\r
- }\r
-\r
- query.computeForEach(graph, this, procedure, true);\r
-\r
- misses++;\r
-\r
- } else {\r
-\r
- query.performFromCache(graph, this, procedure);\r
-\r
- hits++;\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
-\r
- }\r
- }\r
-\r
- public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- try {\r
-\r
- assert(query.assertNotDiscarded());\r
-\r
- registerDependencies(graph, query, parent, listener, procedure, false);\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!query.isReady()) {\r
-\r
- size++;\r
- misses++;\r
-\r
- return query.computeForEach(graph, this, procedure, true);\r
-\r
-\r
- } else {\r
-\r
- hits++;\r
-\r
- return query.performFromCache(graph, this, procedure);\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
- return null;\r
-\r
- } \r
-\r
- }\r
- \r
- static class Dummy implements InternalProcedure<Object>, IntProcedure {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- }\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, Object result) {\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable throwable) {\r
- }\r
- \r
- }\r
- \r
- private static final Dummy dummy = new Dummy();\r
-\r
- public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- assert(query.assertNotDiscarded());\r
-\r
- registerDependencies(graph, query, parent, listener, procedure, false);\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!query.isReady()) {\r
-\r
- size++;\r
- misses++;\r
-\r
- query.computeForEach(graph, this, (Procedure)dummy, true);\r
- return query.get(graph, this, null);\r
-\r
- } else {\r
-\r
- hits++;\r
-\r
- return query.get(graph, this, procedure);\r
-\r
- }\r
-\r
- }\r
- \r
- public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {\r
-\r
- if (DebugPolicy.PERFORM)\r
- System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
-\r
- assert (!dirty);\r
- assert (!collecting);\r
-\r
- try {\r
-\r
- if(query.isDiscarded()) {\r
- System.err.println("aff");\r
- }\r
- assert(!query.isDiscarded());\r
-\r
- // FRESH, REFUTED, EXCEPTED go here \r
- if (!query.isReady()) {\r
-\r
- query.computeForEach(graph.withAsyncParent(query), this, procedure);\r
-\r
- size++;\r
- misses++;\r
-\r
- } else {\r
-\r
- query.performFromCache(graph, this, procedure);\r
-\r
- hits++;\r
-\r
- }\r
-\r
- assert (!query.isDiscarded());\r
-\r
- registerDependencies(graph, query, parent, listener, procedure, false);\r
-\r
- } catch (Throwable t) {\r
-\r
- t.printStackTrace();\r
- Logger.defaultLogError(t);\r
-\r
- }\r
-\r
- }\r
-\r
- interface QueryCollectorSupport {\r
- public CacheCollectionResult allCaches();\r
- public Collection<CacheEntry> getRootList();\r
- public int getCurrentSize();\r
- public int calculateCurrentSize();\r
- public CacheEntryBase iterate(int level);\r
- public void remove();\r
- public void setLevel(CacheEntryBase entry, int level);\r
- public boolean start(boolean flush);\r
- }\r
-\r
- interface QueryCollector {\r
-\r
- public void collect(int youngTarget, int allowedTimeInMs);\r
-\r
- }\r
-\r
- class QueryCollectorSupportImpl implements QueryCollectorSupport {\r
-\r
- private static final boolean DEBUG = false;\r
- private static final double ITERATION_RATIO = 0.2;\r
- \r
- private CacheCollectionResult iteration = new CacheCollectionResult();\r
- private boolean fresh = true;\r
- private boolean needDataInStart = true;\r
- \r
- QueryCollectorSupportImpl() {\r
- iteration.restart();\r
- }\r
-\r
- public CacheCollectionResult allCaches() {\r
- CacheCollectionResult result = new CacheCollectionResult();\r
- QueryProcessor.this.allCaches(result);\r
- result.restart();\r
- return result;\r
- }\r
- \r
- public boolean start(boolean flush) {\r
- // We need new data from query maps\r
- fresh = true;\r
- if(needDataInStart || flush) {\r
- // Last run ended after processing all queries => refresh data\r
- restart(flush ? 0.0 : ITERATION_RATIO);\r
- } else {\r
- // continue with previous big data\r
- }\r
- // Notify caller about iteration situation\r
- return iteration.isAtStart();\r
- }\r
-\r
- private void restart(double targetRatio) {\r
- \r
- needDataInStart = true;\r
-\r
- long start = System.nanoTime();\r
- if(fresh) {\r
- \r
- // We need new data from query maps\r
- \r
- int iterationSize = iteration.size()+1;\r
- int diff = calculateCurrentSize()-iterationSize;\r
- \r
- double ratio = (double)diff / (double)iterationSize;\r
- boolean dirty = Math.abs(ratio) >= targetRatio;\r
- \r
- if(dirty) {\r
- iteration = allCaches();\r
- if(DEBUG) {\r
- System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");\r
- for(int i=0;i<CacheCollectionResult.LEVELS;i++)\r
- System.err.print(" " + iteration.levels[i].size());\r
- System.err.println("");\r
- }\r
- } else {\r
- iteration.restart();\r
- }\r
- \r
- fresh = false;\r
- needDataInStart = false;\r
- } else {\r
- // We are returning here within the same GC round - reuse the cache table\r
- iteration.restart();\r
- }\r
- \r
- return;\r
- \r
- }\r
- \r
- @Override\r
- public CacheEntryBase iterate(int level) {\r
- \r
- CacheEntryBase entry = iteration.next(level);\r
- if(entry == null) {\r
- restart(ITERATION_RATIO);\r
- return null;\r
- }\r
- \r
- while(entry != null && entry.isDiscarded()) {\r
- entry = iteration.next(level);\r
- }\r
- \r
- return entry;\r
- \r
- }\r
- \r
- @Override\r
- public void remove() {\r
- iteration.remove();\r
- }\r
- \r
- @Override\r
- public void setLevel(CacheEntryBase entry, int level) {\r
- iteration.setLevel(entry, level);\r
- }\r
-\r
- public Collection<CacheEntry> getRootList() {\r
-\r
- ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();\r
-\r
- for (Object e : valueMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : directPredicatesMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : objectsMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : directObjectsMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : principalTypesMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : superRelationsMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : superTypesMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : typesMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : objectsMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : assertedStatementsMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : readMap.values()) {\r
- if(e instanceof CacheEntry) {\r
- result.add((CacheEntry) e);\r
- } else {\r
- System.err.println("e=" + e);\r
- }\r
- }\r
- for (Object e : asyncReadMap.values()) {\r
- if(e instanceof CacheEntry) {\r
- result.add((CacheEntry) e);\r
- } else {\r
- System.err.println("e=" + e);\r
- }\r
- }\r
- for (Object e : externalReadMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
- for (Object e : orderedSetMap.values()) {\r
- result.add((CacheEntry) e);\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- @Override\r
- public int calculateCurrentSize() {\r
- \r
- int realSize = 0;\r
- \r
- realSize += directPredicatesMap.size();\r
- realSize += principalTypesMap.size();\r
- realSize += uriToResourceMap.size();\r
- realSize += namespaceIndexMap22.size();\r
- realSize += projectsMap.size();\r
- \r
- realSize += relationInfoMap.size();\r
- realSize += superTypesMap.size();\r
- realSize += typeHierarchyMap.size();\r
- realSize += superRelationsMap.size();\r
- realSize += typesMap.size();\r
- \r
- realSize += valueMap.size();\r
- realSize += directObjectsMap.size();\r
- realSize += objectsMap.size();\r
- realSize += orderedSetMap.size();\r
- realSize += predicatesMap.size();\r
- \r
- realSize += statementsMap.size();\r
- realSize += assertedPredicatesMap.size();\r
- realSize += assertedStatementsMap.size();\r
- realSize += externalReadMap.size();\r
- realSize += asyncReadMap.size();\r
- \r
- realSize += readMap.size();\r
- realSize += asyncMultiReadMap.size();\r
- realSize += multiReadMap.size();\r
- \r
- size = realSize;\r
- \r
- return realSize;\r
- \r
- }\r
-\r
- @Override\r
- public int getCurrentSize() {\r
- return size;\r
- }\r
-\r
- }\r
- // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);\r
-\r
- private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();\r
- private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);\r
-\r
- public int querySize() {\r
- return size;\r
- }\r
-\r
- public void gc(int youngTarget, int allowedTimeInMs) {\r
-\r
- collector.collect(youngTarget, allowedTimeInMs);\r
-\r
- }\r
-\r
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {\r
-\r
- assert (entry != null);\r
-\r
- if (base.isDisposed())\r
- return null;\r
-\r
- return addListener(entry, base, procedure);\r
-\r
- }\r
-\r
- private void primeListenerEntry(final ListenerEntry entry, final Object result) {\r
- entry.setLastKnown(result);\r
- }\r
-\r
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {\r
-\r
- assert (entry != null);\r
- assert (procedure != null);\r
-\r
- ArrayList<ListenerEntry> list = listeners.get(entry);\r
- if (list == null) {\r
- list = new ArrayList<ListenerEntry>(1);\r
- listeners.put(entry, list);\r
- }\r
-\r
- ListenerEntry result = new ListenerEntry(entry, base, procedure);\r
- int currentIndex = list.indexOf(result);\r
- // There was already a listener\r
- if(currentIndex > -1) {\r
- ListenerEntry current = list.get(currentIndex);\r
- if(!current.base.isDisposed()) return null;\r
- list.set(currentIndex, result);\r
- } else {\r
- list.add(result);\r
- }\r
-\r
- if(DebugPolicy.LISTENER) {\r
- new Exception().printStackTrace();\r
- System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- private void scheduleListener(ListenerEntry entry) {\r
- assert (entry != null);\r
- if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);\r
- scheduledListeners.add(entry);\r
- }\r
-\r
- private void removeListener(ListenerEntry entry) {\r
- assert (entry != null);\r
- ArrayList<ListenerEntry> list = listeners.get(entry.entry);\r
- if(list == null) return;\r
- boolean success = list.remove(entry);\r
- assert (success);\r
- if (list.isEmpty())\r
- listeners.remove(entry.entry);\r
- }\r
-\r
- private boolean hasListener(CacheEntry entry) {\r
- if(listeners.get(entry) != null) return true;\r
- return false;\r
- }\r
-\r
- boolean hasListenerAfterDisposing(CacheEntry entry) {\r
- if(listeners.get(entry) != null) {\r
- ArrayList<ListenerEntry> entries = listeners.get(entry);\r
- ArrayList<ListenerEntry> list = null;\r
- for (ListenerEntry e : entries) {\r
- if (e.base.isDisposed()) {\r
- if(list == null) list = new ArrayList<ListenerEntry>();\r
- list.add(e);\r
- }\r
- }\r
- if(list != null) {\r
- for (ListenerEntry e : list) {\r
- entries.remove(e);\r
- }\r
- }\r
- if (entries.isEmpty()) {\r
- listeners.remove(entry);\r
- return false;\r
- }\r
- return true;\r
- }\r
- return false;\r
- }\r
-\r
- List<ListenerEntry> getListenerEntries(CacheEntry entry) {\r
- hasListenerAfterDisposing(entry);\r
- if(listeners.get(entry) != null)\r
- return listeners.get(entry);\r
- else \r
- return Collections.emptyList();\r
- }\r
-\r
- void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {\r
-\r
- if(!workarea.containsKey(entry)) {\r
-\r
- HashSet<ListenerBase> ls = new HashSet<ListenerBase>();\r
- for(ListenerEntry e : getListenerEntries(entry))\r
- ls.add(e.base);\r
-\r
- workarea.put(entry, ls);\r
-\r
- for(CacheEntry parent : entry.getParents(this)) {\r
- processListenerReport(parent, workarea);\r
- ls.addAll(workarea.get(parent));\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- public synchronized ListenerReport getListenerReport() throws IOException {\r
-\r
- class ListenerReportImpl implements ListenerReport {\r
-\r
- Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();\r
-\r
- @Override\r
- public void print(PrintStream b) {\r
- Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();\r
- for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {\r
- for(ListenerBase l : e.getValue()) {\r
- Integer i = hist.get(l);\r
- hist.put(l, i != null ? i-1 : -1);\r
- }\r
- }\r
-\r
- for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
- b.print("" + -p.second + " " + p.first + "\n");\r
- }\r
-\r
- b.flush();\r
- }\r
-\r
- }\r
-\r
- ListenerReportImpl result = new ListenerReportImpl();\r
-\r
- Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();\r
- for(CacheEntryBase entry : all) {\r
- hasListenerAfterDisposing(entry);\r
- }\r
- for(CacheEntryBase entry : all) {\r
- processListenerReport(entry, result.workarea);\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- public synchronized String reportListeners(File file) throws IOException {\r
-\r
- if (!isAlive())\r
- return "Disposed!";\r
-\r
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
- ListenerReport report = getListenerReport();\r
- report.print(b);\r
-\r
- return "Done reporting listeners.";\r
-\r
- }\r
-\r
- void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {\r
-\r
- if(entry.isDiscarded()) return;\r
- if(workarea.containsKey(entry)) return;\r
- \r
- Iterable<CacheEntry> parents = entry.getParents(this);\r
- HashSet<CacheEntry> ps = new HashSet<CacheEntry>();\r
- for(CacheEntry e : parents) {\r
- if(e.isDiscarded()) continue;\r
- ps.add(e);\r
- processParentReport(e, workarea);\r
- }\r
- workarea.put(entry, ps);\r
-\r
- }\r
-\r
- public synchronized String reportQueryActivity(File file) throws IOException {\r
- \r
- System.err.println("reportQueries " + file.getAbsolutePath());\r
-\r
- if (!isAlive())\r
- return "Disposed!";\r
-\r
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
-\r
- List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);\r
- Collections.reverse(entries);\r
- \r
- for(Pair<String,Integer> entry : entries) {\r
- b.println(entry.first + ": " + entry.second);\r
- }\r
-\r
- b.close();\r
- \r
- Development.histogram.clear();\r
-\r
- return "OK";\r
-\r
- }\r
- \r
- public synchronized String reportQueries(File file) throws IOException {\r
-\r
- System.err.println("reportQueries " + file.getAbsolutePath());\r
-\r
- if (!isAlive())\r
- return "Disposed!";\r
-\r
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
-\r
- long start = System.nanoTime();\r
-\r
-// ArrayList<CacheEntry> all = ;\r
- \r
- Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();\r
- Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();\r
- for(CacheEntryBase entry : caches) {\r
- processParentReport(entry, workarea);\r
- }\r
- \r
- // for(CacheEntry e : all) System.err.println("entry: " + e);\r
-\r
- long duration = System.nanoTime() - start;\r
- System.err.println("Query root set in " + 1e-9*duration + "s.");\r
-\r
- start = System.nanoTime();\r
-\r
- HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); \r
-\r
- int listeners = 0;\r
-\r
- for(CacheEntry entry : workarea.keySet()) {\r
- boolean listener = hasListenerAfterDisposing(entry);\r
- boolean hasParents = entry.getParents(this).iterator().hasNext();\r
- if(listener) {\r
- // Bound\r
- flagMap.put(entry, 0);\r
- } else if (!hasParents) {\r
- // Unbound\r
- flagMap.put(entry, 1);\r
- } else {\r
- // Unknown\r
- flagMap.put(entry, 2);\r
- }\r
- // // Write leaf bit\r
- // entry.flags |= 4;\r
- }\r
-\r
- boolean done = true;\r
- int loops = 0;\r
-\r
- do {\r
-\r
- done = true;\r
-\r
- long start2 = System.nanoTime();\r
-\r
- int boundCounter = 0;\r
- int unboundCounter = 0;\r
- int unknownCounter = 0;\r
-\r
- for(CacheEntry entry : workarea.keySet()) {\r
-\r
- //System.err.println("process " + entry);\r
-\r
- int flags = flagMap.get(entry);\r
- int bindStatus = flags & 3;\r
-\r
- if(bindStatus == 0) boundCounter++;\r
- else if(bindStatus == 1) unboundCounter++;\r
- else if(bindStatus == 2) unknownCounter++;\r
-\r
- if(bindStatus < 2) continue;\r
-\r
- int newStatus = 1;\r
- for(CacheEntry parent : entry.getParents(this)) {\r
-\r
- if(parent.isDiscarded()) flagMap.put(parent, 1);\r
-\r
- int flags2 = flagMap.get(parent);\r
- int bindStatus2 = flags2 & 3;\r
- // Parent is bound => child is bound\r
- if(bindStatus2 == 0) {\r
- newStatus = 0;\r
- break;\r
- }\r
- // Parent is unknown => child is unknown\r
- else if (bindStatus2 == 2) {\r
- newStatus = 2;\r
- done = false;\r
- break;\r
- }\r
- }\r
-\r
- flagMap.put(entry, newStatus);\r
-\r
- }\r
-\r
- duration = System.nanoTime() - start2;\r
- System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
- b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
-\r
- } while(!done && loops++ < 20);\r
-\r
- if(loops >= 20) {\r
-\r
- for(CacheEntry entry : workarea.keySet()) {\r
-\r
- int bindStatus = flagMap.get(entry);\r
- if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);\r
-\r
- }\r
-\r
- }\r
-\r
- duration = System.nanoTime() - start;\r
- System.err.println("Query analysis in " + 1e-9*duration + "s.");\r
-\r
- Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();\r
-\r
- for(CacheEntry entry : workarea.keySet()) {\r
- Class<?> clazz = entry.getClass();\r
- if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); \r
- else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); \r
- else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); \r
- else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); \r
- else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); \r
- Integer c = counts.get(clazz);\r
- if(c == null) counts.put(clazz, -1);\r
- else counts.put(clazz, c-1);\r
- }\r
-\r
- b.print("// Simantics DB client query report file\n");\r
- b.print("// This file contains the following information\n");\r
- b.print("// -The amount of cached query instances per query class\n");\r
- b.print("// -The sizes of retained child sets\n");\r
- b.print("// -List of parents for each query (search for 'P <query name>')\n");\r
- b.print("// -Followed by status, where\n");\r
- b.print("// -0=bound\n");\r
- b.print("// -1=free\n");\r
- b.print("// -2=unknown\n");\r
- b.print("// -L=has listener\n");\r
- b.print("// -List of children for each query (search for 'C <query name>')\n");\r
-\r
- b.print("----------------------------------------\n");\r
-\r
- b.print("// Queries by class\n");\r
- for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {\r
- b.print(-p.second + " " + p.first.getName() + "\n");\r
- }\r
-\r
- Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();\r
- for(CacheEntry e : workarea.keySet())\r
- hist.put(e, -1);\r
- \r
- boolean changed = true;\r
- int iter = 0;\r
- while(changed && iter++<50) {\r
- \r
- changed = false;\r
- \r
- Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();\r
- for(CacheEntry e : workarea.keySet())\r
- newHist.put(e, -1);\r
-\r
- for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {\r
- Integer c = hist.get(e.getKey());\r
- for(CacheEntry p : e.getValue()) {\r
- Integer i = newHist.get(p);\r
- newHist.put(p, i+c);\r
- }\r
- }\r
- for(CacheEntry e : workarea.keySet()) {\r
- Integer value = newHist.get(e);\r
- Integer old = hist.get(e);\r
- if(!value.equals(old)) {\r
- hist.put(e, value);\r
-// System.err.println("hist " + e + ": " + old + " => " + value);\r
- changed = true;\r
- }\r
- }\r
- \r
- System.err.println("Retained set iteration " + iter);\r
-\r
- }\r
-\r
- b.print("// Queries by retained set\n");\r
- for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
- b.print("" + -p.second + " " + p.first + "\n");\r
- }\r
-\r
- HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();\r
-\r
- b.print("// Entry parent listing\n");\r
- for(CacheEntry entry : workarea.keySet()) {\r
- int status = flagMap.get(entry);\r
- boolean hasListener = hasListenerAfterDisposing(entry);\r
- b.print("Q " + entry.toString());\r
- if(hasListener) {\r
- b.print(" (L" + status + ")");\r
- listeners++;\r
- } else {\r
- b.print(" (" + status + ")");\r
- }\r
- b.print("\n");\r
- for(CacheEntry parent : workarea.get(entry)) {\r
- Collection<CacheEntry> inv = inverse.get(parent);\r
- if(inv == null) {\r
- inv = new ArrayList<CacheEntry>();\r
- inverse.put(parent, inv);\r
- }\r
- inv.add(entry);\r
- b.print(" " + parent.toString());\r
- b.print("\n");\r
- }\r
- }\r
-\r
- b.print("// Entry child listing\n");\r
- for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {\r
- b.print("C " + entry.getKey().toString());\r
- b.print("\n");\r
- for(CacheEntry child : entry.getValue()) {\r
- Integer h = hist.get(child);\r
- if(h != null) {\r
- b.print(" " + h);\r
- } else {\r
- b.print(" <no children>");\r
- }\r
- b.print(" " + child.toString());\r
- b.print("\n");\r
- }\r
- }\r
-\r
- b.print("#queries: " + workarea.keySet().size() + "\n");\r
- b.print("#listeners: " + listeners + "\n");\r
-\r
- b.close();\r
-\r
- return "Dumped " + workarea.keySet().size() + " queries.";\r
-\r
- }\r
-\r
- class UpdateEntry {\r
-\r
- public CacheEntry caller;\r
-\r
- public CacheEntry entry;\r
-\r
- public int indent;\r
-\r
- public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {\r
- this.caller = caller;\r
- this.entry = entry;\r
- this.indent = indent;\r
- }\r
-\r
- };\r
-\r
- boolean removeQuery(CacheEntry entry) {\r
-\r
- // This entry has been removed before. No need to do anything here.\r
- if(entry.isDiscarded()) return false;\r
-\r
- assert (!entry.isDiscarded());\r
-\r
- Query query = entry.getQuery();\r
-\r
- query.removeEntry(this);\r
-\r
- updates++;\r
- size--;\r
-\r
- if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)\r
- boundQueries--;\r
- \r
- entry.discard();\r
-\r
- return true;\r
-\r
- }\r
-\r
- /**\r
- * \r
- * @return true if this entry is being listened\r
- */\r
- private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {\r
-\r
- assert (e != null);\r
-\r
- CacheEntry entry = e.entry;\r
-\r
-// System.err.println("updateQuery " + entry);\r
- \r
- /*\r
- * If the dependency graph forms a DAG, some entries are inserted in the\r
- * todo list many times. They only need to be processed once though.\r
- */\r
- if (entry.isDiscarded()) {\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.out.print("D");\r
- for (int i = 0; i < e.indent; i++)\r
- System.out.print(" ");\r
- System.out.println(entry.getQuery());\r
- }\r
- }\r
-// System.err.println(" => DISCARDED");\r
- return false;\r
- }\r
-\r
- if (entry.isRefuted()) {\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.out.print("R");\r
- for (int i = 0; i < e.indent; i++)\r
- System.out.print(" ");\r
- System.out.println(entry.getQuery());\r
- }\r
- }\r
- return false;\r
- }\r
-\r
- if (entry.isExcepted()) {\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.out.print("E");\r
- }\r
- }\r
- }\r
-\r
- if (entry.isPending()) {\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.out.print("P");\r
- }\r
- }\r
- }\r
-\r
- updates++;\r
-\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.out.print("U ");\r
- for (int i = 0; i < e.indent; i++)\r
- System.out.print(" ");\r
- System.out.print(entry.getQuery());\r
- }\r
- }\r
-\r
- Query query = entry.getQuery();\r
- int type = query.type();\r
-\r
- boolean hasListener = hasListener(entry); \r
-\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- if(hasListener(entry)) {\r
- System.out.println(" (L)");\r
- } else {\r
- System.out.println("");\r
- }\r
- }\r
- }\r
-\r
- if(entry.isPending() || entry.isExcepted()) {\r
-\r
- // If updated\r
- if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
-\r
- immediates.put(entry, entry);\r
-\r
- } else {\r
-\r
- if(hasListener) {\r
- entry.refute();\r
- } else {\r
- removeQuery(entry);\r
- }\r
-\r
- }\r
-\r
- } else {\r
-\r
- // If updated\r
- if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
-\r
- immediates.put(entry, entry);\r
-\r
- } else {\r
-\r
- if(hasListener) {\r
- entry.refute();\r
- } else {\r
- removeQuery(entry);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-// System.err.println(" => FOO " + type);\r
-\r
- if (hasListener) {\r
- ArrayList<ListenerEntry> entries = listeners.get(entry);\r
- if(entries != null) {\r
- for (ListenerEntry le : entries) {\r
- scheduleListener(le);\r
- }\r
- }\r
- }\r
-\r
- // If invalid, update parents\r
- if (type == RequestFlags.INVALIDATE) {\r
- updateParents(e.indent, entry, todo);\r
- }\r
-\r
- return hasListener;\r
-\r
- }\r
-\r
- private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {\r
-\r
- Iterable<CacheEntry> oldParents = entry.getParents(this);\r
- for (CacheEntry parent : oldParents) {\r
-// System.err.println("updateParents " + entry + " => " + parent);\r
- if(!parent.isDiscarded())\r
- todo.push(new UpdateEntry(entry, parent, indent + 2));\r
- }\r
-\r
- }\r
-\r
- private boolean pruneListener(ListenerEntry entry) {\r
- if (entry.base.isDisposed()) {\r
- removeListener(entry);\r
- return true;\r
- } else {\r
- return false;\r
- }\r
- }\r
-\r
- /**\r
- * @param av1 an array (guaranteed)\r
- * @param av2 any object\r
- * @return <code>true</code> if the two arrays are equal\r
- */\r
- private final boolean arrayEquals(Object av1, Object av2) {\r
- if (av2 == null)\r
- return false;\r
- Class<?> c1 = av1.getClass().getComponentType();\r
- Class<?> c2 = av2.getClass().getComponentType();\r
- if (c2 == null || !c1.equals(c2))\r
- return false;\r
- boolean p1 = c1.isPrimitive();\r
- boolean p2 = c2.isPrimitive();\r
- if (p1 != p2)\r
- return false;\r
- if (!p1)\r
- return Arrays.equals((Object[]) av1, (Object[]) av2);\r
- if (boolean.class.equals(c1))\r
- return Arrays.equals((boolean[]) av1, (boolean[]) av2);\r
- else if (byte.class.equals(c1))\r
- return Arrays.equals((byte[]) av1, (byte[]) av2);\r
- else if (int.class.equals(c1))\r
- return Arrays.equals((int[]) av1, (int[]) av2);\r
- else if (long.class.equals(c1))\r
- return Arrays.equals((long[]) av1, (long[]) av2);\r
- else if (float.class.equals(c1))\r
- return Arrays.equals((float[]) av1, (float[]) av2);\r
- else if (double.class.equals(c1))\r
- return Arrays.equals((double[]) av1, (double[]) av2);\r
- throw new RuntimeException("??? Contact application querySupport.");\r
- }\r
-\r
-\r
-\r
- final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {\r
-\r
- try {\r
-\r
- Query query = entry.getQuery();\r
-\r
- if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);\r
-\r
- entry.prepareRecompute(querySupport);\r
- \r
- ReadGraphImpl parentGraph = graph.withParent(entry);\r
-\r
- query.recompute(parentGraph, this, entry);\r
-\r
- if(entry.isExcepted()) return ListenerEntry.NO_VALUE;\r
-\r
- Object newValue = entry.getResult();\r
-\r
- if (ListenerEntry.NO_VALUE == oldValue) {\r
- if(DebugPolicy.CHANGES) {\r
- System.out.println("C " + query);\r
- System.out.println("- " + oldValue);\r
- System.out.println("- " + newValue);\r
- }\r
- return newValue;\r
- }\r
-\r
- boolean changed = false;\r
-\r
- if (newValue != null) {\r
- if (newValue.getClass().isArray()) {\r
- changed = !arrayEquals(newValue, oldValue);\r
- } else {\r
- changed = !newValue.equals(oldValue);\r
- }\r
- } else\r
- changed = (oldValue != null);\r
-\r
- if(DebugPolicy.CHANGES && changed) {\r
- System.out.println("C " + query);\r
- System.out.println("- " + oldValue);\r
- System.out.println("- " + newValue);\r
- }\r
-\r
- return changed ? newValue : ListenerEntry.NOT_CHANGED;\r
-\r
- } catch (Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
- entry.except(t);\r
- return ListenerEntry.NO_VALUE;\r
-\r
- }\r
-\r
- }\r
-\r
- public boolean hasScheduledUpdates() {\r
- return !scheduledListeners.isEmpty();\r
- }\r
-\r
- public void performScheduledUpdates(WriteGraphImpl graph) {\r
-\r
- assert (!updating);\r
- assert (!collecting);\r
- assert (!firingListeners);\r
-\r
- firingListeners = true;\r
-\r
- try {\r
-\r
- // Performing may cause further events to be scheduled.\r
- while (!scheduledListeners.isEmpty()) {\r
-\r
-// graph.restart();\r
-// graph.state.barrier.inc();\r
-\r
- // Clone current events to make new entries possible during\r
- // firing.\r
- THashSet<ListenerEntry> entries = scheduledListeners;\r
- scheduledListeners = new THashSet<ListenerEntry>();\r
-\r
- ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();\r
-\r
- for (ListenerEntry listenerEntry : entries) {\r
-\r
- if (pruneListener(listenerEntry)) {\r
- if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);\r
- continue;\r
- }\r
-\r
- final CacheEntry entry = listenerEntry.entry;\r
- assert (entry != null);\r
-\r
- Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());\r
-\r
- if (newValue != ListenerEntry.NOT_CHANGED) {\r
- if(DebugPolicy.LISTENER)\r
- System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);\r
- schedule.add(listenerEntry);\r
- listenerEntry.setLastKnown(entry.getResult());\r
- }\r
-\r
- }\r
-\r
- for(ListenerEntry listenerEntry : schedule) {\r
- final CacheEntry entry = listenerEntry.entry;\r
- if(DebugPolicy.LISTENER)\r
- System.out.println("Firing " + listenerEntry.procedure);\r
- try {\r
- if(DebugPolicy.LISTENER)\r
- System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);\r
- entry.performFromCache(graph, this, listenerEntry.procedure);\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
-// graph.state.barrier.dec();\r
-// graph.waitAsync(null);\r
-// graph.state.barrier.assertReady();\r
-\r
- }\r
-\r
- } finally {\r
- firingListeners = false;\r
- }\r
-\r
- }\r
-\r
- /**\r
- * \r
- * @return true if this entry still has listeners\r
- */\r
- public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {\r
-\r
- assert (!collecting);\r
- assert (!updating);\r
- updating = true;\r
-\r
- boolean hadListeners = false;\r
- boolean listenersUnknown = false;\r
-\r
- try {\r
-\r
- assert(entry != null);\r
- LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();\r
- IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();\r
- todo.add(new UpdateEntry(null, entry, 0));\r
-\r
- while(true) {\r
-\r
- // Walk the tree and collect immediate updates\r
- while (!todo.isEmpty()) {\r
- UpdateEntry e = todo.pop();\r
- hadListeners |= updateQuery(e, todo, immediates);\r
- }\r
-\r
- if(immediates.isEmpty()) break;\r
-\r
- // Evaluate all immediate updates and collect parents to update\r
- for(CacheEntry immediate : immediates.values()) {\r
-\r
- if(immediate.isDiscarded()) {\r
- continue;\r
- }\r
-\r
- if(immediate.isExcepted()) {\r
-\r
- Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);\r
- if (newValue != ListenerEntry.NOT_CHANGED)\r
- updateParents(0, immediate, todo);\r
-\r
- } else {\r
-\r
- Object oldValue = immediate.getResult();\r
- Object newValue = compareTo(graph, immediate, oldValue);\r
-\r
- if (newValue != ListenerEntry.NOT_CHANGED) {\r
- updateParents(0, immediate, todo);\r
- } else {\r
- // If not changed, keep the old value\r
- immediate.setResult(oldValue);\r
- listenersUnknown = true;\r
- }\r
-\r
- }\r
-\r
- }\r
- immediates.clear();\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
-\r
- assert (updating);\r
- updating = false;\r
-\r
- return hadListeners | listenersUnknown;\r
-\r
- }\r
-\r
- volatile public boolean dirty = false;\r
-\r
- private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();\r
- private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();\r
- private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();\r
- // Maybe use a mutex from util.concurrent?\r
- private Object primitiveUpdateLock = new Object();\r
- private THashSet scheduledPrimitiveUpdates = new THashSet();\r
-\r
- public void performDirtyUpdates(final ReadGraphImpl graph) {\r
-\r
- dirty = false;\r
- lastInvalidate = 0;\r
-\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.err.println("== Query update ==");\r
- }\r
- }\r
-\r
- // Special case - one statement\r
- if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
-\r
- long arg0 = scheduledObjectUpdates.getFirst();\r
-\r
- final int subject = (int)(arg0 >>> 32);\r
- final int predicate = (int)(arg0 & 0xffffffff);\r
-\r
- for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
- for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
- for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
-\r
- if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
- PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
- if(principalTypes != null) update(graph, principalTypes);\r
- Types types = Types.entry(QueryProcessor.this, subject);\r
- if(types != null) update(graph, types);\r
- }\r
-\r
- if(predicate == subrelationOf) {\r
- SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
- if(superRelations != null) update(graph, superRelations);\r
- }\r
-\r
- DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);\r
- if(dp != null) update(graph, dp);\r
- OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);\r
- if(os != null) update(graph, os);\r
-\r
- scheduledObjectUpdates.clear();\r
- return;\r
-\r
- }\r
-\r
- // Special case - one value\r
- if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
-\r
- int arg0 = scheduledValueUpdates.getFirst();\r
-\r
- ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
- if(valueQuery != null) update(graph, valueQuery);\r
-\r
- scheduledValueUpdates.clear();\r
- return;\r
-\r
- }\r
-\r
- final TIntHashSet predicates = new TIntHashSet();\r
- final TIntHashSet orderedSets = new TIntHashSet();\r
-\r
- THashSet primitiveUpdates;\r
- synchronized (primitiveUpdateLock) {\r
- primitiveUpdates = scheduledPrimitiveUpdates;\r
- scheduledPrimitiveUpdates = new THashSet();\r
- }\r
-\r
- primitiveUpdates.forEach(new TObjectProcedure() {\r
-\r
- @Override\r
- public boolean execute(Object arg0) {\r
-\r
- ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);\r
- if (query != null) {\r
- boolean listening = update(graph, query);\r
- if (!listening && !query.hasParents()) {\r
- externalReadMap.remove(arg0);\r
- query.discard();\r
- }\r
- }\r
- return true;\r
- }\r
-\r
- });\r
-\r
- scheduledValueUpdates.forEach(new TIntProcedure() {\r
-\r
- @Override\r
- public boolean execute(int arg0) {\r
- ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
- if(valueQuery != null) update(graph, valueQuery);\r
- return true;\r
- }\r
-\r
- });\r
-\r
- scheduledInvalidates.forEach(new TIntProcedure() {\r
-\r
- @Override\r
- public boolean execute(int resource) {\r
- \r
- ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);\r
- if(valueQuery != null) update(graph, valueQuery);\r
- \r
- PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);\r
- if(principalTypes != null) update(graph, principalTypes);\r
- Types types = Types.entry(QueryProcessor.this, resource);\r
- if(types != null) update(graph, types);\r
-\r
- SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);\r
- if(superRelations != null) update(graph, superRelations);\r
-\r
- predicates.add(resource);\r
- \r
- return true;\r
- }\r
-\r
- });\r
-\r
- scheduledObjectUpdates.forEach(new TLongProcedure() {\r
-\r
- @Override\r
- public boolean execute(long arg0) {\r
-\r
- final int subject = (int)(arg0 >>> 32);\r
- final int predicate = (int)(arg0 & 0xffffffff);\r
-\r
- if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
- PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
- if(principalTypes != null) update(graph, principalTypes);\r
- Types types = Types.entry(QueryProcessor.this, subject);\r
- if(types != null) update(graph, types);\r
- }\r
-\r
- if(predicate == subrelationOf) {\r
- SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
- if(superRelations != null) update(graph, superRelations);\r
- }\r
-\r
- predicates.add(subject);\r
- orderedSets.add(predicate);\r
-\r
- return true;\r
-\r
- }\r
-\r
- });\r
-\r
- predicates.forEach(new TIntProcedure() {\r
-\r
- @Override\r
- public boolean execute(final int subject) {\r
-\r
- for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
- for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
- for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
-\r
- DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
- if(entry != null) update(graph, entry);\r
-\r
- return true;\r
-\r
- }\r
-\r
- });\r
-\r
- orderedSets.forEach(new TIntProcedure() {\r
-\r
- @Override\r
- public boolean execute(int orderedSet) {\r
-\r
- OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);\r
- if(entry != null) update(graph, entry);\r
-\r
- return true;\r
-\r
- }\r
-\r
- });\r
-\r
- // for (Integer subject : predicates) {\r
- // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
- // if(entry != null) update(graph, entry);\r
- // }\r
-\r
-\r
- if (Development.DEVELOPMENT) {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
- System.err.println("== Query update ends ==");\r
- }\r
- }\r
-\r
- scheduledValueUpdates.clear();\r
- scheduledObjectUpdates.clear();\r
- scheduledInvalidates.clear();\r
-\r
- }\r
-\r
- public void updateValue(final int resource) {\r
- scheduledValueUpdates.add(resource);\r
- dirty = true;\r
- }\r
-\r
- public void updateStatements(final int resource, final int predicate) {\r
- scheduledObjectUpdates.add((((long)resource) << 32) + predicate);\r
- dirty = true;\r
- }\r
- \r
- private int lastInvalidate = 0;\r
- \r
- public void invalidateResource(final int resource) {\r
- if(lastInvalidate == resource) return;\r
- scheduledValueUpdates.add(resource);\r
- lastInvalidate = resource;\r
- dirty = true;\r
- }\r
-\r
- public void updatePrimitive(final ExternalRead primitive) {\r
-\r
- // External reads may be updated from arbitrary threads.\r
- // Synchronize to prevent race-conditions.\r
- synchronized (primitiveUpdateLock) {\r
- scheduledPrimitiveUpdates.add(primitive);\r
- }\r
- querySupport.dirtyPrimitives();\r
-\r
- }\r
-\r
- @Override\r
- public synchronized String toString() {\r
- return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";\r
- }\r
-\r
- @Override\r
- protected void doDispose() {\r
-\r
- for(int index = 0; index < THREADS; index++) { \r
- executors[index].dispose();\r
- }\r
-\r
- // First just wait\r
- for(int i=0;i<100;i++) {\r
-\r
- boolean alive = false;\r
- for(int index = 0; index < THREADS; index++) { \r
- alive |= executors[index].isAlive();\r
- }\r
- if(!alive) return;\r
- try {\r
- Thread.sleep(5);\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- }\r
-\r
- // Then start interrupting\r
- for(int i=0;i<100;i++) {\r
-\r
- boolean alive = false;\r
- for(int index = 0; index < THREADS; index++) { \r
- alive |= executors[index].isAlive();\r
- }\r
- if(!alive) return;\r
- for(int index = 0; index < THREADS; index++) {\r
- executors[index].interrupt();\r
- }\r
- }\r
-\r
- // // Then just destroy\r
- // for(int index = 0; index < THREADS; index++) {\r
- // executors[index].destroy();\r
- // }\r
-\r
- for(int index = 0; index < THREADS; index++) {\r
- try {\r
- executors[index].join(5000);\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError("QueryThread " + index + " will not die.", e);\r
- }\r
- executors[index] = null;\r
- }\r
-\r
- }\r
-\r
- public int getHits() {\r
- return hits;\r
- }\r
-\r
- public int getMisses() {\r
- return misses;\r
- }\r
-\r
- public int getSize() {\r
- return size;\r
- }\r
-\r
- public Set<Long> getReferencedClusters() {\r
- HashSet<Long> result = new HashSet<Long>();\r
- for (CacheEntry entry : objectsMap.values()) {\r
- Objects query = (Objects) entry.getQuery();\r
- result.add(querySupport.getClusterId(query.r1()));\r
- }\r
- for (CacheEntry entry : directPredicatesMap.values()) {\r
- DirectPredicates query = (DirectPredicates) entry.getQuery();\r
- result.add(querySupport.getClusterId(query.id));\r
- }\r
- for (CacheEntry entry : valueMap.values()) {\r
- ValueQuery query = (ValueQuery) entry.getQuery();\r
- result.add(querySupport.getClusterId(query.id));\r
- }\r
- return result;\r
- }\r
-\r
- public void assertDone() {\r
- }\r
-\r
- CacheCollectionResult allCaches(CacheCollectionResult result) {\r
-\r
- int level = Integer.MAX_VALUE;\r
- directPredicatesMap.values(level, result);\r
- principalTypesMap.values(level, result);\r
- for(CacheEntryBase e : uriToResourceMap.values())\r
- if(e.getLevel() <= level)\r
- result.add(e);\r
- for(CacheEntryBase e : namespaceIndexMap22.values())\r
- if(e.getLevel() <= level)\r
- result.add(e);\r
- projectsMap.values(level, result);\r
- \r
- relationInfoMap.values(level, result);\r
- superTypesMap.values(level, result);\r
- typeHierarchyMap.values(level, result);\r
- superRelationsMap.values(level, result);\r
- typesMap.values(level, result);\r
-\r
- valueMap.values(level, result);\r
- directObjectsMap.values(level, result);\r
- objectsMap.values(level, result);\r
- orderedSetMap.values(level, result);\r
- predicatesMap.values(level, result);\r
-\r
- statementsMap.values(level, result);\r
- assertedPredicatesMap.values(level, result);\r
- assertedStatementsMap.values(level, result);\r
- externalReadMap.values(level, result);\r
- asyncReadMap.values(level, result);\r
- \r
- readMap.values(level, result);\r
- asyncMultiReadMap.values(level, result);\r
- multiReadMap.values(level, result);\r
-\r
- return result;\r
-\r
- }\r
-\r
- public void printDiagnostics() {\r
- }\r
-\r
- public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {\r
- querySupport.requestCluster(graph, clusterId, runnable);\r
- }\r
-\r
- public int clean() {\r
- collector.collect(0, Integer.MAX_VALUE);\r
- return size;\r
- }\r
-\r
- public void clean(final Collection<ExternalRead<?>> requests) {\r
- QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {\r
- Iterator<ExternalRead<?>> iterator = requests.iterator();\r
- @Override\r
- public CacheCollectionResult allCaches() {\r
- throw new UnsupportedOperationException();\r
- }\r
- @Override\r
- public CacheEntryBase iterate(int level) {\r
- if(iterator.hasNext()) {\r
- ExternalRead<?> request = iterator.next();\r
- ExternalReadEntry entry = externalReadMap.get(request);\r
- if (entry != null) return entry;\r
- else return iterate(level);\r
- } else {\r
- iterator = requests.iterator();\r
- return null;\r
- }\r
- }\r
- @Override\r
- public void remove() {\r
- throw new UnsupportedOperationException();\r
- }\r
- @Override\r
- public void setLevel(CacheEntryBase entry, int level) {\r
- throw new UnsupportedOperationException();\r
- }\r
- @Override\r
- public Collection<CacheEntry> getRootList() {\r
- ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());\r
- for (ExternalRead<?> request : requests) {\r
- ExternalReadEntry entry = externalReadMap.get(request);\r
- if (entry != null)\r
- result.add(entry);\r
- }\r
- return result;\r
- }\r
- @Override\r
- public int getCurrentSize() {\r
- return size;\r
- }\r
- @Override\r
- public int calculateCurrentSize() {\r
- // This tells the collector to attempt collecting everything.\r
- return Integer.MAX_VALUE;\r
- }\r
- @Override\r
- public boolean start(boolean flush) {\r
- return true;\r
- }\r
- };\r
- new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);\r
- }\r
-\r
- public void scanPending() {\r
-\r
- ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();\r
-\r
- entries.addAll(directPredicatesMap.values());\r
- entries.addAll(principalTypesMap.values());\r
- entries.addAll(uriToResourceMap.values());\r
- entries.addAll(namespaceIndexMap22.values());\r
- entries.addAll(projectsMap.values());\r
- entries.addAll(relationInfoMap.values());\r
- entries.addAll(superTypesMap.values());\r
- entries.addAll(superRelationsMap.values());\r
- entries.addAll(typesMap.values());\r
- entries.addAll(valueMap.values());\r
- entries.addAll(directObjectsMap.values());\r
- entries.addAll(objectsMap.values());\r
- entries.addAll(orderedSetMap.values());\r
- entries.addAll(predicatesMap.values());\r
- entries.addAll(orderedSetMap.values());\r
- entries.addAll(statementsMap.values());\r
- // entries.addAll(assertedObjectsMap.values());\r
- entries.addAll(assertedPredicatesMap.values());\r
- entries.addAll(assertedStatementsMap.values());\r
- entries.addAll(externalReadMap.values());\r
- entries.addAll(asyncReadMap.values());\r
- entries.addAll(externalReadMap.values());\r
- entries.addAll(readMap.values());\r
- entries.addAll(asyncMultiReadMap.values());\r
- entries.addAll(multiReadMap.values());\r
- entries.addAll(readMap.values());\r
- System.out.println(entries.size() + " entries.");\r
- for(Object e : entries) {\r
- if(e instanceof CacheEntry) {\r
- CacheEntry en = (CacheEntry)e;\r
- if(en.isPending()) System.out.println("pending " + e);\r
- if(en.isExcepted()) System.out.println("excepted " + e);\r
- if(en.isDiscarded()) System.out.println("discarded " + e);\r
- if(en.isRefuted()) System.out.println("refuted " + e);\r
- if(en.isFresh()) System.out.println("fresh " + e);\r
- } else {\r
- //System.out.println("Unknown object " + e);\r
- }\r
- }\r
-\r
- }\r
-\r
- public ReadGraphImpl graphForVirtualRequest() {\r
- return ReadGraphImpl.createAsync(this);\r
- }\r
-\r
- \r
- private HashMap<Resource, Class<?>> builtinValues;\r
- \r
- public Class<?> getBuiltinValue(Resource r) {\r
- if(builtinValues == null) initBuiltinValues();\r
- return builtinValues.get(r);\r
- }\r
-\r
- Exception callerException = null;\r
-\r
- public interface AsyncBarrier {\r
- public void inc(); \r
- public void dec();\r
- // public void inc(String debug); \r
- // public void dec(String debug);\r
- }\r
-\r
-// final public QueryProcessor processor;\r
-// final public QuerySupport support;\r
-\r
- // boolean disposed = false;\r
-\r
- private void initBuiltinValues() {\r
-\r
- Layer0 b = getSession().peekService(Layer0.class);\r
- if(b == null) return;\r
-\r
- builtinValues = new HashMap<Resource, Class<?>>();\r
-\r
- builtinValues.put(b.String, String.class);\r
- builtinValues.put(b.Double, Double.class);\r
- builtinValues.put(b.Float, Float.class);\r
- builtinValues.put(b.Long, Long.class);\r
- builtinValues.put(b.Integer, Integer.class);\r
- builtinValues.put(b.Byte, Byte.class);\r
- builtinValues.put(b.Boolean, Boolean.class);\r
-\r
- builtinValues.put(b.StringArray, String[].class);\r
- builtinValues.put(b.DoubleArray, double[].class);\r
- builtinValues.put(b.FloatArray, float[].class);\r
- builtinValues.put(b.LongArray, long[].class);\r
- builtinValues.put(b.IntegerArray, int[].class);\r
- builtinValues.put(b.ByteArray, byte[].class);\r
- builtinValues.put(b.BooleanArray, boolean[].class);\r
-\r
- }\r
-\r
-// public ReadGraphSupportImpl(final QueryProcessor provider2) {\r
-//\r
-// if (null == provider2) {\r
-// this.processor = null;\r
-// support = null;\r
-// return;\r
-// }\r
-// this.processor = provider2;\r
-// support = provider2.getCore();\r
-// initBuiltinValues();\r
-//\r
-// }\r
-\r
-// final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {\r
-// return new ReadGraphSupportImpl(impl.processor);\r
-// }\r
-\r
- @Override\r
- final public Session getSession() {\r
- return session;\r
- }\r
- \r
- final public ResourceSupport getResourceSupport() {\r
- return resourceSupport;\r
- }\r
-\r
- @Override\r
- final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- IntProcedure ip = new IntProcedure() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- if(first.get()) {\r
- procedure.execute(graph, querySupport.getResource(i));\r
- } else {\r
- procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.finished(graph);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.finished(impl.newRestart(graph));\r
- }\r
-\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- procedure.execute(querySupport.getResource(i));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished();\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
- \r
- @Override\r
- final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
-\r
- assert(subject != null);\r
- \r
- return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
-\r
- }\r
- \r
-\r
- @Override\r
- final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
- final Resource predicate, final MultiProcedure<Statement> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
- try {\r
- procedure.execute(querySupport.getStatement(s, p, o));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished();\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
- final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
-\r
- boolean first = true;\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
- try {\r
- if(first) {\r
- procedure.execute(graph, querySupport.getStatement(s, p, o));\r
- } else {\r
- procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
-\r
- try {\r
- if(first) {\r
- first = false;\r
- procedure.finished(graph);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.finished(impl.newRestart(graph));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
-\r
- try {\r
- if(first) {\r
- first = false;\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
- int pId = querySupport.getId(predicate);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
- final Resource predicate, final StatementProcedure procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
-\r
- boolean first = true;\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
- try {\r
- if(first) {\r
- procedure.execute(graph, s, p, o);\r
- } else {\r
- procedure.execute(impl.newRestart(graph), s, p, o);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
-\r
- try {\r
- if(first) {\r
- first = false;\r
- procedure.finished(graph);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.finished(impl.newRestart(graph));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
-\r
- try {\r
- if(first) {\r
- first = false;\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
- int pId = querySupport.getId(predicate);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
-\r
- }\r
- \r
- @Override\r
- final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
-\r
- private Set<Statement> current = null;\r
- private Set<Statement> run = new HashSet<Statement>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Statement result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Statement r : current) procedure.remove(graph, r);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Statement>();\r
-\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,\r
- final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
- try {\r
- procedure.execute(graph, querySupport.getStatement(s, p, o));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- private static ListenerBase getListenerBase(Object procedure) {\r
- if(procedure instanceof ListenerBase) return (ListenerBase)procedure;\r
- else return null;\r
- }\r
-\r
- @Override\r
- final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- procedure.execute(querySupport.getResource(i));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished();\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- System.out.println("forEachObject exception " + t);\r
- try {\r
- procedure.exception(t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
-\r
-// @Override\r
-// final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
-//\r
-// assert(subject != null);\r
-// assert(predicate != null);\r
-// assert(procedure != null);\r
-//\r
-// final ListenerBase listener = getListenerBase(procedure);\r
-//\r
-// int sId = querySupport.getId(subject);\r
-// int pId = querySupport.getId(predicate);\r
-//\r
-// MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);\r
-//\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);\r
-// else impl.state.barrier.inc(null, null);\r
-//\r
-// // final Exception caller = new Exception();\r
-//\r
-// // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());\r
-//\r
-// DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);\r
-//\r
-// }\r
-\r
- @Override\r
- final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);\r
-\r
- }\r
-\r
- private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);\r
-\r
- @Override\r
- final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {\r
-\r
- forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {\r
-\r
- private Resource single = null;\r
-\r
- @Override\r
- public synchronized void execute(AsyncReadGraph graph, Resource result) {\r
- if(single == null) {\r
- single = result;\r
- } else {\r
- single = INVALID_RESOURCE;\r
- }\r
- }\r
-\r
- @Override\r
- public synchronized void finished(AsyncReadGraph graph) {\r
- if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);\r
- else procedure.execute(graph, single);\r
- }\r
-\r
- @Override\r
- public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {\r
- procedure.exception(graph, throwable);\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {\r
- \r
- final int sId = querySupport.getId(subject);\r
- final int pId = querySupport.getId(predicate);\r
-\r
- Objects.runner(impl, sId, pId, impl.parent, listener, procedure);\r
- \r
- }\r
- \r
- final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {\r
-\r
- final int sId = querySupport.getId(subject);\r
- final int pId = querySupport.getId(predicate);\r
-\r
- return Objects.runner2(impl, sId, pId, impl.parent);\r
- \r
- }\r
-\r
- final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- if(impl.parent != null || listener != null) {\r
-\r
- IntProcedure ip = new IntProcedure() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- if(first.get()) {\r
- procedure.execute(impl, querySupport.getResource(i));\r
- } else {\r
- procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.finished(impl);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.finished(impl.newRestart(graph));\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forEachObject with " + procedure;\r
- }\r
-\r
- };\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- forEachObject(impl, subject, predicate, listener, ip);\r
-\r
- } else {\r
-\r
- IntProcedure ip = new IntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- procedure.execute(graph, querySupport.getResource(i));\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- procedure.finished(graph);\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forEachObject with " + procedure;\r
- }\r
-\r
- };\r
-\r
- forEachObject(impl, subject, predicate, listener, ip);\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
-\r
- private Set<Resource> current = null;\r
- private Set<Resource> run = new HashSet<Resource>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Resource result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Resource r : current) procedure.remove(graph, r);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Resource>();\r
-\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forObjectSet " + procedure;\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {\r
-\r
- private Set<Resource> current = null;\r
- private Set<Resource> run = new HashSet<Resource>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Resource result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Resource r : current) procedure.remove(graph, r);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Resource>();\r
-\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forPredicateSet " + procedure;\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {\r
-\r
- private Set<Resource> current = null;\r
- private Set<Resource> run = new HashSet<Resource>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Resource result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Resource r : current) procedure.remove(graph, r);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Resource>();\r
-\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forPrincipalTypeSet " + procedure;\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
-\r
- private Set<Resource> current = null;\r
- private Set<Resource> run = new HashSet<Resource>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Resource result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Resource r : current) procedure.remove(graph, r);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Resource>();\r
-\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forObjectSet " + procedure;\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
-\r
- private Set<Statement> current = null;\r
- private Set<Statement> run = new HashSet<Statement>();\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Statement result) {\r
-\r
- boolean found = false;\r
-\r
- if(current != null) {\r
-\r
- found = current.remove(result);\r
-\r
- }\r
-\r
- if(!found) procedure.add(graph, result);\r
-\r
- run.add(result);\r
-\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- if(current != null) { \r
- for(Statement s : current) procedure.remove(graph, s);\r
- }\r
-\r
- current = run;\r
-\r
- run = new HashSet<Statement>();\r
-\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return procedure.isDisposed();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "forStatementSet " + procedure;\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,\r
- final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(o));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try { \r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- IntProcedure ip = new IntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(i));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- procedure.execute(querySupport.getResource(i));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished();\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(final ReadGraphImpl graph, IntSet set) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.execute(graph, set);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.execute(impl.newRestart(graph), set);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- Types.queryEach(impl, sId, this, impl.parent, listener, ip);\r
-\r
- }\r
- \r
- @Override\r
- final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
-\r
- assert(subject != null);\r
- \r
- return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
-\r
- }\r
-\r
- @Override\r
- final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(final ReadGraphImpl graph, RelationInfo set) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.execute(graph, set);\r
-// impl.state.barrier.dec();\r
- } else {\r
- procedure.execute(impl.newRestart(graph), set);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec("ReadGraphSupportImpl.1353");\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(final ReadGraphImpl graph, IntSet set) {\r
-// final HashSet<Resource> result = new HashSet<Resource>();\r
-// set.forEach(new TIntProcedure() {\r
-//\r
-// @Override\r
-// public boolean execute(int type) {\r
-// result.add(querySupport.getResource(type));\r
-// return true;\r
-// }\r
-//\r
-// });\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.execute(graph, set);\r
-// impl.state.barrier.dec();\r
- } else {\r
- procedure.execute(impl.newRestart(graph), set);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec();\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- IntProcedure ip = new IntProcedureAdapter() {\r
-\r
- @Override\r
- public void execute(final ReadGraphImpl graph, int superRelation) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(superRelation));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(final ReadGraphImpl graph) {\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject); \r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
-\r
- }\r
-\r
- @Override\r
- final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
-\r
- @Override\r
- public void execute(final ReadGraphImpl graph, IntSet set) {\r
-// final HashSet<Resource> result = new HashSet<Resource>();\r
-// set.forEach(new TIntProcedure() {\r
-//\r
-// @Override\r
-// public boolean execute(int type) {\r
-// result.add(querySupport.getResource(type));\r
-// return true;\r
-// }\r
-//\r
-// });\r
- try {\r
- procedure.execute(graph, set);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
-\r
- }\r
-\r
- final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {\r
-\r
- int sId = querySupport.getId(subject);\r
- return ValueQuery.queryEach(impl, sId, impl.parent);\r
-\r
- }\r
-\r
- final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {\r
-\r
- return ValueQuery.queryEach(impl, subject, impl.parent);\r
-\r
- }\r
-\r
- @Override\r
- final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
-\r
- assert(subject != null);\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
- if(procedure != null) {\r
- \r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, byte[] result) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.execute(graph, result);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.execute(impl.newRestart(graph), result);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- };\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
-\r
- } else {\r
-\r
- return ValueQuery.queryEach(impl, sId, impl.parent, null, null);\r
- \r
- }\r
-\r
- }\r
-\r
- @Override\r
- final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- if(impl.parent != null || listener != null) {\r
-\r
- InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
-\r
- AtomicBoolean first = new AtomicBoolean(true);\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, byte[] result) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.execute(graph, result);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.execute(impl.newRestart(graph), result);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- if(first.compareAndSet(true, false)) {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.exception(impl.newRestart(graph), t);\r
- }\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
-\r
- } else {\r
-\r
- InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, byte[] result) {\r
-\r
- procedure.execute(graph, result);\r
-\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
-\r
- procedure.exception(graph, t);\r
-\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(subject);\r
-\r
- ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {\r
-\r
- assert(relation != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
- IntProcedure ip = new IntProcedure() {\r
-\r
- private int result = 0;\r
- \r
- final AtomicBoolean found = new AtomicBoolean(false);\r
- final AtomicBoolean done = new AtomicBoolean(false);\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- \r
- // Shall fire exactly once!\r
- if(done.compareAndSet(false, true)) {\r
- try {\r
- if(result == 0) {\r
- procedure.exception(graph, new NoInverseException(""));\r
-// impl.state.barrier.dec(this);\r
- } else {\r
- procedure.execute(graph, querySupport.getResource(result));\r
-// impl.state.barrier.dec(this);\r
- }\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- \r
- if(found.compareAndSet(false, true)) {\r
- this.result = i;\r
- } else {\r
- // Shall fire exactly once!\r
- if(done.compareAndSet(false, true)) {\r
- try {\r
- procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));\r
-// impl.state.barrier.dec(this);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- // Shall fire exactly once!\r
- if(done.compareAndSet(false, true)) {\r
- try {\r
- procedure.exception(graph, t);\r
-// impl.state.barrier.dec(this);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
- }\r
-\r
- };\r
-\r
- int sId = querySupport.getId(relation);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
-\r
- assert(id != null);\r
- assert(procedure != null);\r
-\r
- InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, Integer result) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(result));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- } \r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
-\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- };\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- forResource(impl, id, impl.parent, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
-\r
- assert(id != null);\r
- assert(procedure != null);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, Integer result) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(result)); \r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- } \r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
-\r
- boolean found = false;\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int object) {\r
- found = true;\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.execute(graph, found);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
- AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {\r
-\r
- boolean found = false;\r
-\r
- @Override\r
- synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
- found = true;\r
- }\r
-\r
- @Override\r
- synchronized public void finished(AsyncReadGraph graph) {\r
- try {\r
- procedure.execute(graph, found);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec(this);\r
- }\r
-\r
- };\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- forEachObject(impl, subject, predicate, ip);\r
-\r
- }\r
-\r
- @Override\r
- final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {\r
-\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(procedure != null);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {\r
-\r
- boolean found = false;\r
-\r
- @Override\r
- synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
- if(resource.equals(object)) found = true;\r
- }\r
-\r
- @Override\r
- synchronized public void finished(AsyncReadGraph graph) {\r
- try {\r
- procedure.execute(graph, found);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, byte[] object) {\r
- boolean result = object != null;\r
- try {\r
- procedure.execute(graph, result);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
-\r
- assert(subject != null);\r
- assert(procedure != null);\r
-\r
- final ListenerBase listener = getListenerBase(procedure);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
-\r
- @Override\r
- public void exception(ReadGraphImpl graph, Throwable t) {\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void execute(ReadGraphImpl graph, int i) {\r
- try {\r
- procedure.execute(graph, querySupport.getResource(i));\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(ReadGraphImpl graph) {\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-// impl.state.barrier.dec();\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {\r
-\r
- assert(request != null);\r
- assert(procedure != null);\r
-\r
-// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");\r
-// else impl.state.barrier.inc(null, null);\r
-\r
- runAsyncRead(impl, request, parent, listener, procedure);\r
-\r
- }\r
-\r
- @Override\r
- final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {\r
-\r
- assert(graph != null);\r
- assert(request != null);\r
-\r
- final ReadEntry entry = readMap.get(request);\r
- if(entry != null && entry.isReady()) {\r
- return (T)entry.get(graph, this, null);\r
- } else {\r
- return request.perform(graph);\r
- }\r
-\r
- }\r
-\r
- final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {\r
-\r
- assert(graph != null);\r
- assert(request != null);\r
-\r
- final ExternalReadEntry<T> entry = externalReadMap.get(request);\r
- if(entry != null && entry.isReady()) {\r
- if(entry.isExcepted()) {\r
- Throwable t = (Throwable)entry.getResult();\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException(t);\r
- } else {\r
- return (T)entry.getResult();\r
- } \r
- } else {\r
-\r
- final DataContainer<T> result = new DataContainer<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
- \r
- request.register(graph, new Listener<T>() {\r
- \r
- @Override\r
- public void exception(Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- result.set(t);\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return true;\r
- }\r
- \r
- });\r
- \r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException(t);\r
- }\r
- \r
- return result.get();\r
-\r
- }\r
-\r
- }\r
- \r
- @Override\r
- final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {\r
-\r
- assert(graph != null);\r
- assert(request != null);\r
-\r
- final AsyncReadEntry entry = asyncReadMap.get(request);\r
- if(entry != null && entry.isReady()) {\r
- if(entry.isExcepted()) {\r
- procedure.exception(graph, (Throwable)entry.getResult());\r
- } else {\r
- procedure.execute(graph, (T)entry.getResult());\r
- }\r
- } else {\r
- request.perform(graph, procedure);\r
- }\r
-\r
- }\r
-\r
- @Override\r
- final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
-\r
- assert(request != null);\r
- assert(procedure != null);\r
-\r
-// impl.state.barrier.inc(null, null);\r
-\r
- queryMultiRead(impl, request, parent, listener, procedure);\r
-\r
- }\r
-\r
- @Override\r
- final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
-\r
- assert(request != null);\r
- assert(procedure != null);\r
-\r
-// impl.state.barrier.inc();\r
-\r
- runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {\r
-\r
- public void execute(AsyncReadGraph graph, T result) {\r
-\r
- try {\r
- procedure.execute(graph, result);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
-\r
- try {\r
- procedure.finished(graph);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
-// impl.state.barrier.dec();\r
-\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
-\r
- try {\r
- procedure.exception(graph, t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
-\r
-// impl.state.barrier.dec();\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {\r
-\r
- assert(request != null);\r
- assert(procedure != null);\r
-\r
- queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {\r
-\r
- @Override\r
- public void execute(T result) {\r
- try {\r
- procedure.execute(result);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- try {\r
- procedure.exception(t);\r
- } catch (Throwable t2) {\r
- Logger.defaultLogError(t2);\r
- }\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {\r
- \r
- return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
- \r
- }\r
- \r
- @Override\r
- public VirtualGraph getProvider(Resource subject, Resource predicate) {\r
- \r
- return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));\r
- \r
- }\r
-\r
- @Override\r
- public VirtualGraph getValueProvider(Resource subject) {\r
- \r
- return querySupport.getValueProvider(querySupport.getId(subject));\r
- \r
- }\r
-\r
- public boolean resumeTasks(ReadGraphImpl graph) {\r
-\r
- return querySupport.resume(graph);\r
-\r
- }\r
- \r
- public boolean isImmutable(int resourceId) {\r
- return querySupport.isImmutable(resourceId);\r
- }\r
-\r
- public boolean isImmutable(Resource resource) {\r
- ResourceImpl impl = (ResourceImpl)resource;\r
- return isImmutable(impl.id);\r
- }\r
- \r
- private Layer0 L0;\r
- \r
- public Layer0 getL0(ReadGraph graph) {\r
- if(L0 == null) {\r
- L0 = Layer0.getInstance(graph);\r
- }\r
- return L0;\r
- }\r
- \r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.impl.query;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.DirectStatements;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.RelationInfo;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.Statement;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
+import org.simantics.db.exception.NoInverseException;
+import org.simantics.db.exception.ResourceNotFoundException;
+import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.ReadGraphSupport;
+import org.simantics.db.impl.procedure.IntProcedureAdapter;
+import org.simantics.db.impl.procedure.InternalProcedure;
+import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
+import org.simantics.db.impl.support.ResourceSupport;
+import org.simantics.db.procedure.AsyncMultiListener;
+import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.AsyncSetListener;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.db.procedure.MultiProcedure;
+import org.simantics.db.procedure.StatementProcedure;
+import org.simantics.db.procedure.SyncMultiProcedure;
+import org.simantics.db.request.AsyncMultiRead;
+import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.MultiRead;
+import org.simantics.db.request.RequestFlags;
+import org.simantics.layer0.Layer0;
+import org.simantics.utils.DataContainer;
+import org.simantics.utils.Development;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.simantics.utils.datastructures.disposable.AbstractDisposable;
+
+import gnu.trove.procedure.TIntProcedure;
+import gnu.trove.procedure.TLongProcedure;
+import gnu.trove.procedure.TObjectProcedure;
+import gnu.trove.set.hash.THashSet;
+import gnu.trove.set.hash.TIntHashSet;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {
+
+ public static int indent = 0;
+
+
+ // Garbage collection
+
+ public int boundQueries = 0;
+
+
+ final private int functionalRelation;
+
+ final private int superrelationOf;
+
+ final private int instanceOf;
+
+ final private int inverseOf;
+
+ final private int asserts;
+
+ final private int hasPredicate;
+
+ final private int hasPredicateInverse;
+
+ final private int hasObject;
+
+ final private int inherits;
+
+ final private int subrelationOf;
+
+ final private int rootLibrary;
+
+ /**
+ * A cache for the root library resource. Initialized in
+ * {@link #getRootLibraryResource()}.
+ */
+ private volatile ResourceImpl rootLibraryResource;
+
+ final private int library;
+
+ final private int consistsOf;
+
+ final private int hasName;
+
+ AtomicInteger sleepers = new AtomicInteger(0);
+
+ boolean updating = false;
+
+
+ final public QueryCache cache;
+ final public QuerySupport querySupport;
+ final public Session session;
+ final public ResourceSupport resourceSupport;
+
+ final public Semaphore requests = new Semaphore(1);
+
+ final public QueryListening listening = new QueryListening(this);
+
+ QueryThread[] executors;
+
+ enum ThreadState {
+
+ INIT, RUN, SLEEP, DISPOSED
+
+ }
+
+ final Scheduling scheduling;
+
+ public ThreadState[] threadStates;
+
+ final Object querySupportLock;
+
+ public Long modificationCounter = 0L;
+
+ public void close() {
+ }
+
+
+ /*
+ * We are running errands while waiting for requests to complete.
+ * We can only run work that is part of the current root request to avoid any deadlocks
+ */
+ public boolean performPending(ReadGraphImpl under) {
+ SessionTask task = scheduling.getSubTask(under);
+ if(task != null) {
+ task.run(thread.get());
+ return true;
+ }
+ return false;
+ }
+
+ final public void scheduleNow(SessionTask request) {
+ SessionTask toExecute = scheduleOrReturnForExecution(request);
+ if(toExecute != null)
+ toExecute.run(thread.get());
+ }
+
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
+
+ return scheduling.scheduleOrReturnForExecution(request);
+
+ }
+
+
+ final int THREADS;
+ final public int THREAD_MASK;
+
+ final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
+
+ public static abstract class SessionTask {
+
+ final protected ReadGraphImpl rootGraph;
+ private int counter = 0;
+ protected int position = 1;
+ private Exception trace;
+
+ public SessionTask() {
+ this(null);
+ }
+
+ public SessionTask(ReadGraphImpl rootGraph) {
+ this.rootGraph = rootGraph;
+ }
+
+ public boolean isSubtask(ReadGraphImpl graph) {
+ return graph.isParent(rootGraph);
+ }
+
+ public abstract void run0(int thread);
+
+ public final void run(int thread) {
+ if(counter++ > 0) {
+ if(BarrierTracing.BOOKKEEPING) {
+ trace.printStackTrace();
+ new Exception().printStackTrace();
+ }
+ throw new IllegalStateException("Multiple invocations of SessionTask!");
+ }
+ if(BarrierTracing.BOOKKEEPING) {
+ trace = new Exception();
+ }
+ run0(thread);
+ }
+
+ public boolean maybeReady() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ if(rootGraph == null)
+ return "SessionTask[no graph]";
+ else
+ return "SessionTask[" + rootGraph.parent + "]";
+ }
+
+ }
+
+ public static abstract class SessionRead extends SessionTask {
+
+ final public Semaphore notify;
+ final public DataContainer<Throwable> throwable;
+
+ public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+ super(null);
+ this.throwable = throwable;
+ this.notify = notify;
+ }
+
+ }
+
+ public boolean resume(ReadGraphImpl graph) {
+ return executors[0].runSynchronized();
+ }
+
+ public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
+ throws DatabaseException {
+
+ THREADS = threads;
+ THREAD_MASK = threads - 1;
+
+ scheduling = new Scheduling(requests);
+
+ querySupport = core;
+ cache = new QueryCache(core, threads);
+ session = querySupport.getSession();
+ resourceSupport = querySupport.getSupport();
+ querySupportLock = core.getLock();
+
+ executors = new QueryThread[THREADS];
+ threadStates = new ThreadState[THREADS];
+
+ for (int i = 0; i < THREADS; i++) {
+ threadStates[i] = ThreadState.INIT;
+ }
+
+ for (int i = 0; i < THREADS; i++) {
+
+ final int index = i;
+
+ executors[i] = new QueryThread(session, this, index, "Query Thread " + index);
+
+ threadSet.add(executors[i]);
+
+ }
+
+ // Now start threads
+ for (int i = 0; i < THREADS; i++) {
+ executors[i].start();
+ }
+
+ // Make sure that query threads are up and running
+ while(sleepers.get() != THREADS) {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ rootLibrary = core.getBuiltin("http:/");
+ boolean builtinsInstalled = rootLibrary != 0;
+
+ if (builtinsInstalled) {
+ functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);
+ assert (functionalRelation != 0);
+ } else
+ functionalRelation = 0;
+
+ if (builtinsInstalled) {
+ instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);
+ assert (instanceOf != 0);
+ } else
+ instanceOf = 0;
+
+ if (builtinsInstalled) {
+ inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);
+ assert (inverseOf != 0);
+ } else
+ inverseOf = 0;
+
+
+ if (builtinsInstalled) {
+ inherits = core.getBuiltin(Layer0.URIs.Inherits);
+ assert (inherits != 0);
+ } else
+ inherits = 0;
+
+ if (builtinsInstalled) {
+ asserts = core.getBuiltin(Layer0.URIs.Asserts);
+ assert (asserts != 0);
+ } else
+ asserts = 0;
+
+ if (builtinsInstalled) {
+ hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);
+ assert (hasPredicate != 0);
+ } else
+ hasPredicate = 0;
+
+ if (builtinsInstalled) {
+ hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);
+ assert (hasPredicateInverse != 0);
+ } else
+ hasPredicateInverse = 0;
+
+ if (builtinsInstalled) {
+ hasObject = core.getBuiltin(Layer0.URIs.HasObject);
+ assert (hasObject != 0);
+ } else
+ hasObject = 0;
+
+ if (builtinsInstalled) {
+ subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);
+ assert (subrelationOf != 0);
+ } else
+ subrelationOf = 0;
+
+ if (builtinsInstalled) {
+ superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);
+ assert (superrelationOf != 0);
+ } else
+ superrelationOf = 0;
+
+ if (builtinsInstalled) {
+ library = core.getBuiltin(Layer0.URIs.Library);
+ assert (library != 0);
+ } else
+ library = 0;
+
+ if (builtinsInstalled) {
+ consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);
+ assert (consistsOf != 0);
+ } else
+ consistsOf = 0;
+
+ if (builtinsInstalled) {
+ hasName = core.getBuiltin(Layer0.URIs.HasName);
+ assert (hasName != 0);
+ } else
+ hasName = 0;
+
+ }
+
+ final public void releaseWrite(ReadGraphImpl graph) {
+ propagateChangesInQueryCache(graph);
+ modificationCounter++;
+ }
+
+ final public int getId(final Resource r) {
+ return querySupport.getId(r);
+ }
+
+ public QuerySupport getCore() {
+ return querySupport;
+ }
+
+ public int getFunctionalRelation() {
+ return functionalRelation;
+ }
+
+ public int getInherits() {
+ return inherits;
+ }
+
+ public int getInstanceOf() {
+ return instanceOf;
+ }
+
+ public int getInverseOf() {
+ return inverseOf;
+ }
+
+ public int getSubrelationOf() {
+ return subrelationOf;
+ }
+
+ public int getSuperrelationOf() {
+ return superrelationOf;
+ }
+
+ public int getAsserts() {
+ return asserts;
+ }
+
+ public int getHasPredicate() {
+ return hasPredicate;
+ }
+
+ public int getHasPredicateInverse() {
+ return hasPredicateInverse;
+ }
+
+ public int getHasObject() {
+ return hasObject;
+ }
+
+ public int getRootLibrary() {
+ return rootLibrary;
+ }
+
+ public Resource getRootLibraryResource() {
+ if (rootLibraryResource == null) {
+ // Synchronization is not needed here, it doesn't matter if multiple
+ // threads simultaneously set rootLibraryResource once.
+ int root = getRootLibrary();
+ if (root == 0)
+ throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");
+ this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);
+ }
+ return rootLibraryResource;
+ }
+
+ public int getLibrary() {
+ return library;
+ }
+
+ public int getConsistsOf() {
+ return consistsOf;
+ }
+
+ public int getHasName() {
+ return hasName;
+ }
+
+ public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {
+
+ try {
+
+ QueryCache.runnerURIToResource(graph, id, parent, null, new InternalProcedure<Integer>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, Integer result) throws DatabaseException {
+
+ if (result != null && result != 0) {
+ procedure.execute(graph, result);
+ return;
+ }
+
+ // Fall back to using the fixed builtins.
+// result = querySupport.getBuiltin(id);
+// if (result != 0) {
+// procedure.execute(graph, result);
+// return;
+// }
+
+// try {
+// result = querySupport.getRandomAccessReference(id);
+// } catch (ResourceNotFoundException e) {
+// procedure.exception(graph, e);
+// return;
+// }
+
+ if (result != 0) {
+ procedure.execute(graph, result);
+ } else {
+ procedure.exception(graph, new ResourceNotFoundException(id));
+ }
+
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
+ procedure.exception(graph, t);
+ }
+
+ });
+ } catch (DatabaseException e) {
+
+ try {
+
+ procedure.exception(graph, e);
+
+ } catch (DatabaseException e1) {
+
+ Logger.defaultLogError(e1);
+
+ }
+
+ }
+
+ }
+
+ public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) throws DatabaseException {
+
+ Integer result = querySupport.getBuiltin(id);
+ if (result != 0) {
+ procedure.execute(graph, result);
+ } else {
+ procedure.exception(graph, new ResourceNotFoundException(id));
+ }
+
+ }
+
+ final <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final SyncMultiProcedure<T> procedure) {
+
+ try {
+ QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
+ } catch (DatabaseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {
+
+
+ try {
+ QueryCache.runnerAsyncMultiReadEntry(graph, query, parent, listener, procedure);
+ } catch (DatabaseException e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ final <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
+ QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
+ }
+
+// @Override
+// public <T> T query(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws DatabaseException {
+//
+// return QueryCache.resultReadEntry(graph, query, parent, listener, procedure);
+//
+// }
+
+ public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final SyncMultiProcedure<T> procedure) throws DatabaseException {
+
+ QueryCache.runnerMultiReadEntry(graph, query, parent, listener, procedure);
+
+ }
+
+ public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) throws DatabaseException {
+
+ QueryCache.runnerExternalReadEntry(graph, query, parent, listener, procedure);
+
+ }
+
+ boolean isBound(ExternalReadEntry<?> entry) {
+ if(entry.hasParents()) return true;
+ else if(listening.hasListener(entry)) return true;
+ else return false;
+ }
+
+ static class Dummy implements InternalProcedure<Object>, IntProcedure {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ }
+
+ @Override
+ public void execute(ReadGraphImpl graph, Object result) {
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable throwable) {
+ }
+
+ }
+
+ private static final Dummy dummy = new Dummy();
+
+ /*
+ public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {
+
+ if (DebugPolicy.PERFORM)
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);
+
+ assert (!dirty);
+ assert (!collecting);
+
+ assert(query.assertNotDiscarded());
+
+ registerDependencies(graph, query, parent, listener, procedure, false);
+
+ // FRESH, REFUTED, EXCEPTED go here
+ if (!query.isReady()) {
+
+ size++;
+ misses++;
+
+ query.computeForEach(graph, this, (Procedure)dummy, true);
+ return query.get(graph, this, null);
+
+ } else {
+
+ hits++;
+
+ return query.get(graph, this, procedure);
+
+ }
+
+ }
+ */
+
+
+ interface QueryCollectorSupport {
+ public CacheCollectionResult allCaches();
+ public Collection<CacheEntry> getRootList();
+ public int getCurrentSize();
+ public int calculateCurrentSize();
+ public CacheEntryBase iterate(int level);
+ public void remove();
+ public void setLevel(CacheEntryBase entry, int level);
+ public boolean start(boolean flush);
+ }
+
+ interface QueryCollector {
+
+ public void collect(int youngTarget, int allowedTimeInMs);
+
+ }
+
+ class QueryCollectorSupportImpl implements QueryCollectorSupport {
+
+ private static final boolean DEBUG = false;
+ private static final double ITERATION_RATIO = 0.2;
+
+ private CacheCollectionResult iteration = new CacheCollectionResult();
+ private boolean fresh = true;
+ private boolean needDataInStart = true;
+
+ QueryCollectorSupportImpl() {
+ iteration.restart();
+ }
+
+ public CacheCollectionResult allCaches() {
+ CacheCollectionResult result = new CacheCollectionResult();
+ QueryProcessor.this.allCaches(result);
+ result.restart();
+ return result;
+ }
+
+ public boolean start(boolean flush) {
+ // We need new data from query maps
+ fresh = true;
+ if(needDataInStart || flush) {
+ // Last run ended after processing all queries => refresh data
+ restart(flush ? 0.0 : ITERATION_RATIO);
+ } else {
+ // continue with previous big data
+ }
+ // Notify caller about iteration situation
+ return iteration.isAtStart();
+ }
+
+ private void restart(double targetRatio) {
+
+ needDataInStart = true;
+
+ long start = System.nanoTime();
+ if(fresh) {
+
+ // We need new data from query maps
+
+ int iterationSize = iteration.size()+1;
+ int diff = calculateCurrentSize()-iterationSize;
+
+ double ratio = (double)diff / (double)iterationSize;
+ boolean dirty = Math.abs(ratio) >= targetRatio;
+
+ if(dirty) {
+ iteration = allCaches();
+ if(DEBUG) {
+ System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");
+ for(int i=0;i<CacheCollectionResult.LEVELS;i++)
+ System.err.print(" " + iteration.levels[i].size());
+ System.err.println("");
+ }
+ } else {
+ iteration.restart();
+ }
+
+ fresh = false;
+ needDataInStart = false;
+ } else {
+ // We are returning here within the same GC round - reuse the cache table
+ iteration.restart();
+ }
+
+ return;
+
+ }
+
+ @Override
+ public CacheEntryBase iterate(int level) {
+
+ CacheEntryBase entry = iteration.next(level);
+ if(entry == null) {
+ restart(ITERATION_RATIO);
+ return null;
+ }
+
+ while(entry != null && entry.isDiscarded()) {
+ entry = iteration.next(level);
+ }
+
+ return entry;
+
+ }
+
+ @Override
+ public void remove() {
+ iteration.remove();
+ }
+
+ @Override
+ public void setLevel(CacheEntryBase entry, int level) {
+ iteration.setLevel(entry, level);
+ }
+
+ public Collection<CacheEntry> getRootList() {
+ return cache.getRootList();
+ }
+
+ @Override
+ public int calculateCurrentSize() {
+ return cache.calculateCurrentSize();
+ }
+
+ @Override
+ public int getCurrentSize() {
+ return cache.size;
+ }
+
+ }
+ // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);
+
+ private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();
+ private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);
+
+ public int querySize() {
+ return cache.size;
+ }
+
+ public void gc(int youngTarget, int allowedTimeInMs) {
+
+ collector.collect(youngTarget, allowedTimeInMs);
+
+ }
+
+
+ void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
+
+ if(entry.isDiscarded()) return;
+ if(workarea.containsKey(entry)) return;
+
+ Iterable<CacheEntry> parents = entry.getParents(this);
+ HashSet<CacheEntry> ps = new HashSet<CacheEntry>();
+ for(CacheEntry e : parents) {
+ if(e.isDiscarded()) continue;
+ ps.add(e);
+ processParentReport(e, workarea);
+ }
+ workarea.put(entry, ps);
+
+ }
+
+ public synchronized String reportQueryActivity(File file) throws IOException {
+
+ System.err.println("reportQueries " + file.getAbsolutePath());
+
+ if (!isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+
+ List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);
+ Collections.reverse(entries);
+
+ for(Pair<String,Integer> entry : entries) {
+ b.println(entry.first + ": " + entry.second);
+ }
+
+ b.close();
+
+ Development.histogram.clear();
+
+ return "OK";
+
+ }
+
+ public synchronized String reportQueries(File file) throws IOException {
+
+ System.err.println("reportQueries " + file.getAbsolutePath());
+
+ if (!isAlive())
+ return "Disposed!";
+
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
+
+ long start = System.nanoTime();
+
+// ArrayList<CacheEntry> all = ;
+
+ Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();
+ Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
+ for(CacheEntryBase entry : caches) {
+ processParentReport(entry, workarea);
+ }
+
+ // for(CacheEntry e : all) System.err.println("entry: " + e);
+
+ long duration = System.nanoTime() - start;
+ System.err.println("Query root set in " + 1e-9*duration + "s.");
+
+ start = System.nanoTime();
+
+ HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>();
+
+ int listeners = 0;
+
+ for(CacheEntry entry : workarea.keySet()) {
+ boolean listener = listening.hasListenerAfterDisposing(entry);
+ boolean hasParents = entry.getParents(this).iterator().hasNext();
+ if(listener) {
+ // Bound
+ flagMap.put(entry, 0);
+ } else if (!hasParents) {
+ // Unbound
+ flagMap.put(entry, 1);
+ } else {
+ // Unknown
+ flagMap.put(entry, 2);
+ }
+ // // Write leaf bit
+ // entry.flags |= 4;
+ }
+
+ boolean done = true;
+ int loops = 0;
+
+ do {
+
+ done = true;
+
+ long start2 = System.nanoTime();
+
+ int boundCounter = 0;
+ int unboundCounter = 0;
+ int unknownCounter = 0;
+
+ for(CacheEntry<?> entry : workarea.keySet()) {
+
+ //System.err.println("process " + entry);
+
+ int flags = flagMap.get(entry);
+ int bindStatus = flags & 3;
+
+ if(bindStatus == 0) boundCounter++;
+ else if(bindStatus == 1) unboundCounter++;
+ else if(bindStatus == 2) unknownCounter++;
+
+ if(bindStatus < 2) continue;
+
+ int newStatus = 1;
+ for(CacheEntry parent : entry.getParents(this)) {
+
+ if(parent.isDiscarded()) flagMap.put(parent, 1);
+
+ int flags2 = flagMap.get(parent);
+ int bindStatus2 = flags2 & 3;
+ // Parent is bound => child is bound
+ if(bindStatus2 == 0) {
+ newStatus = 0;
+ break;
+ }
+ // Parent is unknown => child is unknown
+ else if (bindStatus2 == 2) {
+ newStatus = 2;
+ done = false;
+ break;
+ }
+ }
+
+ flagMap.put(entry, newStatus);
+
+ }
+
+ duration = System.nanoTime() - start2;
+ System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
+ b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");
+
+ } while(!done && loops++ < 20);
+
+ if(loops >= 20) {
+
+ for(CacheEntry entry : workarea.keySet()) {
+
+ int bindStatus = flagMap.get(entry);
+ if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);
+
+ }
+
+ }
+
+ duration = System.nanoTime() - start;
+ System.err.println("Query analysis in " + 1e-9*duration + "s.");
+
+ Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();
+
+ for(CacheEntry entry : workarea.keySet()) {
+ Class<?> clazz = entry.getClass();
+ if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
+ else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
+ else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
+ else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
+ else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
+ Integer c = counts.get(clazz);
+ if(c == null) counts.put(clazz, -1);
+ else counts.put(clazz, c-1);
+ }
+
+ b.print("// Simantics DB client query report file\n");
+ b.print("// This file contains the following information\n");
+ b.print("// -The amount of cached query instances per query class\n");
+ b.print("// -The sizes of retained child sets\n");
+ b.print("// -List of parents for each query (search for 'P <query name>')\n");
+ b.print("// -Followed by status, where\n");
+ b.print("// -0=bound\n");
+ b.print("// -1=free\n");
+ b.print("// -2=unknown\n");
+ b.print("// -L=has listener\n");
+ b.print("// -List of children for each query (search for 'C <query name>')\n");
+
+ b.print("----------------------------------------\n");
+
+ b.print("// Queries by class\n");
+ for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {
+ b.print(-p.second + " " + p.first.getName() + "\n");
+ }
+
+ Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();
+ for(CacheEntry e : workarea.keySet())
+ hist.put(e, -1);
+
+ boolean changed = true;
+ int iter = 0;
+ while(changed && iter++<50) {
+
+ changed = false;
+
+ Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();
+ for(CacheEntry e : workarea.keySet())
+ newHist.put(e, -1);
+
+ for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {
+ Integer c = hist.get(e.getKey());
+ for(CacheEntry p : e.getValue()) {
+ Integer i = newHist.get(p);
+ newHist.put(p, i+c);
+ }
+ }
+ for(CacheEntry e : workarea.keySet()) {
+ Integer value = newHist.get(e);
+ Integer old = hist.get(e);
+ if(!value.equals(old)) {
+ hist.put(e, value);
+// System.err.println("hist " + e + ": " + old + " => " + value);
+ changed = true;
+ }
+ }
+
+ System.err.println("Retained set iteration " + iter);
+
+ }
+
+ b.print("// Queries by retained set\n");
+ for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
+ b.print("" + -p.second + " " + p.first + "\n");
+ }
+
+ HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();
+
+ b.print("// Entry parent listing\n");
+ for(CacheEntry entry : workarea.keySet()) {
+ int status = flagMap.get(entry);
+ boolean hasListener = listening.hasListenerAfterDisposing(entry);
+ b.print("Q " + entry.toString());
+ if(hasListener) {
+ b.print(" (L" + status + ")");
+ listeners++;
+ } else {
+ b.print(" (" + status + ")");
+ }
+ b.print("\n");
+ for(CacheEntry parent : workarea.get(entry)) {
+ Collection<CacheEntry> inv = inverse.get(parent);
+ if(inv == null) {
+ inv = new ArrayList<CacheEntry>();
+ inverse.put(parent, inv);
+ }
+ inv.add(entry);
+ b.print(" " + parent.toString());
+ b.print("\n");
+ }
+ }
+
+ b.print("// Entry child listing\n");
+ for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {
+ b.print("C " + entry.getKey().toString());
+ b.print("\n");
+ for(CacheEntry child : entry.getValue()) {
+ Integer h = hist.get(child);
+ if(h != null) {
+ b.print(" " + h);
+ } else {
+ b.print(" <no children>");
+ }
+ b.print(" " + child.toString());
+ b.print("\n");
+ }
+ }
+
+ b.print("#queries: " + workarea.keySet().size() + "\n");
+ b.print("#listeners: " + listeners + "\n");
+
+ b.close();
+
+ return "Dumped " + workarea.keySet().size() + " queries.";
+
+ }
+
+ boolean removeQuery(CacheEntry entry) {
+
+ // This entry has been removed before. No need to do anything here.
+ if(entry.isDiscarded()) return false;
+
+ assert (!entry.isDiscarded());
+
+ Query query = entry.getQuery();
+
+ query.removeEntry(this);
+
+ cache.updates++;
+ cache.size--;
+
+ if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)
+ boundQueries--;
+
+ entry.discard();
+
+ return true;
+
+ }
+
+ /**
+ *
+ * @return true if this entry is being listened
+ */
+ private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {
+
+ assert (e != null);
+
+ CacheEntry entry = e.entry;
+
+ /*
+ * If the dependency graph forms a DAG, some entries are inserted in the
+ * todo list many times. They only need to be processed once though.
+ */
+ if (entry.isDiscarded()) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.print("D");
+ for (int i = 0; i < e.indent; i++)
+ System.err.print(" ");
+ System.err.println(entry.getQuery());
+ }
+ }
+// System.err.println(" => DISCARDED");
+ return false;
+ }
+
+// if (entry.isRefuted()) {
+// if (Development.DEVELOPMENT) {
+// if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+// System.err.print("R");
+// for (int i = 0; i < e.indent; i++)
+// System.err.print(" ");
+// System.err.println(entry.getQuery());
+// }
+// }
+// return false;
+// }
+
+ if (entry.isExcepted()) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.print("E");
+ }
+ }
+ }
+
+ if (entry.isPending()) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.print("P");
+ }
+ }
+ }
+
+ cache.updates++;
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.print("U ");
+ for (int i = 0; i < e.indent; i++)
+ System.err.print(" ");
+ System.err.print(entry.getQuery());
+ }
+ }
+
+ Query query = entry.getQuery();
+ int type = query.type();
+
+ boolean hasListener = listening.hasListener(entry);
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ if(listening.hasListener(entry)) {
+ System.err.println(" (L)");
+ } else {
+ System.err.println("");
+ }
+ }
+ }
+
+ if(entry.isPending() || entry.isExcepted()) {
+
+ // If updated
+ if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
+
+ immediates.put(entry, entry);
+
+ } else {
+
+ if(hasListener) {
+ entry.refute();
+ } else {
+ removeQuery(entry);
+ }
+
+ }
+
+ } else {
+
+ // If updated
+ if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {
+
+ immediates.put(entry, entry);
+
+ } else {
+
+ if(hasListener) {
+ entry.refute();
+ } else {
+ removeQuery(entry);
+ }
+
+ }
+
+ }
+
+// System.err.println(" => FOO " + type);
+
+ if (hasListener) {
+ ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
+ if(entries != null) {
+ for (ListenerEntry le : entries) {
+ listening.scheduleListener(le);
+ }
+ }
+ }
+
+ // If invalid, update parents
+ if (type == RequestFlags.INVALIDATE) {
+ listening.updateParents(e.indent, entry, todo);
+ }
+
+ return hasListener;
+
+ }
+
+ /**
+ * @param av1 an array (guaranteed)
+ * @param av2 any object
+ * @return <code>true</code> if the two arrays are equal
+ */
+ private final boolean arrayEquals(Object av1, Object av2) {
+ if (av2 == null)
+ return false;
+ Class<?> c1 = av1.getClass().getComponentType();
+ Class<?> c2 = av2.getClass().getComponentType();
+ if (c2 == null || !c1.equals(c2))
+ return false;
+ boolean p1 = c1.isPrimitive();
+ boolean p2 = c2.isPrimitive();
+ if (p1 != p2)
+ return false;
+ if (!p1)
+ return Arrays.equals((Object[]) av1, (Object[]) av2);
+ if (boolean.class.equals(c1))
+ return Arrays.equals((boolean[]) av1, (boolean[]) av2);
+ else if (byte.class.equals(c1))
+ return Arrays.equals((byte[]) av1, (byte[]) av2);
+ else if (int.class.equals(c1))
+ return Arrays.equals((int[]) av1, (int[]) av2);
+ else if (long.class.equals(c1))
+ return Arrays.equals((long[]) av1, (long[]) av2);
+ else if (float.class.equals(c1))
+ return Arrays.equals((float[]) av1, (float[]) av2);
+ else if (double.class.equals(c1))
+ return Arrays.equals((double[]) av1, (double[]) av2);
+ throw new RuntimeException("??? Contact application querySupport.");
+ }
+
+
+
+ final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
+
+ try {
+
+ Query query = entry.getQuery();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
+ System.err.println("R " + query);
+ }
+ }
+
+ entry.prepareRecompute(querySupport);
+
+ ReadGraphImpl parentGraph = graph.forRecompute(entry);
+ parentGraph.asyncBarrier.inc();
+ query.recompute(parentGraph);
+ parentGraph.asyncBarrier.dec();
+
+ if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
+
+ Object newValue = entry.getResult();
+
+ if (ListenerEntry.NO_VALUE == oldValue) {
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
+ System.out.println("C " + query);
+ System.out.println("- " + oldValue);
+ System.out.println("- " + newValue);
+ }
+ }
+ return newValue;
+ }
+
+ boolean changed = false;
+
+ if (newValue != null) {
+ if (newValue.getClass().isArray()) {
+ changed = !arrayEquals(newValue, oldValue);
+ } else {
+ changed = !newValue.equals(oldValue);
+ }
+ } else
+ changed = (oldValue != null);
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
+ System.err.println("C " + query);
+ System.err.println("- " + oldValue);
+ System.err.println("- " + newValue);
+ }
+ }
+
+ return changed ? newValue : ListenerEntry.NOT_CHANGED;
+
+ } catch (Throwable t) {
+
+ Logger.defaultLogError(t);
+ entry.except(t);
+ return ListenerEntry.NO_VALUE;
+
+ }
+
+ }
+
+
+ /**
+ *
+ * @return true if this entry still has listeners
+ */
+ public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {
+
+ assert (!cache.collecting);
+ assert (!updating);
+ updating = true;
+
+ boolean hadListeners = false;
+ boolean listenersUnknown = false;
+
+ try {
+
+ assert(entry != null);
+ LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();
+ IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();
+ todo.add(new UpdateEntry(null, entry, 0));
+
+ while(true) {
+
+ // Walk the tree and collect immediate updates
+ while (!todo.isEmpty()) {
+ UpdateEntry e = todo.pop();
+ hadListeners |= updateQuery(e, todo, immediates);
+ }
+
+ if(immediates.isEmpty()) break;
+
+ // Evaluate all immediate updates and collect parents to update
+ for(CacheEntry immediate : immediates.values()) {
+
+ if(immediate.isDiscarded()) {
+ continue;
+ }
+
+ if(immediate.isExcepted()) {
+
+ Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
+ if (newValue != ListenerEntry.NOT_CHANGED)
+ listening.updateParents(0, immediate, todo);
+
+ } else {
+
+ Object oldValue = immediate.getResult();
+ Object newValue = compareTo(graph, immediate, oldValue);
+
+ if (newValue != ListenerEntry.NOT_CHANGED) {
+ listening.updateParents(0, immediate, todo);
+ } else {
+ // If not changed, keep the old value
+ immediate.setResult(oldValue);
+ immediate.setReady();
+ listenersUnknown = true;
+ }
+
+ }
+
+ }
+ immediates.clear();
+
+ }
+
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+
+ assert (updating);
+ updating = false;
+
+ return hadListeners | listenersUnknown;
+
+ }
+
+ private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();
+ private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();
+ private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();
+ // Maybe use a mutex from util.concurrent?
+ private Object primitiveUpdateLock = new Object();
+ private THashSet scheduledPrimitiveUpdates = new THashSet();
+
+ private ArrayList<CacheEntry> refutations = new ArrayList<>();
+
+ private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
+ e.refute();
+ refutations.add(e);
+ }
+
+ private void updateRefutations(ReadGraphImpl graph) {
+
+ for(CacheEntry e : refutations)
+ update(graph, e);
+
+ refutations.clear();
+
+ }
+
+ public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
+
+ // Make sure that listening has performed its work
+ listening.sync();
+
+ cache.dirty = false;
+ lastInvalidate = 0;
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ==");
+ }
+ }
+
+ // Special case - one statement
+ if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
+
+ long arg0 = scheduledObjectUpdates.getFirst();
+
+ final int subject = (int)(arg0 >>> 32);
+ final int predicate = (int)(arg0 & 0xffffffff);
+
+ for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
+
+ if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
+ PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
+ Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
+ if(types != null) markForUpdate(graph, types);
+ }
+
+ if(predicate == subrelationOf) {
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
+ if(superRelations != null) markForUpdate(graph, superRelations);
+ }
+
+ DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
+ if(dp != null) markForUpdate(graph, dp);
+ OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
+ if(os != null) markForUpdate(graph, os);
+
+ updateRefutations(graph);
+
+ scheduledObjectUpdates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
+
+ return;
+
+ }
+
+ // Special case - one value
+ if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {
+
+ int arg0 = scheduledValueUpdates.getFirst();
+
+ ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
+
+ updateRefutations(graph);
+
+ scheduledValueUpdates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
+
+ return;
+
+ }
+
+ final TIntHashSet predicates = new TIntHashSet();
+ final TIntHashSet orderedSets = new TIntHashSet();
+
+ THashSet primitiveUpdates;
+ synchronized (primitiveUpdateLock) {
+ primitiveUpdates = scheduledPrimitiveUpdates;
+ scheduledPrimitiveUpdates = new THashSet();
+ }
+
+ scheduledValueUpdates.forEach(new TIntProcedure() {
+
+ @Override
+ public boolean execute(int arg0) {
+ ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
+ return true;
+ }
+
+ });
+
+ scheduledInvalidates.forEach(new TIntProcedure() {
+
+ @Override
+ public boolean execute(int resource) {
+
+ ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
+
+ PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
+ Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
+ if(types != null) markForUpdate(graph, types);
+
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
+ if(superRelations != null) markForUpdate(graph, superRelations);
+
+ predicates.add(resource);
+
+ return true;
+ }
+
+ });
+
+ scheduledObjectUpdates.forEach(new TLongProcedure() {
+
+ @Override
+ public boolean execute(long arg0) {
+
+ final int subject = (int)(arg0 >>> 32);
+ final int predicate = (int)(arg0 & 0xffffffff);
+
+ if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
+ PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
+ Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
+ if(types != null) markForUpdate(graph, types);
+ }
+
+ if(predicate == subrelationOf) {
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
+ if(superRelations != null) markForUpdate(graph, superRelations);
+ }
+
+ predicates.add(subject);
+ orderedSets.add(predicate);
+
+ return true;
+
+ }
+
+ });
+
+ predicates.forEach(new TIntProcedure() {
+
+ @Override
+ public boolean execute(final int subject) {
+
+ for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
+
+ DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
+ if(entry != null) markForUpdate(graph, entry);
+
+ return true;
+
+ }
+
+ });
+
+ orderedSets.forEach(new TIntProcedure() {
+
+ @Override
+ public boolean execute(int orderedSet) {
+
+ OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
+ if(entry != null) markForUpdate(graph, entry);
+
+ return true;
+
+ }
+
+ });
+
+ updateRefutations(graph);
+
+ primitiveUpdates.forEach(new TObjectProcedure() {
+
+ @Override
+ public boolean execute(Object arg0) {
+
+ ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
+ if (query != null) {
+ boolean listening = update(graph, query);
+ if (!listening && !query.hasParents()) {
+ cache.externalReadEntryMap.remove(arg0);
+ query.discard();
+ }
+ }
+ return true;
+ }
+
+ });
+
+ scheduledValueUpdates.clear();
+ scheduledObjectUpdates.clear();
+ scheduledInvalidates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
+
+ }
+
+ public void updateValue(final int resource) {
+ scheduledValueUpdates.add(resource);
+ cache.dirty = true;
+ }
+
+ public void updateStatements(final int resource, final int predicate) {
+ scheduledObjectUpdates.add((((long)resource) << 32) + predicate);
+ cache.dirty = true;
+ }
+
+ private int lastInvalidate = 0;
+
+ public void invalidateResource(final int resource) {
+ if(lastInvalidate == resource) return;
+ scheduledValueUpdates.add(resource);
+ lastInvalidate = resource;
+ cache.dirty = true;
+ }
+
+ public void updatePrimitive(final ExternalRead primitive) {
+
+ // External reads may be updated from arbitrary threads.
+ // Synchronize to prevent race-conditions.
+ synchronized (primitiveUpdateLock) {
+ scheduledPrimitiveUpdates.add(primitive);
+ }
+ querySupport.dirtyPrimitives();
+
+ }
+
+ @Override
+ public synchronized String toString() {
+ return "QueryProvider [size = " + cache.size + ", hits = " + cache.hits + " misses = " + cache.misses + ", updates = " + cache.updates + "]";
+ }
+
+ @Override
+ protected void doDispose() {
+
+ requests.release(Integer.MAX_VALUE / 2);
+
+ for(int index = 0; index < THREADS; index++) {
+ executors[index].dispose();
+ }
+
+ // First just wait
+ for(int i=0;i<100;i++) {
+
+ boolean alive = false;
+ for(int index = 0; index < THREADS; index++) {
+ alive |= executors[index].isAlive();
+ }
+ if(!alive) return;
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ // Then start interrupting
+ for(int i=0;i<100;i++) {
+
+ boolean alive = false;
+ for(int index = 0; index < THREADS; index++) {
+ alive |= executors[index].isAlive();
+ }
+ if(!alive) return;
+ for(int index = 0; index < THREADS; index++) {
+ executors[index].interrupt();
+ }
+ }
+
+ // // Then just destroy
+ // for(int index = 0; index < THREADS; index++) {
+ // executors[index].destroy();
+ // }
+
+ for(int index = 0; index < THREADS; index++) {
+ try {
+ executors[index].join(5000);
+ } catch (InterruptedException e) {
+ Logger.defaultLogError("QueryThread " + index + " will not die.", e);
+ }
+ executors[index] = null;
+ }
+
+ }
+
+ public int getHits() {
+ return cache.hits;
+ }
+
+ public int getMisses() {
+ return cache.misses;
+ }
+
+ public int getSize() {
+ return cache.size;
+ }
+
+ public Set<Long> getReferencedClusters() {
+ HashSet<Long> result = new HashSet<Long>();
+ for (CacheEntry entry : QueryCache.entriesObjects(this)) {
+ Objects query = (Objects) entry.getQuery();
+ result.add(querySupport.getClusterId(query.r1()));
+ }
+ for (CacheEntry entry : QueryCache.entriesDirectPredicates(this)) {
+ DirectPredicates query = (DirectPredicates) entry.getQuery();
+ result.add(querySupport.getClusterId(query.id));
+ }
+ for (CacheEntry entry : cache.valueQueryMap.values()) {
+ ValueQuery query = (ValueQuery) entry.getQuery();
+ result.add(querySupport.getClusterId(query.id));
+ }
+ return result;
+ }
+
+ public void assertDone() {
+ }
+
+ CacheCollectionResult allCaches(CacheCollectionResult result) {
+
+ return cache.allCaches(result);
+
+ }
+
+ public void printDiagnostics() {
+ }
+
+ public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {
+ querySupport.requestCluster(graph, clusterId, runnable);
+ }
+
+ public int clean() {
+ collector.collect(0, Integer.MAX_VALUE);
+ return cache.size;
+ }
+
+ public void clean(final Collection<ExternalRead<?>> requests) {
+ QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {
+ Iterator<ExternalRead<?>> iterator = requests.iterator();
+ @Override
+ public CacheCollectionResult allCaches() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public CacheEntryBase iterate(int level) {
+ if(iterator.hasNext()) {
+ ExternalRead<?> request = iterator.next();
+ ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
+ if (entry != null) return entry;
+ else return iterate(level);
+ } else {
+ iterator = requests.iterator();
+ return null;
+ }
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void setLevel(CacheEntryBase entry, int level) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Collection<CacheEntry> getRootList() {
+ ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());
+ for (ExternalRead<?> request : requests) {
+ ExternalReadEntry entry = cache.externalReadEntryMap.get(request);
+ if (entry != null)
+ result.add(entry);
+ }
+ return result;
+ }
+ @Override
+ public int getCurrentSize() {
+ return cache.size;
+ }
+ @Override
+ public int calculateCurrentSize() {
+ // This tells the collector to attempt collecting everything.
+ return Integer.MAX_VALUE;
+ }
+ @Override
+ public boolean start(boolean flush) {
+ return true;
+ }
+ };
+ new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);
+ }
+
+ public void scanPending() {
+
+ cache.scanPending();
+
+ }
+
+ public ReadGraphImpl graphForVirtualRequest() {
+ return ReadGraphImpl.createAsync(this);
+ }
+
+
+ private HashMap<Resource, Class<?>> builtinValues;
+
+ public Class<?> getBuiltinValue(Resource r) {
+ if(builtinValues == null) initBuiltinValues();
+ return builtinValues.get(r);
+ }
+
+ Exception callerException = null;
+
+ public interface AsyncBarrier {
+ public void inc();
+ public void dec();
+ public void waitBarrier(Object request, ReadGraphImpl impl);
+ public boolean isBlocking();
+ }
+
+// final public QueryProcessor processor;
+// final public QuerySupport support;
+
+ // boolean disposed = false;
+
+ private void initBuiltinValues() {
+
+ Layer0 b = getSession().peekService(Layer0.class);
+ if(b == null) return;
+
+ builtinValues = new HashMap<Resource, Class<?>>();
+
+ builtinValues.put(b.String, String.class);
+ builtinValues.put(b.Double, Double.class);
+ builtinValues.put(b.Float, Float.class);
+ builtinValues.put(b.Long, Long.class);
+ builtinValues.put(b.Integer, Integer.class);
+ builtinValues.put(b.Byte, Byte.class);
+ builtinValues.put(b.Boolean, Boolean.class);
+
+ builtinValues.put(b.StringArray, String[].class);
+ builtinValues.put(b.DoubleArray, double[].class);
+ builtinValues.put(b.FloatArray, float[].class);
+ builtinValues.put(b.LongArray, long[].class);
+ builtinValues.put(b.IntegerArray, int[].class);
+ builtinValues.put(b.ByteArray, byte[].class);
+ builtinValues.put(b.BooleanArray, boolean[].class);
+
+ }
+
+// public ReadGraphSupportImpl(final QueryProcessor provider2) {
+//
+// if (null == provider2) {
+// this.processor = null;
+// support = null;
+// return;
+// }
+// this.processor = provider2;
+// support = provider2.getCore();
+// initBuiltinValues();
+//
+// }
+
+// final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {
+// return new ReadGraphSupportImpl(impl.processor);
+// }
+
+ @Override
+ final public Session getSession() {
+ return session;
+ }
+
+ final public ResourceSupport getResourceSupport() {
+ return resourceSupport;
+ }
+
+ @Override
+ final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
+
+ try {
+
+ for(Resource predicate : getPredicates(impl, subject))
+ procedure.execute(impl, predicate);
+
+ procedure.finished(impl);
+
+ } catch (Throwable e) {
+ procedure.exception(impl, e);
+ }
+
+ }
+
+ @Override
+ final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
+
+ throw new UnsupportedOperationException();
+
+// assert(subject != null);
+// assert(procedure != null);
+//
+// final ListenerBase listener = getListenerBase(procedure);
+//
+// try {
+// QueryCache.runnerPredicates(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
+//
+// @Override
+// public void execute(ReadGraphImpl graph, int i) {
+// try {
+// procedure.execute(querySupport.getResource(i));
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// @Override
+// public void finished(ReadGraphImpl graph) {
+// try {
+// procedure.finished();
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+//// impl.state.barrier.dec();
+// }
+//
+// @Override
+// public void exception(ReadGraphImpl graph, Throwable t) {
+// try {
+// procedure.exception(t);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+//// impl.state.barrier.dec();
+// }
+//
+// });
+// } catch (DatabaseException e) {
+// Logger.defaultLogError(e);
+// }
+
+ }
+
+ @Override
+ final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
+ return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null);
+ }
+
+ @Override
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
+ final Resource predicate, final MultiProcedure<Statement> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ try {
+ Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {
+ try {
+ procedure.execute(querySupport.getStatement(s, p, o));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished();
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
+ final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
+
+ boolean first = true;
+
+ @Override
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {
+ try {
+ if(first) {
+ procedure.execute(graph, querySupport.getStatement(s, p, o));
+ } else {
+ procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+
+ try {
+ if(first) {
+ first = false;
+ procedure.finished(graph);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.finished(impl.newRestart(graph));
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+
+ try {
+ if(first) {
+ first = false;
+ procedure.exception(graph, t);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.exception(impl.newRestart(graph), t);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+ int pId = querySupport.getId(predicate);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
+ final Resource predicate, final StatementProcedure procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {
+
+ boolean first = true;
+
+ @Override
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {
+ try {
+ if(first) {
+ procedure.execute(graph, s, p, o);
+ } else {
+ procedure.execute(impl.newRestart(graph), s, p, o);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+
+ try {
+ if(first) {
+ first = false;
+ procedure.finished(graph);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.finished(impl.newRestart(graph));
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+
+ try {
+ if(first) {
+ first = false;
+ procedure.exception(graph, t);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.exception(impl.newRestart(graph), t);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+ int pId = querySupport.getId(predicate);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
+
+ private Set<Statement> current = null;
+ private Set<Statement> run = new HashSet<Statement>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Statement result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Statement r : current) procedure.remove(graph, r);
+ }
+
+ current = run;
+
+ run = new HashSet<Statement>();
+
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,
+ final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ try {
+ QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedureAdapter() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {
+ try {
+ procedure.execute(graph, querySupport.getStatement(s, p, o));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ private static ListenerBase getListenerBase(Object procedure) {
+ if(procedure instanceof ListenerBase) return (ListenerBase)procedure;
+ else return null;
+ }
+
+ @Override
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ try {
+ QueryCache.runnerObjects(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ try {
+ procedure.execute(querySupport.getResource(i));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished();
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ System.out.println("forEachObject exception " + t);
+ try {
+ procedure.exception(t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ int sId = querySupport.getId(subject);
+
+ try {
+ QueryCache.runnerDirectPredicates(impl, sId, impl.parent, listener, new InternalProcedure<IntSet>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, IntSet result) throws DatabaseException {
+ procedure.execute(graph, result);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable throwable) throws DatabaseException {
+ procedure.exception(graph, throwable);
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ final public DirectStatements getDirectStatements(final ReadGraphImpl impl, final Resource subject, final boolean ignoreVirtual) {
+
+// assert(subject != null);
+// assert(procedure != null);
+//
+// final ListenerBase listener = getListenerBase(procedure);
+//
+// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+
+ return querySupport.getStatements(impl, querySupport.getId(subject), this, ignoreVirtual);
+
+ }
+
+// @Override
+// final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final SyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {
+//
+// assert(subject != null);
+// assert(procedure != null);
+//
+// final ListenerBase listener = getListenerBase(procedure);
+//
+// org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);
+//
+// }
+
+ private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);
+
+ @Override
+ final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {
+
+ forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {
+
+ private Resource single = null;
+
+ @Override
+ public synchronized void execute(AsyncReadGraph graph, Resource result) {
+ if(single == null) {
+ single = result;
+ } else {
+ single = INVALID_RESOURCE;
+ }
+ }
+
+ @Override
+ public synchronized void finished(AsyncReadGraph graph) {
+ if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);
+ else procedure.execute(graph, single);
+ }
+
+ @Override
+ public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {
+ procedure.exception(graph, throwable);
+ }
+
+ });
+
+ }
+
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {
+
+ final int sId = querySupport.getId(subject);
+ final int pId = querySupport.getId(predicate);
+
+ try {
+ QueryCache.runnerObjects(impl, sId, pId, impl.parent, listener, procedure);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ static class Runner2Procedure implements IntProcedure {
+
+ public int single = 0;
+ public Throwable t = null;
+
+ public void clear() {
+ single = 0;
+ t = null;
+ }
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ if(single == 0) single = i;
+ else single = -1;
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ if(single == -1) single = 0;
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable throwable) {
+ single = 0;
+ this.t = throwable;
+ }
+
+ public int get() throws DatabaseException {
+ if(t != null) {
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException(t);
+ }
+ return single;
+ }
+
+ }
+
+ final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {
+
+ final int sId = querySupport.getId(subject);
+ final int pId = querySupport.getId(predicate);
+
+ Runner2Procedure proc = new Runner2Procedure();
+ QueryCache.runnerObjects(impl, sId, pId, impl.parent, null, proc);
+ return proc.get();
+
+ }
+
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ if(impl.parent != null || listener != null) {
+
+ IntProcedure ip = new IntProcedure() {
+
+ AtomicBoolean first = new AtomicBoolean(true);
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ try {
+ if(first.get()) {
+ procedure.execute(impl, querySupport.getResource(i));
+ } else {
+ procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.finished(impl);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.finished(impl.newRestart(graph));
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ @Override
+ public String toString() {
+ return "forEachObject with " + procedure;
+ }
+
+ };
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);
+// else impl.state.barrier.inc(null, null);
+
+ forEachObject(impl, subject, predicate, listener, ip);
+
+ } else {
+
+ IntProcedure ip = new IntProcedure() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ procedure.execute(graph, querySupport.getResource(i));
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ procedure.finished(graph);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forEachObject with " + procedure;
+ }
+
+ };
+
+ forEachObject(impl, subject, predicate, listener, ip);
+
+ }
+
+ }
+
+ @Override
+ final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
+
+ private Set<Resource> current = null;
+ private Set<Resource> run = new HashSet<Resource>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Resource result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Resource r : current) procedure.remove(graph, r);
+ }
+
+ current = run;
+
+ run = new HashSet<Resource>();
+
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forObjectSet " + procedure;
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {
+
+ private Set<Resource> current = null;
+ private Set<Resource> run = new HashSet<Resource>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Resource result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Resource r : current) procedure.remove(graph, r);
+ }
+
+ current = run;
+
+ run = new HashSet<Resource>();
+
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forPredicateSet " + procedure;
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {
+
+ private Set<Resource> current = null;
+ private Set<Resource> run = new HashSet<Resource>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Resource result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Resource r : current) procedure.remove(graph, r);
+ }
+
+ current = run;
+
+ run = new HashSet<Resource>();
+
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forPrincipalTypeSet " + procedure;
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {
+
+ private Set<Resource> current = null;
+ private Set<Resource> run = new HashSet<Resource>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Resource result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Resource r : current) procedure.remove(graph, r);
+ }
+
+ current = run;
+
+ run = new HashSet<Resource>();
+
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forObjectSet " + procedure;
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {
+
+ private Set<Statement> current = null;
+ private Set<Statement> run = new HashSet<Statement>();
+
+ @Override
+ public void execute(AsyncReadGraph graph, Statement result) {
+
+ boolean found = false;
+
+ if(current != null) {
+
+ found = current.remove(result);
+
+ }
+
+ if(!found) procedure.add(graph, result);
+
+ run.add(result);
+
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ if(current != null) {
+ for(Statement s : current) procedure.remove(graph, s);
+ }
+
+ current = run;
+
+ run = new HashSet<Statement>();
+
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return procedure.isDisposed();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ @Override
+ public String toString() {
+ return "forStatementSet " + procedure;
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,
+ final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ try {
+ QueryCache.runnerAssertedStatements(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new TripleIntProcedure() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {
+ try {
+ procedure.execute(graph, querySupport.getResource(o));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ IntProcedure ip = new IntProcedure() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ try {
+ procedure.execute(graph, querySupport.getResource(i));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerPrincipalTypes(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ try {
+ QueryCache.runnerPrincipalTypes(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ try {
+ procedure.execute(querySupport.getResource(i));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished();
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+
+ final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+ assert(listener == null);
+
+ InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
+
+ @Override
+ public void execute(final ReadGraphImpl graph, IntSet set) {
+ procedure.execute(graph, set);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ procedure.exception(graph, t);
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+ try {
+ QueryCache.runnerTypes(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {
+
+ assert(subject != null);
+
+ return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
+
+ }
+
+ @Override
+ final public RelationInfo getRelationInfo(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
+
+ assert(subject != null);
+
+ return QueryCache.resultRelationInfoQuery(impl, querySupport.getId(subject), impl.parent, null);
+
+ }
+
+ @Override
+ final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ try {
+ QueryCache.runnerSuperTypes(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<IntSet>() {
+
+ AtomicBoolean first = new AtomicBoolean(true);
+
+ @Override
+ public void execute(final ReadGraphImpl graph, IntSet set) {
+// final HashSet<Resource> result = new HashSet<Resource>();
+// set.forEach(new TIntProcedure() {
+//
+// @Override
+// public boolean execute(int type) {
+// result.add(querySupport.getResource(type));
+// return true;
+// }
+//
+// });
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.execute(graph, set);
+// impl.state.barrier.dec();
+ } else {
+ procedure.execute(impl.newRestart(graph), set);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.exception(graph, t);
+// impl.state.barrier.dec();
+ } else {
+ procedure.exception(impl.newRestart(graph), t);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ IntProcedure ip = new IntProcedureAdapter() {
+
+ @Override
+ public void execute(final ReadGraphImpl graph, int superRelation) {
+ try {
+ procedure.execute(graph, querySupport.getResource(superRelation));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(final ReadGraphImpl graph) {
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerDirectSuperRelations(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+// DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);
+
+ }
+
+ @Override
+ final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);
+
+ }
+
+ @Override
+ final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {
+
+ @Override
+ public void execute(final ReadGraphImpl graph, IntSet set) {
+// final HashSet<Resource> result = new HashSet<Resource>();
+// set.forEach(new TIntProcedure() {
+//
+// @Override
+// public boolean execute(int type) {
+// result.add(querySupport.getResource(type));
+// return true;
+// }
+//
+// });
+ try {
+ procedure.execute(graph, set);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerSuperRelations(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {
+ return getValue(impl, querySupport.getId(subject));
+ }
+
+ final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
+ return QueryCache.resultValueQuery(impl, subject, impl.parent, null);
+ }
+
+ @Override
+ final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ int sId = querySupport.getId(subject);
+
+// if(procedure != null) {
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
+
+ AtomicBoolean first = new AtomicBoolean(true);
+
+ @Override
+ public void execute(ReadGraphImpl graph, byte[] result) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.execute(graph, result);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.execute(impl.newRestart(graph), result);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.exception(graph, t);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.exception(impl.newRestart(graph), t);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ };
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ throw new IllegalStateException("Internal error");
+ }
+
+// } else {
+//
+// return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
+//
+// }
+//
+// throw new IllegalStateException("Internal error");
+
+ }
+
+ @Override
+ final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ if(impl.parent != null || listener != null) {
+
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
+
+ AtomicBoolean first = new AtomicBoolean(true);
+
+ @Override
+ public void execute(ReadGraphImpl graph, byte[] result) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.execute(graph, result);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.execute(impl.newRestart(graph), result);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ if(first.compareAndSet(true, false)) {
+ procedure.exception(graph, t);
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.exception(impl.newRestart(graph), t);
+ }
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ } else {
+
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, byte[] result) {
+
+ procedure.execute(graph, result);
+
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+
+ procedure.exception(graph, t);
+
+ }
+
+ };
+
+ int sId = querySupport.getId(subject);
+
+ try {
+ QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ }
+
+ @Override
+ final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {
+
+ assert(relation != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ IntProcedure ip = new IntProcedure() {
+
+ private int result = 0;
+
+ final AtomicBoolean found = new AtomicBoolean(false);
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+
+ // Shall fire exactly once!
+ if(done.compareAndSet(false, true)) {
+ try {
+ if(result == 0) {
+ procedure.exception(graph, new NoInverseException(""));
+// impl.state.barrier.dec(this);
+ } else {
+ procedure.execute(graph, querySupport.getResource(result));
+// impl.state.barrier.dec(this);
+ }
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+
+ }
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+
+ if(found.compareAndSet(false, true)) {
+ this.result = i;
+ } else {
+ // Shall fire exactly once!
+ if(done.compareAndSet(false, true)) {
+ try {
+ procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));
+// impl.state.barrier.dec(this);
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ // Shall fire exactly once!
+ if(done.compareAndSet(false, true)) {
+ try {
+ procedure.exception(graph, t);
+// impl.state.barrier.dec(this);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+ }
+
+ };
+
+ int sId = querySupport.getId(relation);
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);
+// else impl.state.barrier.inc(null, null);
+
+ try {
+ QueryCache.runnerObjects(impl, sId, getInverseOf(), impl.parent, listener, ip);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
+
+ assert(id != null);
+ assert(procedure != null);
+
+ InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, Integer result) {
+ try {
+ procedure.execute(graph, querySupport.getResource(result));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ };
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");
+// else impl.state.barrier.inc(null, null);
+
+ forResource(impl, id, impl.parent, ip);
+
+ }
+
+ @Override
+ final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {
+
+ assert(id != null);
+ assert(procedure != null);
+
+// impl.state.barrier.inc();
+
+ try {
+ forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, Integer result) {
+ try {
+ procedure.execute(graph, querySupport.getResource(result));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ try {
+ IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
+ procedure.execute(impl, !result.isEmpty());
+ } catch (DatabaseException e) {
+ procedure.exception(impl, e);
+ }
+
+ }
+
+ @Override
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+ AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {
+
+ boolean found = false;
+
+ @Override
+ synchronized public void execute(AsyncReadGraph graph, Resource resource) {
+ found = true;
+ }
+
+ @Override
+ synchronized public void finished(AsyncReadGraph graph) {
+ try {
+ procedure.execute(graph, found);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec(this);
+ }
+
+ };
+
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);
+// else impl.state.barrier.inc(null, null);
+
+ forEachObject(impl, subject, predicate, ip);
+
+ }
+
+ @Override
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {
+
+ assert(subject != null);
+ assert(predicate != null);
+ assert(procedure != null);
+
+// impl.state.barrier.inc();
+
+ forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {
+
+ boolean found = false;
+
+ @Override
+ synchronized public void execute(AsyncReadGraph graph, Resource resource) {
+ if(resource.equals(object)) found = true;
+ }
+
+ @Override
+ synchronized public void finished(AsyncReadGraph graph) {
+ try {
+ procedure.execute(graph, found);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+
+ }
+
+ @Override
+ final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+// impl.state.barrier.inc();
+
+ try {
+ QueryCache.runnerValueQuery(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {
+
+ @Override
+ public void execute(ReadGraphImpl graph, byte[] object) {
+ boolean result = object != null;
+ try {
+ procedure.execute(graph, result);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+ @Override
+ final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
+
+ assert(subject != null);
+ assert(procedure != null);
+
+ final ListenerBase listener = getListenerBase(procedure);
+
+ try {
+
+ QueryCache.runnerOrderedSet(impl, querySupport.getId(subject), impl.parent, listener, new IntProcedure() {
+
+ @Override
+ public void exception(ReadGraphImpl graph, Throwable t) {
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ @Override
+ public void execute(ReadGraphImpl graph, int i) {
+ try {
+ procedure.execute(graph, querySupport.getResource(i));
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraphImpl graph) {
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+// impl.state.barrier.dec();
+ }
+
+ });
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ }
+
+// @Override
+// final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) throws DatabaseException {
+//
+// assert(request != null);
+// assert(procedure != null);
+//
+// QueryCache.runnerAsyncReadEntry(impl, request, parent, listener, procedure);
+//
+// }
+
+// @Override
+// final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {
+//
+// assert(graph != null);
+// assert(request != null);
+//
+// final ReadEntry entry = (ReadEntry)cache.getCached(request);
+// if(entry != null && entry.isReady()) {
+// return (T)entry.get(graph, this, null);
+// } else {
+// return request.perform(graph);
+// }
+//
+// }
+
+// final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {
+//
+// assert(graph != null);
+// assert(request != null);
+//
+// final ExternalReadEntry<T> entry = cache.externalReadMap.get(request);
+// if(entry != null && entry.isReady()) {
+// if(entry.isExcepted()) {
+// Throwable t = (Throwable)entry.getResult();
+// if(t instanceof DatabaseException) throw (DatabaseException)t;
+// else throw new DatabaseException(t);
+// } else {
+// return (T)entry.getResult();
+// }
+// } else {
+//
+// final DataContainer<T> result = new DataContainer<T>();
+// final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+//
+// request.register(graph, new Listener<T>() {
+//
+// @Override
+// public void exception(Throwable t) {
+// exception.set(t);
+// }
+//
+// @Override
+// public void execute(T t) {
+// result.set(t);
+// }
+//
+// @Override
+// public boolean isDisposed() {
+// return true;
+// }
+//
+// });
+//
+// Throwable t = exception.get();
+// if(t != null) {
+// if(t instanceof DatabaseException) throw (DatabaseException)t;
+// else throw new DatabaseException(t);
+// }
+//
+// return result.get();
+//
+// }
+//
+// }
+
+// @Override
+// final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {
+//
+// assert(graph != null);
+// assert(request != null);
+//
+// final AsyncReadEntry entry = cache.asyncReadMap.get(request);
+// if(entry != null && entry.isReady()) {
+// if(entry.isExcepted()) {
+// procedure.exception(graph, (Throwable)entry.getResult());
+// } else {
+// procedure.execute(graph, (T)entry.getResult());
+// }
+// } else {
+// request.perform(graph, procedure);
+// }
+//
+// }
+
+ @Override
+ final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final SyncMultiProcedure<T> procedure, ListenerBase listener) {
+
+ assert(request != null);
+ assert(procedure != null);
+
+ try {
+
+ queryMultiRead(impl, request, parent, listener, procedure);
+
+ } catch (DatabaseException e) {
+
+ throw new IllegalStateException(e);
+
+ }
+
+ }
+
+ @Override
+ final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {
+
+ assert(request != null);
+ assert(procedure != null);
+
+// impl.state.barrier.inc();
+
+ runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {
+
+ public void execute(AsyncReadGraph graph, T result) {
+
+ try {
+ procedure.execute(graph, result);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+
+ try {
+ procedure.finished(graph);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+// impl.state.barrier.dec();
+
+ }
+
+ @Override
+ public String toString() {
+ return procedure.toString();
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+
+ try {
+ procedure.exception(graph, t);
+ } catch (Throwable t2) {
+ Logger.defaultLogError(t2);
+ }
+
+// impl.state.barrier.dec();
+
+ }
+
+ });
+
+ }
+
+// @Override
+// final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
+//
+// assert(request != null);
+// assert(procedure != null);
+//
+// try {
+//
+// queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
+//
+// @Override
+// public String toString() {
+// return procedure.toString();
+// }
+//
+// @Override
+// public void execute(AsyncReadGraph graph, T result) {
+// try {
+// procedure.execute(result);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// @Override
+// public void exception(AsyncReadGraph graph, Throwable throwable) {
+// try {
+// procedure.exception(throwable);
+// } catch (Throwable t2) {
+// Logger.defaultLogError(t2);
+// }
+// }
+//
+// });
+//
+// } catch (DatabaseException e) {
+//
+// throw new IllegalStateException(e);
+//
+// }
+//
+// }
+
+ @Override
+ public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
+
+ return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
+
+ }
+
+ @Override
+ public VirtualGraph getProvider(Resource subject, Resource predicate) {
+
+ return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));
+
+ }
+
+ @Override
+ public VirtualGraph getValueProvider(Resource subject) {
+
+ return querySupport.getValueProvider(querySupport.getId(subject));
+
+ }
+
+ public boolean resumeTasks(ReadGraphImpl graph) {
+
+ return querySupport.resume(graph);
+
+ }
+
+ public boolean isImmutable(int resourceId) {
+ return querySupport.isImmutable(resourceId);
+ }
+
+ public boolean isImmutable(Resource resource) {
+ ResourceImpl impl = (ResourceImpl)resource;
+ return isImmutable(impl.id);
+ }
+
+ private Layer0 L0;
+
+ public Layer0 getL0(ReadGraph graph) {
+ if(L0 == null) {
+ L0 = Layer0.getInstance(graph);
+ }
+ return L0;
+ }
+
+ public Layer0 getL0() {
+ return L0;
+ }
+
+ public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return -1;
+ }
+ };
+
+}