X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=6d9560940be2b1c19b73616c85a2996ee2b65d25;hp=7743907202eb8fbf54fd584330e3dc84fcdfac16;hb=73884c78cf2695329aa3b6559c4c6a196792ac9b;hpb=ab7918e4c6dc68b4561fdfe4aa5fd93bf036ad58 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 774390720..6d9560940 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -44,7 +44,6 @@ import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; -import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoInverseException; @@ -53,7 +52,6 @@ import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.procedure.IntProcedureAdapter; import org.simantics.db.impl.procedure.InternalProcedure; import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; @@ -130,39 +128,29 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap AtomicInteger sleepers = new AtomicInteger(0); - private boolean updating = false; + boolean updating = false; - private boolean firingListeners = false; - final public QueryCache cache; final public QuerySupport querySupport; final public Session session; final public ResourceSupport resourceSupport; - - private THashSet scheduledListeners = new THashSet(); + + final public Semaphore requests = new Semaphore(1); + + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; -// public ArrayList[] queues; - - public LinkedList freeScheduling = new LinkedList(); - enum ThreadState { INIT, RUN, SLEEP, DISPOSED } + final Scheduling scheduling; + public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -171,100 +159,31 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl under) { + SessionTask task = scheduling.getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; - } else { - return false; } + return false; } + + final public void scheduleNow(SessionTask request) { + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); + } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } - - final public void schedule(SessionTask request) { - - //int performer = request.thread; - -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); - - //assert(performer >= 0); - - assert(request != null); - -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); - - } - - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { + + return scheduling.scheduleOrReturnForExecution(request); - } + } final int THREADS; @@ -274,23 +193,24 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; - private Set ancestors; + final protected ReadGraphImpl rootGraph; private int counter = 0; + protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); - } - - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); + public SessionTask() { + this(null); } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -304,12 +224,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); + } + + public boolean maybeReady() { + return true; } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(rootGraph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -327,40 +253,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; + scheduling = new Scheduling(requests); + querySupport = core; cache = new QueryCache(core, threads); session = querySupport.getSession(); @@ -368,31 +272,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { @@ -504,7 +387,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public void releaseWrite(ReadGraphImpl graph) { - performDirtyUpdates(graph); + propagateChangesInQueryCache(graph); modificationCounter++; } @@ -699,35 +582,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap boolean isBound(ExternalReadEntry entry) { if(entry.hasParents()) return true; - else if(hasListener(entry)) return true; + else if(listening.hasListener(entry)) return true; else return false; } - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - static class Dummy implements InternalProcedure, IntProcedure { @Override @@ -929,181 +787,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - private void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = cache.listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - cache.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - return result; - - } - - private void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = cache.listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - cache.listeners.remove(entry.entry); - } - - private boolean hasListener(CacheEntry entry) { - if(cache.listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(cache.listeners.get(entry) != null) { - ArrayList entries = cache.listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - cache.listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(cache.listeners.get(entry) != null) - return cache.listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(this)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } void processParentReport(CacheEntry entry, Map> workarea) { @@ -1176,7 +859,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap int listeners = 0; for(CacheEntry entry : workarea.keySet()) { - boolean listener = hasListenerAfterDisposing(entry); + boolean listener = listening.hasListenerAfterDisposing(entry); boolean hasParents = entry.getParents(this).iterator().hasNext(); if(listener) { // Bound @@ -1266,11 +949,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap for(CacheEntry entry : workarea.keySet()) { Class clazz = entry.getClass(); - if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); - else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); - else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); + if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass(); + else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass(); + else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass(); + else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass(); + else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass(); Integer c = counts.get(clazz); if(c == null) counts.put(clazz, -1); else counts.put(clazz, c-1); @@ -1340,7 +1023,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap b.print("// Entry parent listing\n"); for(CacheEntry entry : workarea.keySet()) { int status = flagMap.get(entry); - boolean hasListener = hasListenerAfterDisposing(entry); + boolean hasListener = listening.hasListenerAfterDisposing(entry); b.print("Q " + entry.toString()); if(hasListener) { b.print(" (L" + status + ")"); @@ -1386,22 +1069,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - class UpdateEntry { - - public CacheEntry caller; - - public CacheEntry entry; - - public int indent; - - public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { - this.caller = caller; - this.entry = entry; - this.indent = indent; - } - - }; - boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1452,17 +1119,17 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return false; } - if (entry.isRefuted()) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.err.print("R"); - for (int i = 0; i < e.indent; i++) - System.err.print(" "); - System.err.println(entry.getQuery()); - } - } - return false; - } +// if (entry.isRefuted()) { +// if (Development.DEVELOPMENT) { +// if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { +// System.err.print("R"); +// for (int i = 0; i < e.indent; i++) +// System.err.print(" "); +// System.err.println(entry.getQuery()); +// } +// } +// return false; +// } if (entry.isExcepted()) { if (Development.DEVELOPMENT) { @@ -1494,11 +1161,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Query query = entry.getQuery(); int type = query.type(); - boolean hasListener = hasListener(entry); + boolean hasListener = listening.hasListener(entry); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - if(hasListener(entry)) { + if(listening.hasListener(entry)) { System.err.println(" (L)"); } else { System.err.println(""); @@ -1545,43 +1212,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // System.err.println(" => FOO " + type); if (hasListener) { - ArrayList entries = cache.listeners.get(entry); + ArrayList entries = listening.listeners.get(entry); if(entries != null) { for (ListenerEntry le : entries) { - scheduleListener(le); + listening.scheduleListener(le); } } } // If invalid, update parents if (type == RequestFlags.INVALIDATE) { - updateParents(e.indent, entry, todo); + listening.updateParents(e.indent, entry, todo); } return hasListener; } - private void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(this); - for (CacheEntry parent : oldParents) { -// System.err.println("updateParents " + entry + " => " + parent); - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - /** * @param av1 an array (guaranteed) * @param av2 any object @@ -1617,7 +1264,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap - final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { + final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { try { @@ -1632,8 +1279,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); ReadGraphImpl parentGraph = graph.forRecompute(entry); - + parentGraph.asyncBarrier.inc(); query.recompute(parentGraph); + parentGraph.asyncBarrier.dec(); if(entry.isExcepted()) return ListenerEntry.NO_VALUE; @@ -1681,93 +1329,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - public void performScheduledUpdates(WriteGraphImpl graph) { - - assert (!updating); - assert (!cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - -// graph.restart(); -// graph.state.barrier.inc(); - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - -// graph.state.barrier.dec(); -// graph.waitAsync(null); -// graph.state.barrier.assertReady(); - - } - - } finally { - firingListeners = false; - } - - } /** * @@ -1810,7 +1371,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); if (newValue != ListenerEntry.NOT_CHANGED) - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { @@ -1818,7 +1379,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, oldValue); if (newValue != ListenerEntry.NOT_CHANGED) { - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { // If not changed, keep the old value immediate.setResult(oldValue); @@ -1851,7 +1412,26 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap private Object primitiveUpdateLock = new Object(); private THashSet scheduledPrimitiveUpdates = new THashSet(); - public void performDirtyUpdates(final ReadGraphImpl graph) { + private ArrayList refutations = new ArrayList<>(); + + private void markForUpdate(ReadGraphImpl graph, CacheEntry e) { + e.refute(); + refutations.add(e); + } + + private void updateRefutations(ReadGraphImpl graph) { + + for(CacheEntry e : refutations) + update(graph, e); + + refutations.clear(); + + } + + public void propagateChangesInQueryCache(final ReadGraphImpl graph) { + + // Make sure that listening has performed its work + listening.sync(); cache.dirty = false; lastInvalidate = 0; @@ -1870,28 +1450,37 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final int subject = (int)(arg0 >>> 32); final int predicate = (int)(arg0 & 0xffffffff); - for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o); - for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o); - for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o); + for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o); if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject); - if(principalTypes != null) update(graph, principalTypes); + if(principalTypes != null) markForUpdate(graph, principalTypes); Types types = QueryCache.entryTypes(QueryProcessor.this, subject); - if(types != null) update(graph, types); + if(types != null) markForUpdate(graph, types); } if(predicate == subrelationOf) { SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); - if(superRelations != null) update(graph, superRelations); + if(superRelations != null) markForUpdate(graph, superRelations); } DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject); - if(dp != null) update(graph, dp); + if(dp != null) markForUpdate(graph, dp); OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate); - if(os != null) update(graph, os); + if(os != null) markForUpdate(graph, os); + updateRefutations(graph); + scheduledObjectUpdates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } + return; } @@ -1902,9 +1491,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap int arg0 = scheduledValueUpdates.getFirst(); ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0); - if(valueQuery != null) update(graph, valueQuery); + if(valueQuery != null) markForUpdate(graph, valueQuery); + + updateRefutations(graph); scheduledValueUpdates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } + return; } @@ -1918,30 +1516,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap scheduledPrimitiveUpdates = new THashSet(); } - primitiveUpdates.forEach(new TObjectProcedure() { - - @Override - public boolean execute(Object arg0) { - - ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0); - if (query != null) { - boolean listening = update(graph, query); - if (!listening && !query.hasParents()) { - cache.externalReadEntryMap.remove(arg0); - query.discard(); - } - } - return true; - } - - }); - scheduledValueUpdates.forEach(new TIntProcedure() { @Override public boolean execute(int arg0) { ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0); - if(valueQuery != null) update(graph, valueQuery); + if(valueQuery != null) markForUpdate(graph, valueQuery); return true; } @@ -1953,15 +1533,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public boolean execute(int resource) { ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource); - if(valueQuery != null) update(graph, valueQuery); + if(valueQuery != null) markForUpdate(graph, valueQuery); PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource); - if(principalTypes != null) update(graph, principalTypes); + if(principalTypes != null) markForUpdate(graph, principalTypes); Types types = QueryCache.entryTypes(QueryProcessor.this, resource); - if(types != null) update(graph, types); + if(types != null) markForUpdate(graph, types); SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource); - if(superRelations != null) update(graph, superRelations); + if(superRelations != null) markForUpdate(graph, superRelations); predicates.add(resource); @@ -1980,14 +1560,14 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) { PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject); - if(principalTypes != null) update(graph, principalTypes); + if(principalTypes != null) markForUpdate(graph, principalTypes); Types types = QueryCache.entryTypes(QueryProcessor.this, subject); - if(types != null) update(graph, types); + if(types != null) markForUpdate(graph, types); } if(predicate == subrelationOf) { SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject); - if(superRelations != null) update(graph, superRelations); + if(superRelations != null) markForUpdate(graph, superRelations); } predicates.add(subject); @@ -2004,12 +1584,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override public boolean execute(final int subject) { - for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o); - for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o); - for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o); + for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o); + for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o); DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject); - if(entry != null) update(graph, entry); + if(entry != null) markForUpdate(graph, entry); return true; @@ -2023,7 +1603,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public boolean execute(int orderedSet) { OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet); - if(entry != null) update(graph, entry); + if(entry != null) markForUpdate(graph, entry); return true; @@ -2031,21 +1611,35 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap }); - // for (Integer subject : predicates) { - // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject); - // if(entry != null) update(graph, entry); - // } + updateRefutations(graph); + primitiveUpdates.forEach(new TObjectProcedure() { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - System.err.println("== Query update ends =="); + @Override + public boolean execute(Object arg0) { + + ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0); + if (query != null) { + boolean listening = update(graph, query); + if (!listening && !query.hasParents()) { + cache.externalReadEntryMap.remove(arg0); + query.discard(); + } + } + return true; } - } + }); + scheduledValueUpdates.clear(); scheduledObjectUpdates.clear(); scheduledInvalidates.clear(); + + if (Development.DEVELOPMENT) { + if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { + System.err.println("== Query update ends =="); + } + } } @@ -2087,6 +1681,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); } @@ -2260,12 +1856,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Exception callerException = null; - public interface AsyncBarrier { - public void inc(); - public void dec(); - // public void inc(String debug); - // public void dec(String debug); - } + public interface AsyncBarrier { + public void inc(); + public void dec(); + public void waitBarrier(Object request, ReadGraphImpl impl); + public boolean isBlocking(); + } // final public QueryProcessor processor; // final public QuerySupport support; @@ -4458,6 +4054,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return L0; } + public Layer0 getL0() { + return L0; + } + public static ThreadLocal thread = new ThreadLocal() { protected Integer initialValue() { return -1;