--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.db.impl.query;\r
+\r
+import java.io.BufferedOutputStream;\r
+import java.io.File;\r
+import java.io.FileOutputStream;\r
+import java.io.IOException;\r
+import java.io.PrintStream;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashMap;\r
+import java.util.HashSet;\r
+import java.util.IdentityHashMap;\r
+import java.util.Iterator;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.locks.Condition;\r
+import java.util.concurrent.locks.ReentrantLock;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.db.AsyncReadGraph;\r
+import org.simantics.db.DevelopmentKeys;\r
+import org.simantics.db.DirectStatements;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.RelationInfo;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.Statement;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.debug.ListenerReport;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;\r
+import org.simantics.db.exception.NoInverseException;\r
+import org.simantics.db.exception.ResourceNotFoundException;\r
+import org.simantics.db.impl.DebugPolicy;\r
+import org.simantics.db.impl.ResourceImpl;\r
+import org.simantics.db.impl.graph.MultiIntProcedure;\r
+import org.simantics.db.impl.graph.ReadGraphImpl;\r
+import org.simantics.db.impl.graph.ReadGraphSupport;\r
+import org.simantics.db.impl.graph.WriteGraphImpl;\r
+import org.simantics.db.impl.procedure.IntProcedureAdapter;\r
+import org.simantics.db.impl.procedure.InternalProcedure;\r
+import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;\r
+import org.simantics.db.impl.support.ResourceSupport;\r
+import org.simantics.db.procedure.AsyncMultiListener;\r
+import org.simantics.db.procedure.AsyncMultiProcedure;\r
+import org.simantics.db.procedure.AsyncProcedure;\r
+import org.simantics.db.procedure.AsyncSetListener;\r
+import org.simantics.db.procedure.Listener;\r
+import org.simantics.db.procedure.ListenerBase;\r
+import org.simantics.db.procedure.MultiProcedure;\r
+import org.simantics.db.procedure.Procedure;\r
+import org.simantics.db.procedure.StatementProcedure;\r
+import org.simantics.db.request.AsyncMultiRead;\r
+import org.simantics.db.request.AsyncRead;\r
+import org.simantics.db.request.ExternalRead;\r
+import org.simantics.db.request.MultiRead;\r
+import org.simantics.db.request.Read;\r
+import org.simantics.db.request.RequestFlags;\r
+import org.simantics.db.request.WriteTraits;\r
+import org.simantics.layer0.Layer0;\r
+import org.simantics.utils.DataContainer;\r
+import org.simantics.utils.Development;\r
+import org.simantics.utils.datastructures.Pair;\r
+import org.simantics.utils.datastructures.collections.CollectionUtils;\r
+import org.simantics.utils.datastructures.disposable.AbstractDisposable;\r
+\r
+import gnu.trove.map.hash.THashMap;\r
+import gnu.trove.procedure.TIntProcedure;\r
+import gnu.trove.procedure.TLongProcedure;\r
+import gnu.trove.procedure.TObjectProcedure;\r
+import gnu.trove.set.hash.THashSet;\r
+import gnu.trove.set.hash.TIntHashSet;\r
+\r
+@SuppressWarnings({"rawtypes", "unchecked"})\r
+final public class QueryProcessor extends AbstractDisposable implements ReadGraphSupport {\r
+\r
+ final public UnaryQueryHashMap<IntProcedure> directPredicatesMap;\r
+ final public UnaryQueryHashMap<IntProcedure> principalTypesMap;\r
+ final public THashMap<String, URIToResource> uriToResourceMap;\r
+ final public THashMap<String, NamespaceIndex> namespaceIndexMap22;\r
+ final public UnaryQueryHashMap<IntProcedure> projectsMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<RelationInfo>> relationInfoMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<IntSet>> superTypesMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<IntSet>> typeHierarchyMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<IntSet>> superRelationsMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<IntSet>> typesMap;\r
+ final public UnaryQueryHashMap<InternalProcedure<byte[]>> valueMap;\r
+ final public DoubleKeyQueryHashMap<IntProcedure> directObjectsMap;\r
+ final public DoubleKeyQueryHashMap<IntProcedure> objectsMap;\r
+ final public UnaryQueryHashMap<IntProcedure> orderedSetMap;\r
+ final public UnaryQueryHashMap<IntProcedure> predicatesMap;\r
+ final public DoubleKeyQueryHashMap<TripleIntProcedure> statementsMap;\r
+ final public UnaryQueryHashMap<IntProcedure> assertedPredicatesMap;\r
+ final public BinaryQueryHashMap<TripleIntProcedure> assertedStatementsMap;\r
+ final public StableHashMap<ExternalRead, ExternalReadEntry> externalReadMap; \r
+ final public StableHashMap<AsyncRead, AsyncReadEntry> asyncReadMap; \r
+ final public StableHashMap<Read, ReadEntry> readMap;\r
+ final public StableHashMap<AsyncMultiRead, AsyncMultiReadEntry> asyncMultiReadMap; \r
+ final public StableHashMap<MultiRead, MultiReadEntry> multiReadMap; \r
+\r
+ final private THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;\r
+\r
+ public static int indent = 0;\r
+\r
+ public int size = 0;\r
+ \r
+ \r
+ // Garbage collection\r
+ \r
+ public int boundQueries = 0;\r
+\r
+ // Statistics\r
+ private int hits = 0;\r
+\r
+ private int misses = 0;\r
+\r
+ private int updates = 0;\r
+\r
+ final private int functionalRelation;\r
+\r
+ final private int superrelationOf;\r
+\r
+ final private int instanceOf;\r
+\r
+ final private int inverseOf;\r
+\r
+ final private int asserts;\r
+\r
+ final private int hasPredicate;\r
+\r
+ final private int hasPredicateInverse;\r
+\r
+ final private int hasObject;\r
+\r
+ final private int inherits;\r
+\r
+ final private int subrelationOf;\r
+\r
+ final private int rootLibrary;\r
+\r
+ /**\r
+ * A cache for the root library resource. Initialized in\r
+ * {@link #getRootLibraryResource()}.\r
+ */\r
+ private volatile ResourceImpl rootLibraryResource;\r
+\r
+ final private int library;\r
+\r
+ final private int consistsOf;\r
+\r
+ final private int hasName;\r
+\r
+ AtomicInteger sleepers = new AtomicInteger(0);\r
+\r
+ private boolean updating = false;\r
+\r
+ static public boolean collecting = false;\r
+\r
+ private boolean firingListeners = false;\r
+\r
+ final public QuerySupport querySupport;\r
+ final public Session session;\r
+ final public ResourceSupport resourceSupport;\r
+\r
+ private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();\r
+\r
+ QueryThread[] executors;\r
+\r
+ public ArrayList<SessionTask>[] queues;\r
+\r
+ enum ThreadState {\r
+\r
+ INIT, RUN, SLEEP, DISPOSED\r
+\r
+ }\r
+\r
+ public ThreadState[] threadStates;\r
+ public ReentrantLock[] threadLocks;\r
+ public Condition[] threadConditions;\r
+\r
+ public ArrayList<SessionTask>[] ownTasks;\r
+\r
+ public ArrayList<SessionTask>[] ownSyncTasks;\r
+\r
+ ArrayList<SessionTask>[] delayQueues;\r
+ \r
+ public boolean synch = true;\r
+\r
+ final Object querySupportLock;\r
+ \r
+ public Long modificationCounter = 0L;\r
+\r
+ public void close() {\r
+ }\r
+\r
+ final public void scheduleOwn(int caller, SessionTask request) {\r
+ ownTasks[caller].add(request);\r
+ }\r
+\r
+ final public void scheduleAlways(int caller, SessionTask request) {\r
+\r
+ int performer = request.thread;\r
+ if(caller == performer) {\r
+ ownTasks[caller].add(request);\r
+ } else {\r
+ schedule(caller, request);\r
+ }\r
+\r
+ }\r
+\r
+ final public void schedule(int caller, SessionTask request) {\r
+\r
+ int performer = request.thread;\r
+\r
+ if(DebugPolicy.SCHEDULE)\r
+ System.out.println("schedule " + request + " " + caller + " -> " + performer);\r
+\r
+ assert(performer >= 0);\r
+\r
+ assert(request != null);\r
+\r
+ if(caller == performer) {\r
+ request.run(caller);\r
+ } else {\r
+ ReentrantLock queueLock = threadLocks[performer];\r
+ queueLock.lock();\r
+ queues[performer].add(request);\r
+ // This thread could have been sleeping\r
+ if(queues[performer].size() == 1) {\r
+ if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();\r
+ threadConditions[performer].signalAll();\r
+ }\r
+ queueLock.unlock();\r
+ }\r
+\r
+ }\r
+\r
+\r
+ final int THREADS;\r
+\r
+ final public int THREAD_MASK;\r
+ final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group"); \r
+\r
+ public static abstract class SessionTask {\r
+\r
+ final public int thread;\r
+ final public int syncCaller;\r
+ final public Object object;\r
+\r
+ public SessionTask(WriteTraits object, int thread) {\r
+ this.thread = thread;\r
+ this.syncCaller = -1;\r
+ this.object = object;\r
+ }\r
+\r
+ public SessionTask(Object object, int thread, int syncCaller) {\r
+ this.thread = thread;\r
+ this.syncCaller = syncCaller;\r
+ this.object = object;\r
+ }\r
+\r
+ public abstract void run(int thread);\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "SessionTask[" + object + "]";\r
+ }\r
+\r
+ }\r
+\r
+ public static abstract class SessionRead extends SessionTask {\r
+\r
+ final public Semaphore notify;\r
+ final public DataContainer<Throwable> throwable; \r
+\r
+ public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {\r
+ super(object, thread, thread);\r
+ this.throwable = throwable;\r
+ this.notify = notify;\r
+ }\r
+\r
+ public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {\r
+ super(object, thread, syncThread);\r
+ this.throwable = throwable;\r
+ this.notify = notify;\r
+ }\r
+\r
+ }\r
+\r
+ long waitingTime = 0;\r
+\r
+ static int koss = 0;\r
+ static int koss2 = 0;\r
+\r
+ public boolean resume(ReadGraphImpl graph) {\r
+ return executors[0].runSynchronized();\r
+ }\r
+\r
+ public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)\r
+ throws DatabaseException {\r
+\r
+ THREADS = threads;\r
+ THREAD_MASK = threads - 1;\r
+\r
+ querySupport = core;\r
+ session = querySupport.getSession();\r
+ resourceSupport = querySupport.getSupport();\r
+ querySupportLock = core.getLock();\r
+\r
+ executors = new QueryThread[THREADS];\r
+ queues = new ArrayList[THREADS];\r
+ threadLocks = new ReentrantLock[THREADS];\r
+ threadConditions = new Condition[THREADS];\r
+ threadStates = new ThreadState[THREADS];\r
+ ownTasks = new ArrayList[THREADS];\r
+ ownSyncTasks = new ArrayList[THREADS];\r
+ delayQueues = new ArrayList[THREADS * THREADS];\r
+\r
+ // freeSchedule = new AtomicInteger(0);\r
+\r
+ for (int i = 0; i < THREADS * THREADS; i++) {\r
+ delayQueues[i] = new ArrayList<SessionTask>();\r
+ }\r
+\r
+ for (int i = 0; i < THREADS; i++) {\r
+\r
+ // tasks[i] = new ArrayList<Runnable>();\r
+ ownTasks[i] = new ArrayList<SessionTask>();\r
+ ownSyncTasks[i] = new ArrayList<SessionTask>();\r
+ queues[i] = new ArrayList<SessionTask>();\r
+ threadLocks[i] = new ReentrantLock();\r
+ threadConditions[i] = threadLocks[i].newCondition();\r
+ // limits[i] = false;\r
+ threadStates[i] = ThreadState.INIT;\r
+\r
+ }\r
+\r
+ for (int i = 0; i < THREADS; i++) {\r
+\r
+ final int index = i;\r
+\r
+ executors[i] = new QueryThread(session, this, index, "Query Thread " + index);\r
+\r
+ threadSet.add(executors[i]);\r
+\r
+ }\r
+\r
+ directPredicatesMap = new UnaryQueryHashMap();\r
+ valueMap = new UnaryQueryHashMap();\r
+ principalTypesMap = new UnaryQueryHashMap();\r
+ uriToResourceMap = new THashMap<String, URIToResource>();\r
+ namespaceIndexMap22 = new THashMap<String, NamespaceIndex>();\r
+ projectsMap = new UnaryQueryHashMap();\r
+ relationInfoMap = new UnaryQueryHashMap();\r
+ typeHierarchyMap = new UnaryQueryHashMap();\r
+ superTypesMap = new UnaryQueryHashMap();\r
+ superRelationsMap = new UnaryQueryHashMap();\r
+ typesMap = new UnaryQueryHashMap();\r
+ objectsMap = new DoubleKeyQueryHashMap();\r
+ orderedSetMap = new UnaryQueryHashMap();\r
+ predicatesMap = new UnaryQueryHashMap();\r
+ statementsMap = new DoubleKeyQueryHashMap();\r
+ directObjectsMap = new DoubleKeyQueryHashMap();\r
+ assertedPredicatesMap = new UnaryQueryHashMap();\r
+ assertedStatementsMap = new BinaryQueryHashMap();\r
+ asyncReadMap = new StableHashMap<AsyncRead, AsyncReadEntry>(); \r
+ readMap = new StableHashMap<Read, ReadEntry>();\r
+ asyncMultiReadMap = new StableHashMap<AsyncMultiRead, AsyncMultiReadEntry>(); \r
+ multiReadMap = new StableHashMap<MultiRead, MultiReadEntry>(); \r
+ externalReadMap = new StableHashMap<ExternalRead, ExternalReadEntry>(); \r
+ listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);\r
+\r
+ // Now start threads\r
+ for (int i = 0; i < THREADS; i++) {\r
+ executors[i].start();\r
+ }\r
+\r
+ // Make sure that query threads are up and running\r
+ while(sleepers.get() != THREADS) {\r
+ try {\r
+ Thread.sleep(5);\r
+ } catch (InterruptedException e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+\r
+ rootLibrary = core.getBuiltin("http:/");\r
+ boolean builtinsInstalled = rootLibrary != 0;\r
+\r
+ if (builtinsInstalled) {\r
+ functionalRelation = core.getBuiltin(Layer0.URIs.FunctionalRelation);\r
+ assert (functionalRelation != 0);\r
+ } else\r
+ functionalRelation = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ instanceOf = core.getBuiltin(Layer0.URIs.InstanceOf);\r
+ assert (instanceOf != 0);\r
+ } else\r
+ instanceOf = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ inverseOf = core.getBuiltin(Layer0.URIs.InverseOf);\r
+ assert (inverseOf != 0);\r
+ } else\r
+ inverseOf = 0;\r
+\r
+\r
+ if (builtinsInstalled) {\r
+ inherits = core.getBuiltin(Layer0.URIs.Inherits);\r
+ assert (inherits != 0);\r
+ } else\r
+ inherits = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ asserts = core.getBuiltin(Layer0.URIs.Asserts);\r
+ assert (asserts != 0);\r
+ } else\r
+ asserts = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ hasPredicate = core.getBuiltin(Layer0.URIs.HasPredicate);\r
+ assert (hasPredicate != 0);\r
+ } else\r
+ hasPredicate = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ hasPredicateInverse = core.getBuiltin(Layer0.URIs.HasPredicateInverse);\r
+ assert (hasPredicateInverse != 0);\r
+ } else\r
+ hasPredicateInverse = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ hasObject = core.getBuiltin(Layer0.URIs.HasObject);\r
+ assert (hasObject != 0);\r
+ } else\r
+ hasObject = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ subrelationOf = core.getBuiltin(Layer0.URIs.SubrelationOf);\r
+ assert (subrelationOf != 0);\r
+ } else\r
+ subrelationOf = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ superrelationOf = core.getBuiltin(Layer0.URIs.SuperrelationOf);\r
+ assert (superrelationOf != 0);\r
+ } else\r
+ superrelationOf = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ library = core.getBuiltin(Layer0.URIs.Library);\r
+ assert (library != 0);\r
+ } else\r
+ library = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ consistsOf = core.getBuiltin(Layer0.URIs.ConsistsOf);\r
+ assert (consistsOf != 0);\r
+ } else\r
+ consistsOf = 0;\r
+\r
+ if (builtinsInstalled) {\r
+ hasName = core.getBuiltin(Layer0.URIs.HasName);\r
+ assert (hasName != 0);\r
+ } else\r
+ hasName = 0;\r
+\r
+ }\r
+\r
+ final public void releaseWrite(ReadGraphImpl graph) {\r
+ performDirtyUpdates(graph);\r
+ modificationCounter++;\r
+ }\r
+\r
+ final public int getId(final Resource r) {\r
+ return querySupport.getId(r);\r
+ }\r
+\r
+ public QuerySupport getCore() {\r
+ return querySupport;\r
+ }\r
+\r
+ public int getFunctionalRelation() {\r
+ return functionalRelation;\r
+ }\r
+\r
+ public int getInherits() {\r
+ return inherits;\r
+ }\r
+\r
+ public int getInstanceOf() {\r
+ return instanceOf;\r
+ }\r
+\r
+ public int getInverseOf() {\r
+ return inverseOf;\r
+ }\r
+\r
+ public int getSubrelationOf() {\r
+ return subrelationOf;\r
+ }\r
+\r
+ public int getSuperrelationOf() {\r
+ return superrelationOf;\r
+ }\r
+\r
+ public int getAsserts() {\r
+ return asserts;\r
+ }\r
+\r
+ public int getHasPredicate() {\r
+ return hasPredicate;\r
+ }\r
+\r
+ public int getHasPredicateInverse() {\r
+ return hasPredicateInverse;\r
+ }\r
+\r
+ public int getHasObject() {\r
+ return hasObject;\r
+ }\r
+\r
+ public int getRootLibrary() {\r
+ return rootLibrary;\r
+ }\r
+\r
+ public Resource getRootLibraryResource() {\r
+ if (rootLibraryResource == null) {\r
+ // Synchronization is not needed here, it doesn't matter if multiple\r
+ // threads simultaneously set rootLibraryResource once.\r
+ int root = getRootLibrary();\r
+ if (root == 0)\r
+ throw new UnsupportedOperationException("database is not initialized, cannot get root library resource");\r
+ this.rootLibraryResource = new ResourceImpl(querySupport.getSupport(), root);\r
+ }\r
+ return rootLibraryResource;\r
+ }\r
+\r
+ public int getLibrary() {\r
+ return library;\r
+ }\r
+\r
+ public int getConsistsOf() {\r
+ return consistsOf;\r
+ }\r
+\r
+ public int getHasName() {\r
+ return hasName;\r
+ }\r
+\r
+ public void forResource(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
+\r
+ URIToResource.queryEach(graph, id, parent, null, new InternalProcedure<Integer>() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, Integer result) {\r
+\r
+ if (result != null && result != 0) {\r
+ procedure.execute(graph, result);\r
+ return;\r
+ }\r
+\r
+ // Fall back to using the fixed builtins.\r
+ result = querySupport.getBuiltin(id);\r
+ if (result != 0) {\r
+ procedure.execute(graph, result);\r
+ return;\r
+ } \r
+\r
+ try {\r
+ result = querySupport.getRandomAccessReference(id);\r
+ } catch (ResourceNotFoundException e) {\r
+ procedure.exception(graph, e);\r
+ return;\r
+ }\r
+\r
+ if (result != 0) {\r
+ procedure.execute(graph, result);\r
+ } else {\r
+ procedure.exception(graph, new ResourceNotFoundException(id));\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ public void forBuiltin(ReadGraphImpl graph, final String id, CacheEntry parent, final InternalProcedure<Integer> procedure) {\r
+\r
+ Integer result = querySupport.getBuiltin(id);\r
+ if (result != 0) {\r
+ procedure.execute(graph, result);\r
+ } else {\r
+ procedure.exception(graph, new ResourceNotFoundException(id));\r
+ }\r
+\r
+ }\r
+\r
+ public final <T> void runAsyncRead(final ReadGraphImpl graph, final AsyncRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure) {\r
+\r
+ int hash = requestHash(query);\r
+\r
+ AsyncReadEntry<T> entry = asyncReadMap.get(query, hash); \r
+\r
+ if(parent == null && listener == null) {\r
+ if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
+ System.out.println("ready " + query);\r
+ entry.performFromCache(graph, this, procedure);\r
+// graph.state.barrier.dec(query);\r
+ return;\r
+ } else {\r
+ query.perform(graph, procedure);\r
+// graph.state.barrier.dec(query);\r
+ return;\r
+ }\r
+ }\r
+\r
+ if(entry == null) {\r
+\r
+ entry = new AsyncReadEntry<T>(query);\r
+ entry.setPending();\r
+ entry.clearResult(querySupport);\r
+ asyncReadMap.put(query, entry, hash);\r
+\r
+ performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ } else {\r
+\r
+ if(entry.isPending()) {\r
+ synchronized(entry) {\r
+ if(entry.isPending()) {\r
+ throw new IllegalStateException();\r
+ // final AsyncBarrierImpl parentBarrier = graph.state.barrier;\r
+ // if(entry.procs == null) entry.procs = new ArrayList<AsyncProcedure<T>>();\r
+ // entry.procs.add(new AsyncProcedure<T>() {\r
+ //\r
+ // @Override\r
+ // public void execute(AsyncReadGraph graph, T result) {\r
+ // procedure.execute(graph, result);\r
+ // parentBarrier.dec(query);\r
+ // }\r
+ //\r
+ // @Override\r
+ // public void exception(AsyncReadGraph graph, Throwable throwable) {\r
+ // procedure.exception(graph, throwable);\r
+ // parentBarrier.dec(query);\r
+ // }\r
+ // \r
+ // });\r
+// if(graph.parent != null || listener != null) {\r
+// registerDependencies(graph, entry, parent, listener, procedure, false);\r
+// }\r
+//\r
+// query.perform(graph, procedure);\r
+//\r
+// return;\r
+\r
+ }\r
+ }\r
+ }\r
+\r
+ if(entry.isReady()) { \r
+ entry.performFromCache(graph, this, procedure);\r
+ registerDependencies(graph, entry, parent, listener, procedure, false);\r
+ } else {\r
+ performForEach(graph, query, entry, parent, listener, procedure, false);\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ final static <T> void runMultiRead(final ReadGraphImpl graph, MultiReadEntry cached, final MultiRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
+\r
+ MultiReadEntry entry = cached != null ? cached : provider.multiReadMap.get(query); \r
+ if(entry == null) {\r
+\r
+ entry = new MultiReadEntry(query);\r
+ entry.setPending();\r
+ entry.clearResult(provider.querySupport);\r
+\r
+ provider.multiReadMap.put(query, entry);\r
+\r
+ provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ } else {\r
+\r
+ if(entry.isPending()) {\r
+\r
+ synchronized(entry) {\r
+\r
+ if(entry.isPending()) {\r
+ throw new IllegalStateException();\r
+\r
+// if(entry.procs == null) entry.procs = new ArrayList<Pair<AsyncMultiProcedure<T>, AsyncBarrier>>();\r
+// entry.procs.add(new Pair(procedure, parentBarrier));\r
+// if(graph.parent != null || listener != null) {\r
+// provider.registerDependencies(graph, entry, parent, listener, procedure, false);\r
+// }\r
+\r
+ // If this was synchronized we must wait here until completion\r
+ // if(graph.state.synchronizedExecution) {\r
+ // while(entry.isPending()) {\r
+ // graph.resumeTasks(graph.callerThread, null, null);\r
+ // }\r
+ // }\r
+//\r
+// return;\r
+\r
+ }\r
+ }\r
+\r
+ entry.performFromCache(graph, provider, procedure);\r
+// graph.state.barrier.dec(query);\r
+ return;\r
+\r
+ } else {\r
+\r
+ provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public final <T> void runAsyncMultiRead(final ReadGraphImpl graph, final AsyncMultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
+\r
+ int hash = requestHash(query);\r
+\r
+ AsyncMultiReadEntry entry = asyncMultiReadMap.get(query, hash);\r
+\r
+ if(parent == null && listener == null) {\r
+ if(entry != null && (entry.isReady() || entry.isExcepted())) {\r
+ System.out.println("ready " + query);\r
+ entry.performFromCache(graph, this, procedure);\r
+ return;\r
+ } else {\r
+ query.perform(graph, procedure);\r
+ return;\r
+ }\r
+ }\r
+\r
+ if(entry == null) {\r
+\r
+ entry = new AsyncMultiReadEntry<T>(query); \r
+ entry.setPending();\r
+ entry.clearResult(querySupport);\r
+\r
+ asyncMultiReadMap.put(query, entry, hash);\r
+\r
+ performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ } else {\r
+\r
+ if(entry.isPending()) {\r
+ \r
+ synchronized(entry) {\r
+ if(entry.isPending()) {\r
+ throw new IllegalStateException();\r
+// if(entry.procs == null) entry.procs = new ArrayList<AsyncMultiProcedure<T>>();\r
+// entry.procs.add(procedure);\r
+// if(graph.parent != null || listener != null) {\r
+// registerDependencies(graph, entry, parent, listener, procedure, false);\r
+// }\r
+// return;\r
+ }\r
+ }\r
+ }\r
+\r
+ performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ final static <T> void runPrimitiveRead(ReadGraphImpl graph, ExternalReadEntry cached, final ExternalRead<T> query, final CacheEntry parent, final QueryProcessor provider, final ListenerBase listener, final Procedure<T> procedure) {\r
+\r
+ final ExternalReadEntry<T> entry = cached != null ? cached : provider.externalReadMap.get(query); \r
+ if(entry == null) {\r
+ provider.performForEach(graph, query, new ExternalReadEntry<T>(query), parent, listener, procedure, false);\r
+ } else {\r
+ if(entry.isPending()) {\r
+ synchronized(entry) {\r
+ if(entry.isPending()) {\r
+ throw new IllegalStateException();\r
+// if(entry.procs == null) entry.procs = new ArrayList<Procedure<T>>();\r
+// entry.procs.add(procedure);\r
+// return;\r
+ }\r
+ }\r
+ }\r
+ provider.performForEach(graph, query, entry, parent, listener, procedure, false);\r
+ }\r
+\r
+ }\r
+\r
+ public int requestHash(Object object) {\r
+ try {\r
+ return object.hashCode();\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ return 0;\r
+ }\r
+ }\r
+ \r
+ @Override\r
+ public <T> T queryRead(final ReadGraphImpl graph, final Read<T> query, final CacheEntry parent, final AsyncProcedure<T> procedure, final ListenerBase listener) throws Throwable {\r
+\r
+ assert(query != null);\r
+\r
+ ReadEntry entry = readMap.get(query);\r
+\r
+ if(entry != null) {\r
+ if(parent == null && (listener == null || listener.isDisposed()) && entry.isReady()) {\r
+ return (T)entry.get(graph, this, procedure);\r
+ } else if (entry.isPending()) {\r
+ throw new IllegalStateException();\r
+ }\r
+ }\r
+\r
+ if(entry == null) {\r
+\r
+ entry = new ReadEntry(query);\r
+ entry.setPending();\r
+ entry.clearResult(querySupport);\r
+\r
+ readMap.put(query, entry);\r
+\r
+ return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
+\r
+ } else {\r
+\r
+ if(entry.isPending()) {\r
+ throw new IllegalStateException();\r
+ } else {\r
+ return (T)performForEach(graph, query, entry, parent, listener, procedure, false);\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public <T> void queryMultiRead(final ReadGraphImpl graph, final MultiRead<T> query, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure) {\r
+\r
+ assert(query != null);\r
+ assert(procedure != null);\r
+\r
+ final MultiReadEntry entry = multiReadMap.get(query);\r
+\r
+ if(parent == null && !(listener != null)) {\r
+ if(entry != null && entry.isReady()) { \r
+ entry.performFromCache(graph, this, procedure);\r
+ return;\r
+ }\r
+ }\r
+\r
+ runMultiRead(graph, entry, query, parent, this, listener, procedure);\r
+\r
+ }\r
+\r
+ public <T> void queryPrimitiveRead(final ReadGraphImpl graph, final ExternalRead<T> query, final CacheEntry parent, final ListenerBase listener, final Procedure<T> procedure) {\r
+\r
+ assert(query != null);\r
+ assert(procedure != null);\r
+\r
+ final ExternalReadEntry entry = externalReadMap.get(query);\r
+\r
+ if(parent == null && !(listener != null)) {\r
+ if(entry != null && entry.isReady()) { \r
+ entry.performFromCache(procedure);\r
+ return;\r
+ }\r
+ }\r
+\r
+ runPrimitiveRead(graph, entry, query, parent, this, listener, procedure);\r
+\r
+ }\r
+\r
+ public <T> void performForEach(ReadGraphImpl parentGraph, final AsyncRead<T> query, final AsyncReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final AsyncProcedure<T> procedure,\r
+ boolean inferredDependency) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ assert(!entry.isDiscarded());\r
+\r
+ final ListenerEntry listenerEntry = registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!entry.isReady()) {\r
+\r
+ entry.setPending();\r
+\r
+ size++;\r
+\r
+ try {\r
+\r
+ final ReadGraphImpl finalParentGraph = parentGraph;\r
+\r
+ query.perform(parentGraph.withParent(entry), new AsyncProcedure<T>() {\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph returnGraph, T result) {\r
+ ReadGraphImpl impl = (ReadGraphImpl)returnGraph;\r
+ //AsyncReadGraph resumeGraph = finalParentGraph.newAsync();\r
+ entry.addOrSet(finalParentGraph, result);\r
+ if(listenerEntry != null) {\r
+ primeListenerEntry(listenerEntry, result);\r
+ }\r
+ try {\r
+ procedure.execute(finalParentGraph, result);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+// parentBarrier.dec(query);\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph returnGraph, Throwable t) {\r
+ ReadGraphImpl impl = (ReadGraphImpl)returnGraph;\r
+// AsyncReadGraph resumeGraph = finalParentGraph.newAsync();\r
+ entry.except(finalParentGraph, t);\r
+ try {\r
+ procedure.exception(finalParentGraph, t);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+// parentBarrier.dec(query);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return procedure.toString();\r
+ }\r
+\r
+ });\r
+\r
+ } catch (Throwable t) {\r
+\r
+ entry.except(t);\r
+ try {\r
+ procedure.exception(parentGraph, t);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+// parentBarrier.dec(query);\r
+\r
+ }\r
+\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ entry.performFromCache(parentGraph, this, new AsyncProcedure<T>() {\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable throwable) {\r
+ procedure.exception(graph, throwable);\r
+ }\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, T result) {\r
+ procedure.execute(graph, result);\r
+ if(listenerEntry != null) {\r
+ primeListenerEntry(listenerEntry, result);\r
+ }\r
+ }\r
+\r
+ });\r
+\r
+// parentBarrier.dec(query);\r
+\r
+ hits++;\r
+\r
+ }\r
+\r
+ assert (!entry.isDiscarded());\r
+\r
+ }\r
+\r
+ public <T> T performForEach(final ReadGraphImpl graph, final Read<T> query, final ReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncProcedure<T> procedure,\r
+ boolean inferredDependency) throws Throwable {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ entry.assertNotDiscarded();\r
+\r
+ if(entry.isReady()) {\r
+\r
+ // EXCEPTED goes here\r
+\r
+// if(procedure != null) entry.performFromCache(graph, this, procedure);\r
+// parentBarrier.dec(query);\r
+ hits++;\r
+\r
+ ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
+ \r
+ T result = (T)entry.get(graph, this, procedure); \r
+\r
+ if(listenerEntry != null) primeListenerEntry(listenerEntry, result);\r
+\r
+ return result;\r
+\r
+ } else {\r
+\r
+ // FRESH, REFUTED, PENDING go here\r
+\r
+ entry.setPending();\r
+\r
+ size++;\r
+ misses++;\r
+\r
+ ListenerEntry listenerEntry = registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
+\r
+ final ReadGraphImpl performGraph = graph.newSync(entry);\r
+\r
+ try {\r
+\r
+ if(Development.DEVELOPMENT)\r
+ Development.recordHistogram("run " + query);\r
+\r
+ T result = query.perform(performGraph);\r
+ entry.addOrSet(performGraph, result);\r
+ \r
+ if(listenerEntry != null) primeListenerEntry(listenerEntry, result);\r
+\r
+ return (T)entry.get(graph, this, procedure);\r
+\r
+ } catch (Throwable t) {\r
+\r
+ entry.except(t);\r
+ return (T)entry.get(graph, this, procedure);\r
+\r
+ }\r
+\r
+ } \r
+\r
+ }\r
+\r
+ public <T> void performForEach(final ReadGraphImpl graph, final MultiRead<T> query, final MultiReadEntry<T> entry, CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,\r
+ boolean inferredDependency) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ assert(!entry.isPending());\r
+ assert(!entry.isDiscarded());\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!entry.isReady()) {\r
+\r
+ entry.setPending();\r
+ entry.clearResult(querySupport);\r
+\r
+ multiReadMap.put(query, entry);\r
+ size++;\r
+\r
+ final ReadGraphImpl newGraph = graph.newSync(entry);\r
+// newGraph.state.barrier.inc();\r
+\r
+ try {\r
+\r
+ query.perform(newGraph, new AsyncMultiProcedure<T>() {\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, T result) {\r
+ entry.addOrSet(result);\r
+ try {\r
+ procedure.execute(graph, result);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+ entry.finish(graph);\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+// newGraph.state.barrier.dec();\r
+// parentBarrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ entry.except(t);\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+// newGraph.state.barrier.dec();\r
+// parentBarrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ } catch (DatabaseException e) {\r
+\r
+ entry.except(e);\r
+ try {\r
+ procedure.exception(graph, e);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+// newGraph.state.barrier.dec();\r
+// parentBarrier.dec();\r
+\r
+ } catch (Throwable t) {\r
+\r
+ DatabaseException e = new DatabaseException(t);\r
+\r
+ entry.except(e);\r
+ try {\r
+ procedure.exception(graph, e);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+// newGraph.state.barrier.dec();\r
+// parentBarrier.dec();\r
+\r
+ }\r
+\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ entry.performFromCache(graph, this, procedure);\r
+ hits++;\r
+\r
+\r
+ }\r
+\r
+ assert (!entry.isDiscarded());\r
+\r
+ registerDependencies(graph, entry, parent, listener, procedure, inferredDependency);\r
+\r
+ }\r
+\r
+\r
+ public <T> void performForEach(final ReadGraphImpl callerGraph, AsyncMultiRead<T> query, final AsyncMultiReadEntry<T> entry, final CacheEntry parent, final ListenerBase listener, final AsyncMultiProcedure<T> procedure,\r
+ boolean inferredDependency) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ try {\r
+\r
+ assert(!entry.isDiscarded());\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!entry.isReady()) {\r
+\r
+ size++;\r
+\r
+ try {\r
+\r
+ ReadGraphImpl performGraph = callerGraph.withAsyncParent(entry);\r
+\r
+ query.perform(performGraph, new AsyncMultiProcedure<T>() {\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, T result) {\r
+ ReadGraphImpl impl = (ReadGraphImpl)graph;\r
+// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
+ entry.addOrSet(result);\r
+ try {\r
+ procedure.execute(callerGraph, result);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+ ReadGraphImpl impl = (ReadGraphImpl)graph;\r
+// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
+ entry.finish(callerGraph);\r
+ try {\r
+ procedure.finished(callerGraph);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ ReadGraphImpl impl = (ReadGraphImpl)graph;\r
+// ReadGraphImpl executeGraph = callerGraph.newAsync();\r
+ entry.except(callerGraph, t);\r
+ try {\r
+ procedure.exception(callerGraph, t);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+ }\r
+\r
+ });\r
+\r
+ } catch (Throwable t) {\r
+\r
+ entry.except(t);\r
+ try {\r
+ procedure.exception(callerGraph, t);\r
+ } catch (Throwable t2) {\r
+ t2.printStackTrace();\r
+ }\r
+\r
+ }\r
+\r
+\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ entry.performFromCache(callerGraph, this, procedure);\r
+\r
+ hits++;\r
+\r
+ }\r
+\r
+ assert (!entry.isDiscarded());\r
+\r
+ registerDependencies(callerGraph, entry, parent, listener, procedure, inferredDependency);\r
+\r
+ } catch (Throwable t) {\r
+\r
+ Logger.defaultLogError(t);\r
+\r
+ } finally {\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public <T> void performForEach(ReadGraphImpl graph, final ExternalRead<T> query, final ExternalReadEntry<T> entry, final CacheEntry parent, final ListenerBase base, final Procedure<T> procedure,\r
+ boolean inferredDependency) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ assert(!entry.isPending());\r
+ assert(!entry.isDiscarded());\r
+\r
+ registerDependencies(graph, entry, parent, base, procedure, inferredDependency);\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!entry.isReady()) {\r
+\r
+ entry.setPending();\r
+ entry.clearResult(querySupport);\r
+\r
+ externalReadMap.put(query, entry);\r
+ size++;\r
+\r
+ try {\r
+\r
+ query.register(graph, new Listener<T>() {\r
+\r
+ AtomicBoolean used = new AtomicBoolean(false);\r
+\r
+ @Override\r
+ public void execute(T result) {\r
+ \r
+ // Just for safety\r
+ if(entry.isDiscarded()) return;\r
+ if(entry.isExcepted()) entry.setPending();\r
+ \r
+ if(used.compareAndSet(false, true)) {\r
+ entry.addOrSet(QueryProcessor.this, result);\r
+ procedure.execute(result);\r
+ } else {\r
+ entry.queue(result);\r
+ updatePrimitive(query);\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public void exception(Throwable t) {\r
+ \r
+ entry.except(t);\r
+\r
+ if(used.compareAndSet(false, true)) {\r
+ procedure.exception(t);\r
+ } else {\r
+// entry.queue(result);\r
+ updatePrimitive(query);\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return procedure.toString();\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return entry.isDiscarded() || !isBound(entry);\r
+ }\r
+\r
+ });\r
+\r
+ } catch (Throwable t) {\r
+\r
+ entry.except(t);\r
+ procedure.exception(t);\r
+\r
+ }\r
+\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ entry.performFromCache(procedure);\r
+\r
+ hits++;\r
+\r
+ }\r
+\r
+ assert (!entry.isDiscarded());\r
+\r
+ }\r
+\r
+ private boolean isBound(ExternalReadEntry<?> entry) {\r
+ if(entry.hasParents()) return true;\r
+ else if(hasListener(entry)) return true;\r
+ else return false;\r
+ }\r
+\r
+ synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {\r
+\r
+ if (parent != null && !inferred) {\r
+ try {\r
+ if(!child.isImmutable(graph))\r
+ child.addParent(parent);\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+ if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);\r
+ }\r
+\r
+ if (listener != null) {\r
+ return registerListener(child, listener, procedure);\r
+ } else {\r
+ return null;\r
+ }\r
+\r
+ }\r
+\r
+ public <Procedure> void performForEach(ReadGraphImpl graph, BinaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ try {\r
+\r
+ registerDependencies(graph, query, parent, listener, procedure, false);\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!query.isReady()) {\r
+\r
+ boolean fresh = query.isFresh();\r
+\r
+ if(fresh) {\r
+ size++;\r
+ }\r
+\r
+ query.computeForEach(graph, this, procedure, true);\r
+\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ query.performFromCache(graph, this, procedure);\r
+\r
+ hits++;\r
+\r
+ }\r
+\r
+ } catch (Throwable t) {\r
+\r
+ Logger.defaultLogError(t);\r
+\r
+ }\r
+ }\r
+\r
+ public <Procedure> Object performForEach(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ try {\r
+\r
+ assert(query.assertNotDiscarded());\r
+\r
+ registerDependencies(graph, query, parent, listener, procedure, false);\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!query.isReady()) {\r
+\r
+ size++;\r
+ misses++;\r
+\r
+ return query.computeForEach(graph, this, procedure, true);\r
+\r
+\r
+ } else {\r
+\r
+ hits++;\r
+\r
+ return query.performFromCache(graph, this, procedure);\r
+\r
+ }\r
+\r
+ } catch (Throwable t) {\r
+\r
+ Logger.defaultLogError(t);\r
+ return null;\r
+\r
+ } \r
+\r
+ }\r
+ \r
+ static class Dummy implements InternalProcedure<Object>, IntProcedure {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ }\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, Object result) {\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable throwable) {\r
+ }\r
+ \r
+ }\r
+ \r
+ private static final Dummy dummy = new Dummy();\r
+\r
+ public <Procedure> Object performForEach2(ReadGraphImpl graph, UnaryQuery<Procedure> query, CacheEntry parent, ListenerBase listener, Procedure procedure) throws Throwable {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ assert(query.assertNotDiscarded());\r
+\r
+ registerDependencies(graph, query, parent, listener, procedure, false);\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!query.isReady()) {\r
+\r
+ size++;\r
+ misses++;\r
+\r
+ query.computeForEach(graph, this, (Procedure)dummy, true);\r
+ return query.get(graph, this, null);\r
+\r
+ } else {\r
+\r
+ hits++;\r
+\r
+ return query.get(graph, this, procedure);\r
+\r
+ }\r
+\r
+ }\r
+ \r
+ public <Procedure> void performForEach(ReadGraphImpl graph, StringQuery<Procedure> query, CacheEntry parent, final ListenerBase listener, Procedure procedure) {\r
+\r
+ if (DebugPolicy.PERFORM)\r
+ System.out.println("PE[ " + (query.hashCode() & THREAD_MASK) + "] " + query);\r
+\r
+ assert (!dirty);\r
+ assert (!collecting);\r
+\r
+ try {\r
+\r
+ if(query.isDiscarded()) {\r
+ System.err.println("aff");\r
+ }\r
+ assert(!query.isDiscarded());\r
+\r
+ // FRESH, REFUTED, EXCEPTED go here \r
+ if (!query.isReady()) {\r
+\r
+ query.computeForEach(graph.withAsyncParent(query), this, procedure);\r
+\r
+ size++;\r
+ misses++;\r
+\r
+ } else {\r
+\r
+ query.performFromCache(graph, this, procedure);\r
+\r
+ hits++;\r
+\r
+ }\r
+\r
+ assert (!query.isDiscarded());\r
+\r
+ registerDependencies(graph, query, parent, listener, procedure, false);\r
+\r
+ } catch (Throwable t) {\r
+\r
+ t.printStackTrace();\r
+ Logger.defaultLogError(t);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ interface QueryCollectorSupport {\r
+ public CacheCollectionResult allCaches();\r
+ public Collection<CacheEntry> getRootList();\r
+ public int getCurrentSize();\r
+ public int calculateCurrentSize();\r
+ public CacheEntryBase iterate(int level);\r
+ public void remove();\r
+ public void setLevel(CacheEntryBase entry, int level);\r
+ public boolean start(boolean flush);\r
+ }\r
+\r
+ interface QueryCollector {\r
+\r
+ public void collect(int youngTarget, int allowedTimeInMs);\r
+\r
+ }\r
+\r
+ class QueryCollectorSupportImpl implements QueryCollectorSupport {\r
+\r
+ private static final boolean DEBUG = false;\r
+ private static final double ITERATION_RATIO = 0.2;\r
+ \r
+ private CacheCollectionResult iteration = new CacheCollectionResult();\r
+ private boolean fresh = true;\r
+ private boolean needDataInStart = true;\r
+ \r
+ QueryCollectorSupportImpl() {\r
+ iteration.restart();\r
+ }\r
+\r
+ public CacheCollectionResult allCaches() {\r
+ CacheCollectionResult result = new CacheCollectionResult();\r
+ QueryProcessor.this.allCaches(result);\r
+ result.restart();\r
+ return result;\r
+ }\r
+ \r
+ public boolean start(boolean flush) {\r
+ // We need new data from query maps\r
+ fresh = true;\r
+ if(needDataInStart || flush) {\r
+ // Last run ended after processing all queries => refresh data\r
+ restart(flush ? 0.0 : ITERATION_RATIO);\r
+ } else {\r
+ // continue with previous big data\r
+ }\r
+ // Notify caller about iteration situation\r
+ return iteration.isAtStart();\r
+ }\r
+\r
+ private void restart(double targetRatio) {\r
+ \r
+ needDataInStart = true;\r
+\r
+ long start = System.nanoTime();\r
+ if(fresh) {\r
+ \r
+ // We need new data from query maps\r
+ \r
+ int iterationSize = iteration.size()+1;\r
+ int diff = calculateCurrentSize()-iterationSize;\r
+ \r
+ double ratio = (double)diff / (double)iterationSize;\r
+ boolean dirty = Math.abs(ratio) >= targetRatio;\r
+ \r
+ if(dirty) {\r
+ iteration = allCaches();\r
+ if(DEBUG) {\r
+ System.err.print("iterate: allCaches in " + 1e-9*(System.nanoTime()-start) + "s. (" + iteration.size() + ") ");\r
+ for(int i=0;i<CacheCollectionResult.LEVELS;i++)\r
+ System.err.print(" " + iteration.levels[i].size());\r
+ System.err.println("");\r
+ }\r
+ } else {\r
+ iteration.restart();\r
+ }\r
+ \r
+ fresh = false;\r
+ needDataInStart = false;\r
+ } else {\r
+ // We are returning here within the same GC round - reuse the cache table\r
+ iteration.restart();\r
+ }\r
+ \r
+ return;\r
+ \r
+ }\r
+ \r
+ @Override\r
+ public CacheEntryBase iterate(int level) {\r
+ \r
+ CacheEntryBase entry = iteration.next(level);\r
+ if(entry == null) {\r
+ restart(ITERATION_RATIO);\r
+ return null;\r
+ }\r
+ \r
+ while(entry != null && entry.isDiscarded()) {\r
+ entry = iteration.next(level);\r
+ }\r
+ \r
+ return entry;\r
+ \r
+ }\r
+ \r
+ @Override\r
+ public void remove() {\r
+ iteration.remove();\r
+ }\r
+ \r
+ @Override\r
+ public void setLevel(CacheEntryBase entry, int level) {\r
+ iteration.setLevel(entry, level);\r
+ }\r
+\r
+ public Collection<CacheEntry> getRootList() {\r
+\r
+ ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();\r
+\r
+ for (Object e : valueMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : directPredicatesMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : objectsMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : directObjectsMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : principalTypesMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : superRelationsMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : superTypesMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : typesMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : objectsMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : assertedStatementsMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : readMap.values()) {\r
+ if(e instanceof CacheEntry) {\r
+ result.add((CacheEntry) e);\r
+ } else {\r
+ System.err.println("e=" + e);\r
+ }\r
+ }\r
+ for (Object e : asyncReadMap.values()) {\r
+ if(e instanceof CacheEntry) {\r
+ result.add((CacheEntry) e);\r
+ } else {\r
+ System.err.println("e=" + e);\r
+ }\r
+ }\r
+ for (Object e : externalReadMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+ for (Object e : orderedSetMap.values()) {\r
+ result.add((CacheEntry) e);\r
+ }\r
+\r
+ return result;\r
+\r
+ }\r
+\r
+ @Override\r
+ public int calculateCurrentSize() {\r
+ \r
+ int realSize = 0;\r
+ \r
+ realSize += directPredicatesMap.size();\r
+ realSize += principalTypesMap.size();\r
+ realSize += uriToResourceMap.size();\r
+ realSize += namespaceIndexMap22.size();\r
+ realSize += projectsMap.size();\r
+ \r
+ realSize += relationInfoMap.size();\r
+ realSize += superTypesMap.size();\r
+ realSize += typeHierarchyMap.size();\r
+ realSize += superRelationsMap.size();\r
+ realSize += typesMap.size();\r
+ \r
+ realSize += valueMap.size();\r
+ realSize += directObjectsMap.size();\r
+ realSize += objectsMap.size();\r
+ realSize += orderedSetMap.size();\r
+ realSize += predicatesMap.size();\r
+ \r
+ realSize += statementsMap.size();\r
+ realSize += assertedPredicatesMap.size();\r
+ realSize += assertedStatementsMap.size();\r
+ realSize += externalReadMap.size();\r
+ realSize += asyncReadMap.size();\r
+ \r
+ realSize += readMap.size();\r
+ realSize += asyncMultiReadMap.size();\r
+ realSize += multiReadMap.size();\r
+ \r
+ size = realSize;\r
+ \r
+ return realSize;\r
+ \r
+ }\r
+\r
+ @Override\r
+ public int getCurrentSize() {\r
+ return size;\r
+ }\r
+\r
+ }\r
+ // final private static int MINIMUM_SIZE = (int)(Runtime.getRuntime().maxMemory() / 600);\r
+\r
+ private QueryCollectorSupport collectorSupport = new QueryCollectorSupportImpl();\r
+ private QueryCollector collector = new QueryCollectorImpl(this, collectorSupport);\r
+\r
+ public int querySize() {\r
+ return size;\r
+ }\r
+\r
+ public void gc(int youngTarget, int allowedTimeInMs) {\r
+\r
+ collector.collect(youngTarget, allowedTimeInMs);\r
+\r
+ }\r
+\r
+ public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {\r
+\r
+ assert (entry != null);\r
+\r
+ if (base.isDisposed())\r
+ return null;\r
+\r
+ return addListener(entry, base, procedure);\r
+\r
+ }\r
+\r
+ private void primeListenerEntry(final ListenerEntry entry, final Object result) {\r
+ entry.setLastKnown(result);\r
+ }\r
+\r
+ private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {\r
+\r
+ assert (entry != null);\r
+ assert (procedure != null);\r
+\r
+ ArrayList<ListenerEntry> list = listeners.get(entry);\r
+ if (list == null) {\r
+ list = new ArrayList<ListenerEntry>(1);\r
+ listeners.put(entry, list);\r
+ }\r
+\r
+ ListenerEntry result = new ListenerEntry(entry, base, procedure);\r
+ int currentIndex = list.indexOf(result);\r
+ // There was already a listener\r
+ if(currentIndex > -1) {\r
+ ListenerEntry current = list.get(currentIndex);\r
+ if(!current.base.isDisposed()) return null;\r
+ list.set(currentIndex, result);\r
+ } else {\r
+ list.add(result);\r
+ }\r
+\r
+ if(DebugPolicy.LISTENER) {\r
+ new Exception().printStackTrace();\r
+ System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);\r
+ }\r
+\r
+ return result;\r
+\r
+ }\r
+\r
+ private void scheduleListener(ListenerEntry entry) {\r
+ assert (entry != null);\r
+ if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);\r
+ scheduledListeners.add(entry);\r
+ }\r
+\r
+ private void removeListener(ListenerEntry entry) {\r
+ assert (entry != null);\r
+ ArrayList<ListenerEntry> list = listeners.get(entry.entry);\r
+ if(list == null) return;\r
+ boolean success = list.remove(entry);\r
+ assert (success);\r
+ if (list.isEmpty())\r
+ listeners.remove(entry.entry);\r
+ }\r
+\r
+ private boolean hasListener(CacheEntry entry) {\r
+ if(listeners.get(entry) != null) return true;\r
+ return false;\r
+ }\r
+\r
+ boolean hasListenerAfterDisposing(CacheEntry entry) {\r
+ if(listeners.get(entry) != null) {\r
+ ArrayList<ListenerEntry> entries = listeners.get(entry);\r
+ ArrayList<ListenerEntry> list = null;\r
+ for (ListenerEntry e : entries) {\r
+ if (e.base.isDisposed()) {\r
+ if(list == null) list = new ArrayList<ListenerEntry>();\r
+ list.add(e);\r
+ }\r
+ }\r
+ if(list != null) {\r
+ for (ListenerEntry e : list) {\r
+ entries.remove(e);\r
+ }\r
+ }\r
+ if (entries.isEmpty()) {\r
+ listeners.remove(entry);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ List<ListenerEntry> getListenerEntries(CacheEntry entry) {\r
+ hasListenerAfterDisposing(entry);\r
+ if(listeners.get(entry) != null)\r
+ return listeners.get(entry);\r
+ else \r
+ return Collections.emptyList();\r
+ }\r
+\r
+ void processListenerReport(CacheEntry entry, Map<CacheEntry, Set<ListenerBase>> workarea) {\r
+\r
+ if(!workarea.containsKey(entry)) {\r
+\r
+ HashSet<ListenerBase> ls = new HashSet<ListenerBase>();\r
+ for(ListenerEntry e : getListenerEntries(entry))\r
+ ls.add(e.base);\r
+\r
+ workarea.put(entry, ls);\r
+\r
+ for(CacheEntry parent : entry.getParents(this)) {\r
+ processListenerReport(parent, workarea);\r
+ ls.addAll(workarea.get(parent));\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public synchronized ListenerReport getListenerReport() throws IOException {\r
+\r
+ class ListenerReportImpl implements ListenerReport {\r
+\r
+ Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();\r
+\r
+ @Override\r
+ public void print(PrintStream b) {\r
+ Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();\r
+ for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {\r
+ for(ListenerBase l : e.getValue()) {\r
+ Integer i = hist.get(l);\r
+ hist.put(l, i != null ? i-1 : -1);\r
+ }\r
+ }\r
+\r
+ for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
+ b.print("" + -p.second + " " + p.first + "\n");\r
+ }\r
+\r
+ b.flush();\r
+ }\r
+\r
+ }\r
+\r
+ ListenerReportImpl result = new ListenerReportImpl();\r
+\r
+ Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();\r
+ for(CacheEntryBase entry : all) {\r
+ hasListenerAfterDisposing(entry);\r
+ }\r
+ for(CacheEntryBase entry : all) {\r
+ processListenerReport(entry, result.workarea);\r
+ }\r
+\r
+ return result;\r
+\r
+ }\r
+\r
+ public synchronized String reportListeners(File file) throws IOException {\r
+\r
+ if (!isAlive())\r
+ return "Disposed!";\r
+\r
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
+ ListenerReport report = getListenerReport();\r
+ report.print(b);\r
+\r
+ return "Done reporting listeners.";\r
+\r
+ }\r
+\r
+ void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {\r
+\r
+ if(entry.isDiscarded()) return;\r
+ if(workarea.containsKey(entry)) return;\r
+ \r
+ Iterable<CacheEntry> parents = entry.getParents(this);\r
+ HashSet<CacheEntry> ps = new HashSet<CacheEntry>();\r
+ for(CacheEntry e : parents) {\r
+ if(e.isDiscarded()) continue;\r
+ ps.add(e);\r
+ processParentReport(e, workarea);\r
+ }\r
+ workarea.put(entry, ps);\r
+\r
+ }\r
+\r
+ public synchronized String reportQueryActivity(File file) throws IOException {\r
+ \r
+ System.err.println("reportQueries " + file.getAbsolutePath());\r
+\r
+ if (!isAlive())\r
+ return "Disposed!";\r
+\r
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
+\r
+ List<Pair<String,Integer>> entries = CollectionUtils.valueSortedEntries(Development.histogram);\r
+ Collections.reverse(entries);\r
+ \r
+ for(Pair<String,Integer> entry : entries) {\r
+ b.println(entry.first + ": " + entry.second);\r
+ }\r
+\r
+ b.close();\r
+ \r
+ Development.histogram.clear();\r
+\r
+ return "OK";\r
+\r
+ }\r
+ \r
+ public synchronized String reportQueries(File file) throws IOException {\r
+\r
+ System.err.println("reportQueries " + file.getAbsolutePath());\r
+\r
+ if (!isAlive())\r
+ return "Disposed!";\r
+\r
+ PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));\r
+\r
+ long start = System.nanoTime();\r
+\r
+// ArrayList<CacheEntry> all = ;\r
+ \r
+ Map<CacheEntry, Set<CacheEntry>> workarea = new HashMap<CacheEntry, Set<CacheEntry>>();\r
+ Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();\r
+ for(CacheEntryBase entry : caches) {\r
+ processParentReport(entry, workarea);\r
+ }\r
+ \r
+ // for(CacheEntry e : all) System.err.println("entry: " + e);\r
+\r
+ long duration = System.nanoTime() - start;\r
+ System.err.println("Query root set in " + 1e-9*duration + "s.");\r
+\r
+ start = System.nanoTime();\r
+\r
+ HashMap<CacheEntry, Integer> flagMap = new HashMap<CacheEntry, Integer>(); \r
+\r
+ int listeners = 0;\r
+\r
+ for(CacheEntry entry : workarea.keySet()) {\r
+ boolean listener = hasListenerAfterDisposing(entry);\r
+ boolean hasParents = entry.getParents(this).iterator().hasNext();\r
+ if(listener) {\r
+ // Bound\r
+ flagMap.put(entry, 0);\r
+ } else if (!hasParents) {\r
+ // Unbound\r
+ flagMap.put(entry, 1);\r
+ } else {\r
+ // Unknown\r
+ flagMap.put(entry, 2);\r
+ }\r
+ // // Write leaf bit\r
+ // entry.flags |= 4;\r
+ }\r
+\r
+ boolean done = true;\r
+ int loops = 0;\r
+\r
+ do {\r
+\r
+ done = true;\r
+\r
+ long start2 = System.nanoTime();\r
+\r
+ int boundCounter = 0;\r
+ int unboundCounter = 0;\r
+ int unknownCounter = 0;\r
+\r
+ for(CacheEntry entry : workarea.keySet()) {\r
+\r
+ //System.err.println("process " + entry);\r
+\r
+ int flags = flagMap.get(entry);\r
+ int bindStatus = flags & 3;\r
+\r
+ if(bindStatus == 0) boundCounter++;\r
+ else if(bindStatus == 1) unboundCounter++;\r
+ else if(bindStatus == 2) unknownCounter++;\r
+\r
+ if(bindStatus < 2) continue;\r
+\r
+ int newStatus = 1;\r
+ for(CacheEntry parent : entry.getParents(this)) {\r
+\r
+ if(parent.isDiscarded()) flagMap.put(parent, 1);\r
+\r
+ int flags2 = flagMap.get(parent);\r
+ int bindStatus2 = flags2 & 3;\r
+ // Parent is bound => child is bound\r
+ if(bindStatus2 == 0) {\r
+ newStatus = 0;\r
+ break;\r
+ }\r
+ // Parent is unknown => child is unknown\r
+ else if (bindStatus2 == 2) {\r
+ newStatus = 2;\r
+ done = false;\r
+ break;\r
+ }\r
+ }\r
+\r
+ flagMap.put(entry, newStatus);\r
+\r
+ }\r
+\r
+ duration = System.nanoTime() - start2;\r
+ System.err.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
+ b.println("Query analysis pass (" + boundCounter + "/" + unboundCounter + "/" + unknownCounter + ") in "+ 1e-9*duration + "s.");\r
+\r
+ } while(!done && loops++ < 20);\r
+\r
+ if(loops >= 20) {\r
+\r
+ for(CacheEntry entry : workarea.keySet()) {\r
+\r
+ int bindStatus = flagMap.get(entry);\r
+ if(bindStatus == 2) System.err.println("Undefined bind status for " + entry);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ duration = System.nanoTime() - start;\r
+ System.err.println("Query analysis in " + 1e-9*duration + "s.");\r
+\r
+ Map<Class<?>, Integer> counts = new HashMap<Class<?>, Integer>();\r
+\r
+ for(CacheEntry entry : workarea.keySet()) {\r
+ Class<?> clazz = entry.getClass();\r
+ if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); \r
+ else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); \r
+ else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); \r
+ else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); \r
+ else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); \r
+ Integer c = counts.get(clazz);\r
+ if(c == null) counts.put(clazz, -1);\r
+ else counts.put(clazz, c-1);\r
+ }\r
+\r
+ b.print("// Simantics DB client query report file\n");\r
+ b.print("// This file contains the following information\n");\r
+ b.print("// -The amount of cached query instances per query class\n");\r
+ b.print("// -The sizes of retained child sets\n");\r
+ b.print("// -List of parents for each query (search for 'P <query name>')\n");\r
+ b.print("// -Followed by status, where\n");\r
+ b.print("// -0=bound\n");\r
+ b.print("// -1=free\n");\r
+ b.print("// -2=unknown\n");\r
+ b.print("// -L=has listener\n");\r
+ b.print("// -List of children for each query (search for 'C <query name>')\n");\r
+\r
+ b.print("----------------------------------------\n");\r
+\r
+ b.print("// Queries by class\n");\r
+ for(Pair<Class<?>, Integer> p : CollectionUtils.valueSortedEntries(counts)) {\r
+ b.print(-p.second + " " + p.first.getName() + "\n");\r
+ }\r
+\r
+ Map<CacheEntry, Integer> hist = new HashMap<CacheEntry, Integer>();\r
+ for(CacheEntry e : workarea.keySet())\r
+ hist.put(e, -1);\r
+ \r
+ boolean changed = true;\r
+ int iter = 0;\r
+ while(changed && iter++<50) {\r
+ \r
+ changed = false;\r
+ \r
+ Map<CacheEntry, Integer> newHist = new HashMap<CacheEntry, Integer>();\r
+ for(CacheEntry e : workarea.keySet())\r
+ newHist.put(e, -1);\r
+\r
+ for(Map.Entry<CacheEntry, Set<CacheEntry>> e : workarea.entrySet()) {\r
+ Integer c = hist.get(e.getKey());\r
+ for(CacheEntry p : e.getValue()) {\r
+ Integer i = newHist.get(p);\r
+ newHist.put(p, i+c);\r
+ }\r
+ }\r
+ for(CacheEntry e : workarea.keySet()) {\r
+ Integer value = newHist.get(e);\r
+ Integer old = hist.get(e);\r
+ if(!value.equals(old)) {\r
+ hist.put(e, value);\r
+// System.err.println("hist " + e + ": " + old + " => " + value);\r
+ changed = true;\r
+ }\r
+ }\r
+ \r
+ System.err.println("Retained set iteration " + iter);\r
+\r
+ }\r
+\r
+ b.print("// Queries by retained set\n");\r
+ for(Pair<CacheEntry, Integer> p : CollectionUtils.valueSortedEntries(hist)) {\r
+ b.print("" + -p.second + " " + p.first + "\n");\r
+ }\r
+\r
+ HashMap<CacheEntry, Collection<CacheEntry>> inverse = new HashMap<CacheEntry, Collection<CacheEntry>>();\r
+\r
+ b.print("// Entry parent listing\n");\r
+ for(CacheEntry entry : workarea.keySet()) {\r
+ int status = flagMap.get(entry);\r
+ boolean hasListener = hasListenerAfterDisposing(entry);\r
+ b.print("Q " + entry.toString());\r
+ if(hasListener) {\r
+ b.print(" (L" + status + ")");\r
+ listeners++;\r
+ } else {\r
+ b.print(" (" + status + ")");\r
+ }\r
+ b.print("\n");\r
+ for(CacheEntry parent : workarea.get(entry)) {\r
+ Collection<CacheEntry> inv = inverse.get(parent);\r
+ if(inv == null) {\r
+ inv = new ArrayList<CacheEntry>();\r
+ inverse.put(parent, inv);\r
+ }\r
+ inv.add(entry);\r
+ b.print(" " + parent.toString());\r
+ b.print("\n");\r
+ }\r
+ }\r
+\r
+ b.print("// Entry child listing\n");\r
+ for(Map.Entry<CacheEntry, Collection<CacheEntry>> entry : inverse.entrySet()) {\r
+ b.print("C " + entry.getKey().toString());\r
+ b.print("\n");\r
+ for(CacheEntry child : entry.getValue()) {\r
+ Integer h = hist.get(child);\r
+ if(h != null) {\r
+ b.print(" " + h);\r
+ } else {\r
+ b.print(" <no children>");\r
+ }\r
+ b.print(" " + child.toString());\r
+ b.print("\n");\r
+ }\r
+ }\r
+\r
+ b.print("#queries: " + workarea.keySet().size() + "\n");\r
+ b.print("#listeners: " + listeners + "\n");\r
+\r
+ b.close();\r
+\r
+ return "Dumped " + workarea.keySet().size() + " queries.";\r
+\r
+ }\r
+\r
+ class UpdateEntry {\r
+\r
+ public CacheEntry caller;\r
+\r
+ public CacheEntry entry;\r
+\r
+ public int indent;\r
+\r
+ public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {\r
+ this.caller = caller;\r
+ this.entry = entry;\r
+ this.indent = indent;\r
+ }\r
+\r
+ };\r
+\r
+ boolean removeQuery(CacheEntry entry) {\r
+\r
+ // This entry has been removed before. No need to do anything here.\r
+ if(entry.isDiscarded()) return false;\r
+\r
+ assert (!entry.isDiscarded());\r
+\r
+ Query query = entry.getQuery();\r
+\r
+ query.removeEntry(this);\r
+\r
+ updates++;\r
+ size--;\r
+\r
+ if((entry.getGCStatus() & CacheEntry.HAS_BEEN_BOUND) != 0)\r
+ boundQueries--;\r
+ \r
+ entry.discard();\r
+\r
+ return true;\r
+\r
+ }\r
+\r
+ /**\r
+ * \r
+ * @return true if this entry is being listened\r
+ */\r
+ private boolean updateQuery(UpdateEntry e, LinkedList<UpdateEntry> todo, IdentityHashMap<CacheEntry, CacheEntry> immediates) throws DatabaseException {\r
+\r
+ assert (e != null);\r
+\r
+ CacheEntry entry = e.entry;\r
+\r
+// System.err.println("updateQuery " + entry);\r
+ \r
+ /*\r
+ * If the dependency graph forms a DAG, some entries are inserted in the\r
+ * todo list many times. They only need to be processed once though.\r
+ */\r
+ if (entry.isDiscarded()) {\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.out.print("D");\r
+ for (int i = 0; i < e.indent; i++)\r
+ System.out.print(" ");\r
+ System.out.println(entry.getQuery());\r
+ }\r
+ }\r
+// System.err.println(" => DISCARDED");\r
+ return false;\r
+ }\r
+\r
+ if (entry.isRefuted()) {\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.out.print("R");\r
+ for (int i = 0; i < e.indent; i++)\r
+ System.out.print(" ");\r
+ System.out.println(entry.getQuery());\r
+ }\r
+ }\r
+ return false;\r
+ }\r
+\r
+ if (entry.isExcepted()) {\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.out.print("E");\r
+ }\r
+ }\r
+ }\r
+\r
+ if (entry.isPending()) {\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.out.print("P");\r
+ }\r
+ }\r
+ }\r
+\r
+ updates++;\r
+\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.out.print("U ");\r
+ for (int i = 0; i < e.indent; i++)\r
+ System.out.print(" ");\r
+ System.out.print(entry.getQuery());\r
+ }\r
+ }\r
+\r
+ Query query = entry.getQuery();\r
+ int type = query.type();\r
+\r
+ boolean hasListener = hasListener(entry); \r
+\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ if(hasListener(entry)) {\r
+ System.out.println(" (L)");\r
+ } else {\r
+ System.out.println("");\r
+ }\r
+ }\r
+ }\r
+\r
+ if(entry.isPending() || entry.isExcepted()) {\r
+\r
+ // If updated\r
+ if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
+\r
+ immediates.put(entry, entry);\r
+\r
+ } else {\r
+\r
+ if(hasListener) {\r
+ entry.refute();\r
+ } else {\r
+ removeQuery(entry);\r
+ }\r
+\r
+ }\r
+\r
+ } else {\r
+\r
+ // If updated\r
+ if ((type & RequestFlags.UPDATE_MASK) == RequestFlags.IMMEDIATE_UPDATE) {\r
+\r
+ immediates.put(entry, entry);\r
+\r
+ } else {\r
+\r
+ if(hasListener) {\r
+ entry.refute();\r
+ } else {\r
+ removeQuery(entry);\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+// System.err.println(" => FOO " + type);\r
+\r
+ if (hasListener) {\r
+ ArrayList<ListenerEntry> entries = listeners.get(entry);\r
+ if(entries != null) {\r
+ for (ListenerEntry le : entries) {\r
+ scheduleListener(le);\r
+ }\r
+ }\r
+ }\r
+\r
+ // If invalid, update parents\r
+ if (type == RequestFlags.INVALIDATE) {\r
+ updateParents(e.indent, entry, todo);\r
+ }\r
+\r
+ return hasListener;\r
+\r
+ }\r
+\r
+ private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {\r
+\r
+ Iterable<CacheEntry> oldParents = entry.getParents(this);\r
+ for (CacheEntry parent : oldParents) {\r
+// System.err.println("updateParents " + entry + " => " + parent);\r
+ if(!parent.isDiscarded())\r
+ todo.push(new UpdateEntry(entry, parent, indent + 2));\r
+ }\r
+\r
+ }\r
+\r
+ private boolean pruneListener(ListenerEntry entry) {\r
+ if (entry.base.isDisposed()) {\r
+ removeListener(entry);\r
+ return true;\r
+ } else {\r
+ return false;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * @param av1 an array (guaranteed)\r
+ * @param av2 any object\r
+ * @return <code>true</code> if the two arrays are equal\r
+ */\r
+ private final boolean arrayEquals(Object av1, Object av2) {\r
+ if (av2 == null)\r
+ return false;\r
+ Class<?> c1 = av1.getClass().getComponentType();\r
+ Class<?> c2 = av2.getClass().getComponentType();\r
+ if (c2 == null || !c1.equals(c2))\r
+ return false;\r
+ boolean p1 = c1.isPrimitive();\r
+ boolean p2 = c2.isPrimitive();\r
+ if (p1 != p2)\r
+ return false;\r
+ if (!p1)\r
+ return Arrays.equals((Object[]) av1, (Object[]) av2);\r
+ if (boolean.class.equals(c1))\r
+ return Arrays.equals((boolean[]) av1, (boolean[]) av2);\r
+ else if (byte.class.equals(c1))\r
+ return Arrays.equals((byte[]) av1, (byte[]) av2);\r
+ else if (int.class.equals(c1))\r
+ return Arrays.equals((int[]) av1, (int[]) av2);\r
+ else if (long.class.equals(c1))\r
+ return Arrays.equals((long[]) av1, (long[]) av2);\r
+ else if (float.class.equals(c1))\r
+ return Arrays.equals((float[]) av1, (float[]) av2);\r
+ else if (double.class.equals(c1))\r
+ return Arrays.equals((double[]) av1, (double[]) av2);\r
+ throw new RuntimeException("??? Contact application querySupport.");\r
+ }\r
+\r
+\r
+\r
+ final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {\r
+\r
+ try {\r
+\r
+ Query query = entry.getQuery();\r
+\r
+ if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);\r
+\r
+ entry.prepareRecompute(querySupport);\r
+ \r
+ ReadGraphImpl parentGraph = graph.withParent(entry);\r
+\r
+ query.recompute(parentGraph, this, entry);\r
+\r
+ if(entry.isExcepted()) return ListenerEntry.NO_VALUE;\r
+\r
+ Object newValue = entry.getResult();\r
+\r
+ if (ListenerEntry.NO_VALUE == oldValue) {\r
+ if(DebugPolicy.CHANGES) {\r
+ System.out.println("C " + query);\r
+ System.out.println("- " + oldValue);\r
+ System.out.println("- " + newValue);\r
+ }\r
+ return newValue;\r
+ }\r
+\r
+ boolean changed = false;\r
+\r
+ if (newValue != null) {\r
+ if (newValue.getClass().isArray()) {\r
+ changed = !arrayEquals(newValue, oldValue);\r
+ } else {\r
+ changed = !newValue.equals(oldValue);\r
+ }\r
+ } else\r
+ changed = (oldValue != null);\r
+\r
+ if(DebugPolicy.CHANGES && changed) {\r
+ System.out.println("C " + query);\r
+ System.out.println("- " + oldValue);\r
+ System.out.println("- " + newValue);\r
+ }\r
+\r
+ return changed ? newValue : ListenerEntry.NOT_CHANGED;\r
+\r
+ } catch (Throwable t) {\r
+\r
+ Logger.defaultLogError(t);\r
+ entry.except(t);\r
+ return ListenerEntry.NO_VALUE;\r
+\r
+ }\r
+\r
+ }\r
+\r
+ public boolean hasScheduledUpdates() {\r
+ return !scheduledListeners.isEmpty();\r
+ }\r
+\r
+ public void performScheduledUpdates(WriteGraphImpl graph) {\r
+\r
+ assert (!updating);\r
+ assert (!collecting);\r
+ assert (!firingListeners);\r
+\r
+ firingListeners = true;\r
+\r
+ try {\r
+\r
+ // Performing may cause further events to be scheduled.\r
+ while (!scheduledListeners.isEmpty()) {\r
+\r
+// graph.restart();\r
+// graph.state.barrier.inc();\r
+\r
+ // Clone current events to make new entries possible during\r
+ // firing.\r
+ THashSet<ListenerEntry> entries = scheduledListeners;\r
+ scheduledListeners = new THashSet<ListenerEntry>();\r
+\r
+ ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();\r
+\r
+ for (ListenerEntry listenerEntry : entries) {\r
+\r
+ if (pruneListener(listenerEntry)) {\r
+ if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);\r
+ continue;\r
+ }\r
+\r
+ final CacheEntry entry = listenerEntry.entry;\r
+ assert (entry != null);\r
+\r
+ Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());\r
+\r
+ if (newValue != ListenerEntry.NOT_CHANGED) {\r
+ if(DebugPolicy.LISTENER)\r
+ System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);\r
+ schedule.add(listenerEntry);\r
+ listenerEntry.setLastKnown(entry.getResult());\r
+ }\r
+\r
+ }\r
+\r
+ for(ListenerEntry listenerEntry : schedule) {\r
+ final CacheEntry entry = listenerEntry.entry;\r
+ if(DebugPolicy.LISTENER)\r
+ System.out.println("Firing " + listenerEntry.procedure);\r
+ try {\r
+ if(DebugPolicy.LISTENER)\r
+ System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);\r
+ entry.performFromCache(graph, this, listenerEntry.procedure);\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+\r
+// graph.state.barrier.dec();\r
+// graph.waitAsync(null);\r
+// graph.state.barrier.assertReady();\r
+\r
+ }\r
+\r
+ } finally {\r
+ firingListeners = false;\r
+ }\r
+\r
+ }\r
+\r
+ /**\r
+ * \r
+ * @return true if this entry still has listeners\r
+ */\r
+ public boolean update(final ReadGraphImpl graph, final CacheEntry entry) {\r
+\r
+ assert (!collecting);\r
+ assert (!updating);\r
+ updating = true;\r
+\r
+ boolean hadListeners = false;\r
+ boolean listenersUnknown = false;\r
+\r
+ try {\r
+\r
+ assert(entry != null);\r
+ LinkedList<UpdateEntry> todo = new LinkedList<UpdateEntry>();\r
+ IdentityHashMap<CacheEntry, CacheEntry> immediates = new IdentityHashMap<CacheEntry, CacheEntry>();\r
+ todo.add(new UpdateEntry(null, entry, 0));\r
+\r
+ while(true) {\r
+\r
+ // Walk the tree and collect immediate updates\r
+ while (!todo.isEmpty()) {\r
+ UpdateEntry e = todo.pop();\r
+ hadListeners |= updateQuery(e, todo, immediates);\r
+ }\r
+\r
+ if(immediates.isEmpty()) break;\r
+\r
+ // Evaluate all immediate updates and collect parents to update\r
+ for(CacheEntry immediate : immediates.values()) {\r
+\r
+ if(immediate.isDiscarded()) {\r
+ continue;\r
+ }\r
+\r
+ if(immediate.isExcepted()) {\r
+\r
+ Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);\r
+ if (newValue != ListenerEntry.NOT_CHANGED)\r
+ updateParents(0, immediate, todo);\r
+\r
+ } else {\r
+\r
+ Object oldValue = immediate.getResult();\r
+ Object newValue = compareTo(graph, immediate, oldValue);\r
+\r
+ if (newValue != ListenerEntry.NOT_CHANGED) {\r
+ updateParents(0, immediate, todo);\r
+ } else {\r
+ // If not changed, keep the old value\r
+ immediate.setResult(oldValue);\r
+ listenersUnknown = true;\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+ immediates.clear();\r
+\r
+ }\r
+\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ }\r
+\r
+ assert (updating);\r
+ updating = false;\r
+\r
+ return hadListeners | listenersUnknown;\r
+\r
+ }\r
+\r
+ volatile public boolean dirty = false;\r
+\r
+ private ObjectUpdateSet scheduledObjectUpdates = new ObjectUpdateSet();\r
+ private ValueUpdateSet scheduledValueUpdates = new ValueUpdateSet();\r
+ private ValueUpdateSet scheduledInvalidates = new ValueUpdateSet();\r
+ // Maybe use a mutex from util.concurrent?\r
+ private Object primitiveUpdateLock = new Object();\r
+ private THashSet scheduledPrimitiveUpdates = new THashSet();\r
+\r
+ public void performDirtyUpdates(final ReadGraphImpl graph) {\r
+\r
+ dirty = false;\r
+ lastInvalidate = 0;\r
+\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.err.println("== Query update ==");\r
+ }\r
+ }\r
+\r
+ // Special case - one statement\r
+ if(scheduledObjectUpdates.size() == 1 && scheduledValueUpdates.size() == 0 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
+\r
+ long arg0 = scheduledObjectUpdates.getFirst();\r
+\r
+ final int subject = (int)(arg0 >>> 32);\r
+ final int predicate = (int)(arg0 & 0xffffffff);\r
+\r
+ for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
+ for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
+ for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
+\r
+ if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
+ PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
+ if(principalTypes != null) update(graph, principalTypes);\r
+ Types types = Types.entry(QueryProcessor.this, subject);\r
+ if(types != null) update(graph, types);\r
+ }\r
+\r
+ if(predicate == subrelationOf) {\r
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
+ if(superRelations != null) update(graph, superRelations);\r
+ }\r
+\r
+ DirectPredicates dp = DirectPredicates.entry(QueryProcessor.this, subject);\r
+ if(dp != null) update(graph, dp);\r
+ OrderedSet os = OrderedSet.entry(QueryProcessor.this, predicate);\r
+ if(os != null) update(graph, os);\r
+\r
+ scheduledObjectUpdates.clear();\r
+ return;\r
+\r
+ }\r
+\r
+ // Special case - one value\r
+ if(scheduledObjectUpdates.size() == 0 && scheduledValueUpdates.size() == 1 && scheduledPrimitiveUpdates.size() == 0 && scheduledInvalidates.size() == 0) {\r
+\r
+ int arg0 = scheduledValueUpdates.getFirst();\r
+\r
+ ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
+ if(valueQuery != null) update(graph, valueQuery);\r
+\r
+ scheduledValueUpdates.clear();\r
+ return;\r
+\r
+ }\r
+\r
+ final TIntHashSet predicates = new TIntHashSet();\r
+ final TIntHashSet orderedSets = new TIntHashSet();\r
+\r
+ THashSet primitiveUpdates;\r
+ synchronized (primitiveUpdateLock) {\r
+ primitiveUpdates = scheduledPrimitiveUpdates;\r
+ scheduledPrimitiveUpdates = new THashSet();\r
+ }\r
+\r
+ primitiveUpdates.forEach(new TObjectProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(Object arg0) {\r
+\r
+ ExternalReadEntry query = (ExternalReadEntry)externalReadMap.get(arg0);\r
+ if (query != null) {\r
+ boolean listening = update(graph, query);\r
+ if (!listening && !query.hasParents()) {\r
+ externalReadMap.remove(arg0);\r
+ query.discard();\r
+ }\r
+ }\r
+ return true;\r
+ }\r
+\r
+ });\r
+\r
+ scheduledValueUpdates.forEach(new TIntProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(int arg0) {\r
+ ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, arg0);\r
+ if(valueQuery != null) update(graph, valueQuery);\r
+ return true;\r
+ }\r
+\r
+ });\r
+\r
+ scheduledInvalidates.forEach(new TIntProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(int resource) {\r
+ \r
+ ValueQuery valueQuery = ValueQuery.entry(QueryProcessor.this, resource);\r
+ if(valueQuery != null) update(graph, valueQuery);\r
+ \r
+ PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, resource);\r
+ if(principalTypes != null) update(graph, principalTypes);\r
+ Types types = Types.entry(QueryProcessor.this, resource);\r
+ if(types != null) update(graph, types);\r
+\r
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);\r
+ if(superRelations != null) update(graph, superRelations);\r
+\r
+ predicates.add(resource);\r
+ \r
+ return true;\r
+ }\r
+\r
+ });\r
+\r
+ scheduledObjectUpdates.forEach(new TLongProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(long arg0) {\r
+\r
+ final int subject = (int)(arg0 >>> 32);\r
+ final int predicate = (int)(arg0 & 0xffffffff);\r
+\r
+ if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {\r
+ PrincipalTypes principalTypes = PrincipalTypes.entry(QueryProcessor.this, subject);\r
+ if(principalTypes != null) update(graph, principalTypes);\r
+ Types types = Types.entry(QueryProcessor.this, subject);\r
+ if(types != null) update(graph, types);\r
+ }\r
+\r
+ if(predicate == subrelationOf) {\r
+ SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);\r
+ if(superRelations != null) update(graph, superRelations);\r
+ }\r
+\r
+ predicates.add(subject);\r
+ orderedSets.add(predicate);\r
+\r
+ return true;\r
+\r
+ }\r
+\r
+ });\r
+\r
+ predicates.forEach(new TIntProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(final int subject) {\r
+\r
+ for(Objects o : Objects.entries(QueryProcessor.this, subject)) update(graph, o);\r
+ for(DirectObjects o : DirectObjects.entries(QueryProcessor.this, subject)) update(graph, o);\r
+ for(Statements o : Statements.entries(QueryProcessor.this, subject)) update(graph, o);\r
+\r
+ DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
+ if(entry != null) update(graph, entry);\r
+\r
+ return true;\r
+\r
+ }\r
+\r
+ });\r
+\r
+ orderedSets.forEach(new TIntProcedure() {\r
+\r
+ @Override\r
+ public boolean execute(int orderedSet) {\r
+\r
+ OrderedSet entry = OrderedSet.entry(QueryProcessor.this, orderedSet);\r
+ if(entry != null) update(graph, entry);\r
+\r
+ return true;\r
+\r
+ }\r
+\r
+ });\r
+\r
+ // for (Integer subject : predicates) {\r
+ // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);\r
+ // if(entry != null) update(graph, entry);\r
+ // }\r
+\r
+\r
+ if (Development.DEVELOPMENT) {\r
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {\r
+ System.err.println("== Query update ends ==");\r
+ }\r
+ }\r
+\r
+ scheduledValueUpdates.clear();\r
+ scheduledObjectUpdates.clear();\r
+ scheduledInvalidates.clear();\r
+\r
+ }\r
+\r
+ public void updateValue(final int resource) {\r
+ scheduledValueUpdates.add(resource);\r
+ dirty = true;\r
+ }\r
+\r
+ public void updateStatements(final int resource, final int predicate) {\r
+ scheduledObjectUpdates.add((((long)resource) << 32) + predicate);\r
+ dirty = true;\r
+ }\r
+ \r
+ private int lastInvalidate = 0;\r
+ \r
+ public void invalidateResource(final int resource) {\r
+ if(lastInvalidate == resource) return;\r
+ scheduledValueUpdates.add(resource);\r
+ lastInvalidate = resource;\r
+ dirty = true;\r
+ }\r
+\r
+ public void updatePrimitive(final ExternalRead primitive) {\r
+\r
+ // External reads may be updated from arbitrary threads.\r
+ // Synchronize to prevent race-conditions.\r
+ synchronized (primitiveUpdateLock) {\r
+ scheduledPrimitiveUpdates.add(primitive);\r
+ }\r
+ querySupport.dirtyPrimitives();\r
+\r
+ }\r
+\r
+ @Override\r
+ public synchronized String toString() {\r
+ return "QueryProvider [size = " + size + ", hits = " + hits + " misses = " + misses + ", updates = " + updates + "]";\r
+ }\r
+\r
+ @Override\r
+ protected void doDispose() {\r
+\r
+ for(int index = 0; index < THREADS; index++) { \r
+ executors[index].dispose();\r
+ }\r
+\r
+ // First just wait\r
+ for(int i=0;i<100;i++) {\r
+\r
+ boolean alive = false;\r
+ for(int index = 0; index < THREADS; index++) { \r
+ alive |= executors[index].isAlive();\r
+ }\r
+ if(!alive) return;\r
+ try {\r
+ Thread.sleep(5);\r
+ } catch (InterruptedException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+\r
+ }\r
+\r
+ // Then start interrupting\r
+ for(int i=0;i<100;i++) {\r
+\r
+ boolean alive = false;\r
+ for(int index = 0; index < THREADS; index++) { \r
+ alive |= executors[index].isAlive();\r
+ }\r
+ if(!alive) return;\r
+ for(int index = 0; index < THREADS; index++) {\r
+ executors[index].interrupt();\r
+ }\r
+ }\r
+\r
+ // // Then just destroy\r
+ // for(int index = 0; index < THREADS; index++) {\r
+ // executors[index].destroy();\r
+ // }\r
+\r
+ for(int index = 0; index < THREADS; index++) {\r
+ try {\r
+ executors[index].join(5000);\r
+ } catch (InterruptedException e) {\r
+ Logger.defaultLogError("QueryThread " + index + " will not die.", e);\r
+ }\r
+ executors[index] = null;\r
+ }\r
+\r
+ }\r
+\r
+ public int getHits() {\r
+ return hits;\r
+ }\r
+\r
+ public int getMisses() {\r
+ return misses;\r
+ }\r
+\r
+ public int getSize() {\r
+ return size;\r
+ }\r
+\r
+ public Set<Long> getReferencedClusters() {\r
+ HashSet<Long> result = new HashSet<Long>();\r
+ for (CacheEntry entry : objectsMap.values()) {\r
+ Objects query = (Objects) entry.getQuery();\r
+ result.add(querySupport.getClusterId(query.r1()));\r
+ }\r
+ for (CacheEntry entry : directPredicatesMap.values()) {\r
+ DirectPredicates query = (DirectPredicates) entry.getQuery();\r
+ result.add(querySupport.getClusterId(query.id));\r
+ }\r
+ for (CacheEntry entry : valueMap.values()) {\r
+ ValueQuery query = (ValueQuery) entry.getQuery();\r
+ result.add(querySupport.getClusterId(query.id));\r
+ }\r
+ return result;\r
+ }\r
+\r
+ public void assertDone() {\r
+ }\r
+\r
+ CacheCollectionResult allCaches(CacheCollectionResult result) {\r
+\r
+ int level = Integer.MAX_VALUE;\r
+ directPredicatesMap.values(level, result);\r
+ principalTypesMap.values(level, result);\r
+ for(CacheEntryBase e : uriToResourceMap.values())\r
+ if(e.getLevel() <= level)\r
+ result.add(e);\r
+ for(CacheEntryBase e : namespaceIndexMap22.values())\r
+ if(e.getLevel() <= level)\r
+ result.add(e);\r
+ projectsMap.values(level, result);\r
+ \r
+ relationInfoMap.values(level, result);\r
+ superTypesMap.values(level, result);\r
+ typeHierarchyMap.values(level, result);\r
+ superRelationsMap.values(level, result);\r
+ typesMap.values(level, result);\r
+\r
+ valueMap.values(level, result);\r
+ directObjectsMap.values(level, result);\r
+ objectsMap.values(level, result);\r
+ orderedSetMap.values(level, result);\r
+ predicatesMap.values(level, result);\r
+\r
+ statementsMap.values(level, result);\r
+ assertedPredicatesMap.values(level, result);\r
+ assertedStatementsMap.values(level, result);\r
+ externalReadMap.values(level, result);\r
+ asyncReadMap.values(level, result);\r
+ \r
+ readMap.values(level, result);\r
+ asyncMultiReadMap.values(level, result);\r
+ multiReadMap.values(level, result);\r
+\r
+ return result;\r
+\r
+ }\r
+\r
+ public void printDiagnostics() {\r
+ }\r
+\r
+ public void requestCluster(ReadGraphImpl graph, long clusterId, Runnable runnable) {\r
+ querySupport.requestCluster(graph, clusterId, runnable);\r
+ }\r
+\r
+ public int clean() {\r
+ collector.collect(0, Integer.MAX_VALUE);\r
+ return size;\r
+ }\r
+\r
+ public void clean(final Collection<ExternalRead<?>> requests) {\r
+ QueryCollectorSupport collectorSupport = new QueryCollectorSupport() {\r
+ Iterator<ExternalRead<?>> iterator = requests.iterator();\r
+ @Override\r
+ public CacheCollectionResult allCaches() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+ @Override\r
+ public CacheEntryBase iterate(int level) {\r
+ if(iterator.hasNext()) {\r
+ ExternalRead<?> request = iterator.next();\r
+ ExternalReadEntry entry = externalReadMap.get(request);\r
+ if (entry != null) return entry;\r
+ else return iterate(level);\r
+ } else {\r
+ iterator = requests.iterator();\r
+ return null;\r
+ }\r
+ }\r
+ @Override\r
+ public void remove() {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+ @Override\r
+ public void setLevel(CacheEntryBase entry, int level) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+ @Override\r
+ public Collection<CacheEntry> getRootList() {\r
+ ArrayList<CacheEntry> result = new ArrayList<CacheEntry>(requests.size());\r
+ for (ExternalRead<?> request : requests) {\r
+ ExternalReadEntry entry = externalReadMap.get(request);\r
+ if (entry != null)\r
+ result.add(entry);\r
+ }\r
+ return result;\r
+ }\r
+ @Override\r
+ public int getCurrentSize() {\r
+ return size;\r
+ }\r
+ @Override\r
+ public int calculateCurrentSize() {\r
+ // This tells the collector to attempt collecting everything.\r
+ return Integer.MAX_VALUE;\r
+ }\r
+ @Override\r
+ public boolean start(boolean flush) {\r
+ return true;\r
+ }\r
+ };\r
+ new QueryCollectorImpl2(this, collectorSupport).collect(0, Integer.MAX_VALUE);\r
+ }\r
+\r
+ public void scanPending() {\r
+\r
+ ArrayList<CacheEntry> entries = new ArrayList<CacheEntry>();\r
+\r
+ entries.addAll(directPredicatesMap.values());\r
+ entries.addAll(principalTypesMap.values());\r
+ entries.addAll(uriToResourceMap.values());\r
+ entries.addAll(namespaceIndexMap22.values());\r
+ entries.addAll(projectsMap.values());\r
+ entries.addAll(relationInfoMap.values());\r
+ entries.addAll(superTypesMap.values());\r
+ entries.addAll(superRelationsMap.values());\r
+ entries.addAll(typesMap.values());\r
+ entries.addAll(valueMap.values());\r
+ entries.addAll(directObjectsMap.values());\r
+ entries.addAll(objectsMap.values());\r
+ entries.addAll(orderedSetMap.values());\r
+ entries.addAll(predicatesMap.values());\r
+ entries.addAll(orderedSetMap.values());\r
+ entries.addAll(statementsMap.values());\r
+ // entries.addAll(assertedObjectsMap.values());\r
+ entries.addAll(assertedPredicatesMap.values());\r
+ entries.addAll(assertedStatementsMap.values());\r
+ entries.addAll(externalReadMap.values());\r
+ entries.addAll(asyncReadMap.values());\r
+ entries.addAll(externalReadMap.values());\r
+ entries.addAll(readMap.values());\r
+ entries.addAll(asyncMultiReadMap.values());\r
+ entries.addAll(multiReadMap.values());\r
+ entries.addAll(readMap.values());\r
+ System.out.println(entries.size() + " entries.");\r
+ for(Object e : entries) {\r
+ if(e instanceof CacheEntry) {\r
+ CacheEntry en = (CacheEntry)e;\r
+ if(en.isPending()) System.out.println("pending " + e);\r
+ if(en.isExcepted()) System.out.println("excepted " + e);\r
+ if(en.isDiscarded()) System.out.println("discarded " + e);\r
+ if(en.isRefuted()) System.out.println("refuted " + e);\r
+ if(en.isFresh()) System.out.println("fresh " + e);\r
+ } else {\r
+ //System.out.println("Unknown object " + e);\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ public ReadGraphImpl graphForVirtualRequest() {\r
+ return ReadGraphImpl.createAsync(this);\r
+ }\r
+\r
+ \r
+ private HashMap<Resource, Class<?>> builtinValues;\r
+ \r
+ public Class<?> getBuiltinValue(Resource r) {\r
+ if(builtinValues == null) initBuiltinValues();\r
+ return builtinValues.get(r);\r
+ }\r
+\r
+ Exception callerException = null;\r
+\r
+ public interface AsyncBarrier {\r
+ public void inc(); \r
+ public void dec();\r
+ // public void inc(String debug); \r
+ // public void dec(String debug);\r
+ }\r
+\r
+// final public QueryProcessor processor;\r
+// final public QuerySupport support;\r
+\r
+ // boolean disposed = false;\r
+\r
+ private void initBuiltinValues() {\r
+\r
+ Layer0 b = getSession().peekService(Layer0.class);\r
+ if(b == null) return;\r
+\r
+ builtinValues = new HashMap<Resource, Class<?>>();\r
+\r
+ builtinValues.put(b.String, String.class);\r
+ builtinValues.put(b.Double, Double.class);\r
+ builtinValues.put(b.Float, Float.class);\r
+ builtinValues.put(b.Long, Long.class);\r
+ builtinValues.put(b.Integer, Integer.class);\r
+ builtinValues.put(b.Byte, Byte.class);\r
+ builtinValues.put(b.Boolean, Boolean.class);\r
+\r
+ builtinValues.put(b.StringArray, String[].class);\r
+ builtinValues.put(b.DoubleArray, double[].class);\r
+ builtinValues.put(b.FloatArray, float[].class);\r
+ builtinValues.put(b.LongArray, long[].class);\r
+ builtinValues.put(b.IntegerArray, int[].class);\r
+ builtinValues.put(b.ByteArray, byte[].class);\r
+ builtinValues.put(b.BooleanArray, boolean[].class);\r
+\r
+ }\r
+\r
+// public ReadGraphSupportImpl(final QueryProcessor provider2) {\r
+//\r
+// if (null == provider2) {\r
+// this.processor = null;\r
+// support = null;\r
+// return;\r
+// }\r
+// this.processor = provider2;\r
+// support = provider2.getCore();\r
+// initBuiltinValues();\r
+//\r
+// }\r
+\r
+// final static public ReadGraphSupportImpl basedOn(ReadGraphSupportImpl impl) {\r
+// return new ReadGraphSupportImpl(impl.processor);\r
+// }\r
+\r
+ @Override\r
+ final public Session getSession() {\r
+ return session;\r
+ }\r
+ \r
+ final public ResourceSupport getResourceSupport() {\r
+ return resourceSupport;\r
+ }\r
+\r
+ @Override\r
+ final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ IntProcedure ip = new IntProcedure() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ if(first.get()) {\r
+ procedure.execute(graph, querySupport.getResource(i));\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.finished(graph);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.finished(impl.newRestart(graph));\r
+ }\r
+\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Predicates#" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ Predicates.queryEach(impl, sId, this, impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ Predicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ procedure.execute(querySupport.getResource(i));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished();\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+ \r
+ @Override\r
+ final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
+\r
+ assert(subject != null);\r
+ \r
+ return Predicates.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
+\r
+ }\r
+ \r
+\r
+ @Override\r
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
+ final Resource predicate, final MultiProcedure<Statement> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ Statements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
+ try {\r
+ procedure.execute(querySupport.getStatement(s, p, o));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished();\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
+ final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
+\r
+ boolean first = true;\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
+ try {\r
+ if(first) {\r
+ procedure.execute(graph, querySupport.getStatement(s, p, o));\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), querySupport.getStatement(s, p, o));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+\r
+ try {\r
+ if(first) {\r
+ first = false;\r
+ procedure.finished(graph);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.finished(impl.newRestart(graph));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+\r
+ try {\r
+ if(first) {\r
+ first = false;\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+ int pId = querySupport.getId(predicate);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,\r
+ final Resource predicate, final StatementProcedure procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ TripleIntProcedureAdapter proc = new TripleIntProcedureAdapter() {\r
+\r
+ boolean first = true;\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
+ try {\r
+ if(first) {\r
+ procedure.execute(graph, s, p, o);\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), s, p, o);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+\r
+ try {\r
+ if(first) {\r
+ first = false;\r
+ procedure.finished(graph);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.finished(impl.newRestart(graph));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+\r
+ try {\r
+ if(first) {\r
+ first = false;\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+ int pId = querySupport.getId(predicate);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#Statements" + sId + "#" + pId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ Statements.queryEach(impl, sId, pId, this, impl.parent, listener, proc);\r
+\r
+ }\r
+ \r
+ @Override\r
+ final public void forStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ forEachStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
+\r
+ private Set<Statement> current = null;\r
+ private Set<Statement> run = new HashSet<Statement>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Statement result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Statement r : current) procedure.remove(graph, r);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Statement>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachAssertedStatement(final ReadGraphImpl impl, final Resource subject,\r
+ final Resource predicate, final AsyncMultiProcedure<Statement> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedureAdapter() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getStatement(s, p, o));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ private static ListenerBase getListenerBase(Object procedure) {\r
+ if(procedure instanceof ListenerBase) return (ListenerBase)procedure;\r
+ else return null;\r
+ }\r
+\r
+ @Override\r
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final MultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ Objects.runner(impl, querySupport.getId(subject), querySupport.getId(predicate), impl.parent, listener, new IntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ procedure.execute(querySupport.getResource(i));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished();\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ System.out.println("forEachObject exception " + t);\r
+ try {\r
+ procedure.exception(t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+\r
+// @Override\r
+// final public void forEachDirectObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
+//\r
+// assert(subject != null);\r
+// assert(predicate != null);\r
+// assert(procedure != null);\r
+//\r
+// final ListenerBase listener = getListenerBase(procedure);\r
+//\r
+// int sId = querySupport.getId(subject);\r
+// int pId = querySupport.getId(predicate);\r
+//\r
+// MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, support);\r
+//\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectObjects" + sId + "#" + pId);\r
+// else impl.state.barrier.inc(null, null);\r
+//\r
+// // final Exception caller = new Exception();\r
+//\r
+// // final Pair<Exception, Exception> exceptions = Pair.make(callerException, new Exception());\r
+//\r
+// DirectObjects.queryEach(impl, sId, pId, processor, impl.parent, listener, proc);\r
+//\r
+// }\r
+\r
+ @Override\r
+ final public void forEachDirectPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ MultiIntProcedure proc = new MultiIntProcedure(procedure, impl, querySupport);\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(proc, "#DirectPredicates" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ DirectPredicates.queryEach(impl, sId, this, impl.parent, listener, proc);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final Procedure<DirectStatements> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachDirectStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<DirectStatements> procedure, boolean ignoreVirtual) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ org.simantics.db.impl.query.DirectStatements.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure, ignoreVirtual);\r
+\r
+ }\r
+\r
+ private static final Resource INVALID_RESOURCE = new ResourceImpl(null, Integer.MIN_VALUE);\r
+\r
+ @Override\r
+ final public void forPossibleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Resource> procedure) {\r
+\r
+ forEachObject(impl, subject, predicate, new AsyncMultiProcedure<Resource>() {\r
+\r
+ private Resource single = null;\r
+\r
+ @Override\r
+ public synchronized void execute(AsyncReadGraph graph, Resource result) {\r
+ if(single == null) {\r
+ single = result;\r
+ } else {\r
+ single = INVALID_RESOURCE;\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public synchronized void finished(AsyncReadGraph graph) {\r
+ if(single == null || single == INVALID_RESOURCE) procedure.execute(graph, null);\r
+ else procedure.execute(graph, single);\r
+ }\r
+\r
+ @Override\r
+ public synchronized void exception(AsyncReadGraph graph, Throwable throwable) {\r
+ procedure.exception(graph, throwable);\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final ListenerBase listener, final IntProcedure procedure) {\r
+ \r
+ final int sId = querySupport.getId(subject);\r
+ final int pId = querySupport.getId(predicate);\r
+\r
+ Objects.runner(impl, sId, pId, impl.parent, listener, procedure);\r
+ \r
+ }\r
+ \r
+ final public int getSingleObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate) throws DatabaseException {\r
+\r
+ final int sId = querySupport.getId(subject);\r
+ final int pId = querySupport.getId(predicate);\r
+\r
+ return Objects.runner2(impl, sId, pId, impl.parent);\r
+ \r
+ }\r
+\r
+ final public void forEachObject(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ if(impl.parent != null || listener != null) {\r
+\r
+ IntProcedure ip = new IntProcedure() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ if(first.get()) {\r
+ procedure.execute(impl, querySupport.getResource(i));\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), querySupport.getResource(i));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.finished(impl);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.finished(impl.newRestart(graph));\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forEachObject with " + procedure;\r
+ }\r
+\r
+ };\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Objects" + subject + "#" + predicate);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ forEachObject(impl, subject, predicate, listener, ip);\r
+\r
+ } else {\r
+\r
+ IntProcedure ip = new IntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ procedure.execute(graph, querySupport.getResource(i));\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ procedure.finished(graph);\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forEachObject with " + procedure;\r
+ }\r
+\r
+ };\r
+\r
+ forEachObject(impl, subject, predicate, listener, ip);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ forEachObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
+\r
+ private Set<Resource> current = null;\r
+ private Set<Resource> run = new HashSet<Resource>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Resource result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Resource r : current) procedure.remove(graph, r);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Resource>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forObjectSet " + procedure;\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forPredicateSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ forEachPredicate(impl, subject, new AsyncMultiListener<Resource>() {\r
+\r
+ private Set<Resource> current = null;\r
+ private Set<Resource> run = new HashSet<Resource>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Resource result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Resource r : current) procedure.remove(graph, r);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Resource>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forPredicateSet " + procedure;\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forPrincipalTypeSet(final ReadGraphImpl impl, final Resource subject, final AsyncSetListener<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ forEachPrincipalType(impl, subject, new AsyncMultiListener<Resource>() {\r
+\r
+ private Set<Resource> current = null;\r
+ private Set<Resource> run = new HashSet<Resource>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Resource result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Resource r : current) procedure.remove(graph, r);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Resource>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forPrincipalTypeSet " + procedure;\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forAssertedObjectSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ forEachAssertedObject(impl, subject, predicate, new AsyncMultiListener<Resource>() {\r
+\r
+ private Set<Resource> current = null;\r
+ private Set<Resource> run = new HashSet<Resource>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Resource result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Resource r : current) procedure.remove(graph, r);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Resource>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forObjectSet " + procedure;\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forAssertedStatementSet(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncSetListener<Statement> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ forEachAssertedStatement(impl, subject, predicate, new AsyncMultiListener<Statement>() {\r
+\r
+ private Set<Statement> current = null;\r
+ private Set<Statement> run = new HashSet<Statement>();\r
+\r
+ @Override\r
+ public void execute(AsyncReadGraph graph, Statement result) {\r
+\r
+ boolean found = false;\r
+\r
+ if(current != null) {\r
+\r
+ found = current.remove(result);\r
+\r
+ }\r
+\r
+ if(!found) procedure.add(graph, result);\r
+\r
+ run.add(result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ if(current != null) { \r
+ for(Statement s : current) procedure.remove(graph, s);\r
+ }\r
+\r
+ current = run;\r
+\r
+ run = new HashSet<Statement>();\r
+\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return procedure.isDisposed();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ procedure.exception(graph, t);\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "forStatementSet " + procedure;\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachAssertedObject(final ReadGraphImpl impl, final Resource subject,\r
+ final Resource predicate, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ AssertedStatements.queryEach(impl, querySupport.getId(subject), querySupport.getId(predicate), this, impl.parent, listener, new TripleIntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int s, int p, int o) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(o));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try { \r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ IntProcedure ip = new IntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(i));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#PrincipalTypes#" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ PrincipalTypes.queryEach(impl, sId, this, impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forEachPrincipalType(final ReadGraphImpl impl, final Resource subject, final MultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ PrincipalTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ procedure.execute(querySupport.getResource(i));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished();\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ final public void forTypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(final ReadGraphImpl graph, IntSet set) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.execute(graph, set);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), set);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Types" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ Types.queryEach(impl, sId, this, impl.parent, listener, ip);\r
+\r
+ }\r
+ \r
+ @Override\r
+ final public IntSet getTypes(final ReadGraphImpl impl, final Resource subject) throws Throwable {\r
+\r
+ assert(subject != null);\r
+ \r
+ return Types.queryEach2(impl, querySupport.getId(subject), this, impl.parent);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forRelationInfo(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<RelationInfo> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ RelationInfoQuery.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<RelationInfo>() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(final ReadGraphImpl graph, RelationInfo set) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.execute(graph, set);\r
+// impl.state.barrier.dec();\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), set);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec("ReadGraphSupportImpl.1353");\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forSupertypes(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ SuperTypes.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new InternalProcedure<IntSet>() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(final ReadGraphImpl graph, IntSet set) {\r
+// final HashSet<Resource> result = new HashSet<Resource>();\r
+// set.forEach(new TIntProcedure() {\r
+//\r
+// @Override\r
+// public boolean execute(int type) {\r
+// result.add(querySupport.getResource(type));\r
+// return true;\r
+// }\r
+//\r
+// });\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.execute(graph, set);\r
+// impl.state.barrier.dec();\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), set);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec();\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forDirectSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ IntProcedure ip = new IntProcedureAdapter() {\r
+\r
+ @Override\r
+ public void execute(final ReadGraphImpl graph, int superRelation) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(superRelation));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(final ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject); \r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectSuperRelations#" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ DirectSuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forPossibleSuperrelation(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ PossibleSuperRelation.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, procedure);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forSuperrelations(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Set<Resource>> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ InternalProcedure<IntSet> ip = new InternalProcedure<IntSet>() {\r
+\r
+ @Override\r
+ public void execute(final ReadGraphImpl graph, IntSet set) {\r
+// final HashSet<Resource> result = new HashSet<Resource>();\r
+// set.forEach(new TIntProcedure() {\r
+//\r
+// @Override\r
+// public boolean execute(int type) {\r
+// result.add(querySupport.getResource(type));\r
+// return true;\r
+// }\r
+//\r
+// });\r
+ try {\r
+ procedure.execute(graph, set);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#SuperRelations#" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ SuperRelations.queryEach(impl, sId, this, impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ final public byte[] getValue(final ReadGraphImpl impl, final Resource subject) throws DatabaseException {\r
+\r
+ int sId = querySupport.getId(subject);\r
+ return ValueQuery.queryEach(impl, sId, impl.parent);\r
+\r
+ }\r
+\r
+ final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {\r
+\r
+ return ValueQuery.queryEach(impl, subject, impl.parent);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
+\r
+ assert(subject != null);\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+ if(procedure != null) {\r
+ \r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, byte[] result) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.execute(graph, result);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), result);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ };\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ return ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
+\r
+ } else {\r
+\r
+ return ValueQuery.queryEach(impl, sId, impl.parent, null, null);\r
+ \r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forPossibleValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ if(impl.parent != null || listener != null) {\r
+\r
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
+\r
+ AtomicBoolean first = new AtomicBoolean(true);\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, byte[] result) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.execute(graph, result);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.execute(impl.newRestart(graph), result);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ if(first.compareAndSet(true, false)) {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.exception(impl.newRestart(graph), t);\r
+ }\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#Value" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
+\r
+ } else {\r
+\r
+ InternalProcedure<byte[]> ip = new InternalProcedure<byte[]>() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, byte[] result) {\r
+\r
+ procedure.execute(graph, result);\r
+\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+\r
+ procedure.exception(graph, t);\r
+\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(subject);\r
+\r
+ ValueQuery.queryEach(impl, sId, impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forInverse(final ReadGraphImpl impl, final Resource relation, final AsyncProcedure<Resource> procedure) {\r
+\r
+ assert(relation != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+ IntProcedure ip = new IntProcedure() {\r
+\r
+ private int result = 0;\r
+ \r
+ final AtomicBoolean found = new AtomicBoolean(false);\r
+ final AtomicBoolean done = new AtomicBoolean(false);\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ \r
+ // Shall fire exactly once!\r
+ if(done.compareAndSet(false, true)) {\r
+ try {\r
+ if(result == 0) {\r
+ procedure.exception(graph, new NoInverseException(""));\r
+// impl.state.barrier.dec(this);\r
+ } else {\r
+ procedure.execute(graph, querySupport.getResource(result));\r
+// impl.state.barrier.dec(this);\r
+ }\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ }\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ \r
+ if(found.compareAndSet(false, true)) {\r
+ this.result = i;\r
+ } else {\r
+ // Shall fire exactly once!\r
+ if(done.compareAndSet(false, true)) {\r
+ try {\r
+ procedure.exception(graph, new ManyObjectsForFunctionalRelationException("Multiple items e.g. " + this.result + " and " + result));\r
+// impl.state.barrier.dec(this);\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ }\r
+ }\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ // Shall fire exactly once!\r
+ if(done.compareAndSet(false, true)) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+// impl.state.barrier.dec(this);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+ }\r
+\r
+ };\r
+\r
+ int sId = querySupport.getId(relation);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#DirectObjects#" + sId);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ Objects.runner(impl, sId, getInverseOf(), impl.parent, listener, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forResource(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
+\r
+ assert(id != null);\r
+ assert(procedure != null);\r
+\r
+ InternalProcedure<Integer> ip = new InternalProcedure<Integer>() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, Integer result) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(result));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ } \r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ };\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "Resource");\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ forResource(impl, id, impl.parent, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forBuiltin(final ReadGraphImpl impl, final String id, final AsyncProcedure<Resource> procedure) {\r
+\r
+ assert(id != null);\r
+ assert(procedure != null);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ forBuiltin(impl, id, impl.parent, new InternalProcedure<Integer>() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, Integer result) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(result)); \r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ } \r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ DirectPredicates.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
+\r
+ boolean found = false;\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int object) {\r
+ found = true;\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.execute(graph, found);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final AsyncProcedure<Boolean> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+ AsyncMultiProcedure<Resource> ip = new AsyncMultiProcedureAdapter<Resource>() {\r
+\r
+ boolean found = false;\r
+\r
+ @Override\r
+ synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
+ found = true;\r
+ }\r
+\r
+ @Override\r
+ synchronized public void finished(AsyncReadGraph graph) {\r
+ try {\r
+ procedure.execute(graph, found);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec(this);\r
+ }\r
+\r
+ };\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(ip, "#ForEachObject#" + subject + "#" + predicate);\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ forEachObject(impl, subject, predicate, ip);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forHasStatement(final ReadGraphImpl impl, final Resource subject, final Resource predicate, final Resource object, final AsyncProcedure<Boolean> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(predicate != null);\r
+ assert(procedure != null);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ forEachObject(impl, subject, predicate, new AsyncMultiProcedureAdapter<Resource>() {\r
+\r
+ boolean found = false;\r
+\r
+ @Override\r
+ synchronized public void execute(AsyncReadGraph graph, Resource resource) {\r
+ if(resource.equals(object)) found = true;\r
+ }\r
+\r
+ @Override\r
+ synchronized public void finished(AsyncReadGraph graph) {\r
+ try {\r
+ procedure.execute(graph, found);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forHasValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<Boolean> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ ValueQuery.queryEach(impl, querySupport.getId(subject), impl.parent, listener, new InternalProcedure<byte[]>() {\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, byte[] object) {\r
+ boolean result = object != null;\r
+ try {\r
+ procedure.execute(graph, result);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public void forOrderedSet(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {\r
+\r
+ assert(subject != null);\r
+ assert(procedure != null);\r
+\r
+ final ListenerBase listener = getListenerBase(procedure);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ OrderedSet.queryEach(impl, querySupport.getId(subject), this, impl.parent, listener, new IntProcedure() {\r
+\r
+ @Override\r
+ public void exception(ReadGraphImpl graph, Throwable t) {\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ @Override\r
+ public void execute(ReadGraphImpl graph, int i) {\r
+ try {\r
+ procedure.execute(graph, querySupport.getResource(i));\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(ReadGraphImpl graph) {\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+// impl.state.barrier.dec();\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public <T> void query(final ReadGraphImpl impl, final AsyncRead<T> request, final CacheEntry parent, final AsyncProcedure<T> procedure, ListenerBase listener) {\r
+\r
+ assert(request != null);\r
+ assert(procedure != null);\r
+\r
+// if(AsyncBarrierImpl.BOOKKEEPING) impl.state.barrier.inc(request, "#" + request.toString() + ".1999");\r
+// else impl.state.barrier.inc(null, null);\r
+\r
+ runAsyncRead(impl, request, parent, listener, procedure);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public <T> T tryQuery(final ReadGraphImpl graph, final Read<T> request) throws DatabaseException {\r
+\r
+ assert(graph != null);\r
+ assert(request != null);\r
+\r
+ final ReadEntry entry = readMap.get(request);\r
+ if(entry != null && entry.isReady()) {\r
+ return (T)entry.get(graph, this, null);\r
+ } else {\r
+ return request.perform(graph);\r
+ }\r
+\r
+ }\r
+\r
+ final public <T> T tryQuery(final ReadGraphImpl graph, final ExternalRead<T> request) throws DatabaseException {\r
+\r
+ assert(graph != null);\r
+ assert(request != null);\r
+\r
+ final ExternalReadEntry<T> entry = externalReadMap.get(request);\r
+ if(entry != null && entry.isReady()) {\r
+ if(entry.isExcepted()) {\r
+ Throwable t = (Throwable)entry.getResult();\r
+ if(t instanceof DatabaseException) throw (DatabaseException)t;\r
+ else throw new DatabaseException(t);\r
+ } else {\r
+ return (T)entry.getResult();\r
+ } \r
+ } else {\r
+\r
+ final DataContainer<T> result = new DataContainer<T>();\r
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
+ \r
+ request.register(graph, new Listener<T>() {\r
+ \r
+ @Override\r
+ public void exception(Throwable t) {\r
+ exception.set(t);\r
+ }\r
+\r
+ @Override\r
+ public void execute(T t) {\r
+ result.set(t);\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return true;\r
+ }\r
+ \r
+ });\r
+ \r
+ Throwable t = exception.get();\r
+ if(t != null) {\r
+ if(t instanceof DatabaseException) throw (DatabaseException)t;\r
+ else throw new DatabaseException(t);\r
+ }\r
+ \r
+ return result.get();\r
+\r
+ }\r
+\r
+ }\r
+ \r
+ @Override\r
+ final public <T> void tryQuery(final ReadGraphImpl graph, final AsyncRead<T> request, AsyncProcedure<T> procedure) {\r
+\r
+ assert(graph != null);\r
+ assert(request != null);\r
+\r
+ final AsyncReadEntry entry = asyncReadMap.get(request);\r
+ if(entry != null && entry.isReady()) {\r
+ if(entry.isExcepted()) {\r
+ procedure.exception(graph, (Throwable)entry.getResult());\r
+ } else {\r
+ procedure.execute(graph, (T)entry.getResult());\r
+ }\r
+ } else {\r
+ request.perform(graph, procedure);\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ final public <T> void query(final ReadGraphImpl impl, final MultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
+\r
+ assert(request != null);\r
+ assert(procedure != null);\r
+\r
+// impl.state.barrier.inc(null, null);\r
+\r
+ queryMultiRead(impl, request, parent, listener, procedure);\r
+\r
+ }\r
+\r
+ @Override\r
+ final public <T> void query(final ReadGraphImpl impl, final AsyncMultiRead<T> request, final CacheEntry parent, final AsyncMultiProcedure<T> procedure, ListenerBase listener) {\r
+\r
+ assert(request != null);\r
+ assert(procedure != null);\r
+\r
+// impl.state.barrier.inc();\r
+\r
+ runAsyncMultiRead(impl, request, parent, listener, new AsyncMultiProcedure<T>() {\r
+\r
+ public void execute(AsyncReadGraph graph, T result) {\r
+\r
+ try {\r
+ procedure.execute(graph, result);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void finished(AsyncReadGraph graph) {\r
+\r
+ try {\r
+ procedure.finished(graph);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+// impl.state.barrier.dec();\r
+\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return procedure.toString();\r
+ }\r
+\r
+ @Override\r
+ public void exception(AsyncReadGraph graph, Throwable t) {\r
+\r
+ try {\r
+ procedure.exception(graph, t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+\r
+// impl.state.barrier.dec();\r
+\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {\r
+\r
+ assert(request != null);\r
+ assert(procedure != null);\r
+\r
+ queryPrimitiveRead(impl, request, parent, listener, new Procedure<T>() {\r
+\r
+ @Override\r
+ public void execute(T result) {\r
+ try {\r
+ procedure.execute(result);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return procedure.toString();\r
+ }\r
+\r
+ @Override\r
+ public void exception(Throwable t) {\r
+ try {\r
+ procedure.exception(t);\r
+ } catch (Throwable t2) {\r
+ Logger.defaultLogError(t2);\r
+ }\r
+ }\r
+\r
+ });\r
+\r
+ }\r
+\r
+ @Override\r
+ public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {\r
+ \r
+ return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
+ \r
+ }\r
+ \r
+ @Override\r
+ public VirtualGraph getProvider(Resource subject, Resource predicate) {\r
+ \r
+ return querySupport.getProvider(querySupport.getId(subject), querySupport.getId(predicate));\r
+ \r
+ }\r
+\r
+ @Override\r
+ public VirtualGraph getValueProvider(Resource subject) {\r
+ \r
+ return querySupport.getValueProvider(querySupport.getId(subject));\r
+ \r
+ }\r
+\r
+ public boolean resumeTasks(ReadGraphImpl graph) {\r
+\r
+ return querySupport.resume(graph);\r
+\r
+ }\r
+ \r
+ public boolean isImmutable(int resourceId) {\r
+ return querySupport.isImmutable(resourceId);\r
+ }\r
+\r
+ public boolean isImmutable(Resource resource) {\r
+ ResourceImpl impl = (ResourceImpl)resource;\r
+ return isImmutable(impl.id);\r
+ }\r
+ \r
+ private Layer0 L0;\r
+ \r
+ public Layer0 getL0(ReadGraph graph) {\r
+ if(L0 == null) {\r
+ L0 = Layer0.getInstance(graph);\r
+ }\r
+ return L0;\r
+ }\r
+ \r
+}\r