X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=eaaa9b3730976265c0673dc4547bbe35b3d4904c;hp=55855ee3e823bed936c00c558848ceaf05a04915;hb=90a52d58489b6c518b13dde238923125c8f63eec;hpb=6e2d3bc019eb7bcf8a75c8ea6f5f2cfadd159359 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 55855ee3e..eaaa9b373 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -44,7 +44,6 @@ import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; -import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoInverseException; @@ -53,7 +52,6 @@ import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.procedure.IntProcedureAdapter; import org.simantics.db.impl.procedure.InternalProcedure; import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; @@ -130,23 +128,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap AtomicInteger sleepers = new AtomicInteger(0); - private boolean updating = false; + boolean updating = false; - private boolean firingListeners = false; - final public QueryCache cache; final public QuerySupport querySupport; final public Session session; final public ResourceSupport resourceSupport; - - private THashSet scheduledListeners = new THashSet(); + + final public Semaphore requests = new Semaphore(1); + + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; - -// public ArrayList[] queues; public LinkedList freeScheduling = new LinkedList(); + + public LinkedList topLevelTasks = new LinkedList(); enum ThreadState { @@ -155,14 +153,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } public ThreadState[] threadStates; -// public ReentrantLock[] threadLocks; -// public Condition[] threadConditions; - - //public ArrayList[] ownTasks; - - //public ArrayList[] ownSyncTasks; - - //ArrayList[] delayQueues; final Object querySupportLock; @@ -171,28 +161,12 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - public SessionTask getOwnTask(ReadGraphImpl impl) { - Set ancestors = impl.ancestorSet(); - synchronized(querySupportLock) { - int index = 0; - while(index < freeScheduling.size()) { - SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(ancestors)) { - return freeScheduling.remove(index); - } - index++; - } - } - return null; - } - - public SessionTask getSubTask(ReadGraphImpl impl) { - Set onlyThis = Collections.singleton(impl); + public SessionTask getSubTask(ReadGraphImpl parent) { synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.hasCommonParent(onlyThis)) { + if(task.isSubtask(parent) && task.maybeReady()) { return freeScheduling.remove(index); } index++; @@ -201,70 +175,50 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap return null; } - public boolean performPending(ReadGraphImpl graph) { - SessionTask task = getOwnTask(graph); + /* + * We are running errands while waiting for requests to complete. + * We can only run work that is part of the current root request to avoid any deadlocks + */ + public boolean performPending(ReadGraphImpl under) { + SessionTask task = getSubTask(under); if(task != null) { - task.run(QueryProcessor.thread.get()); + task.run(thread.get()); return true; - } else { - return false; } + return false; } + + final public void scheduleNow(SessionTask request) { + SessionTask toExecute = scheduleOrReturnForExecution(request); + if(toExecute != null) + toExecute.run(thread.get()); + } -// final public void scheduleOwn(int caller, SessionTask request) { -// ownTasks[caller].add(request); -// } + final public SessionTask scheduleOrReturnForExecution(SessionTask request) { - final public void schedule(SessionTask request) { - - //int performer = request.thread; + assert(request != null); -// if(DebugPolicy.SCHEDULE) -// System.out.println("schedule " + request + " " + " -> " + performer); + synchronized(querySupportLock) { - //assert(performer >= 0); + LinkedList queue = request.rootGraph != null ? freeScheduling : topLevelTasks; + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } - assert(request != null); + queue.addFirst(request); + requests.release(); -// if(caller == performer) { -// request.run(caller); -// } else { - -// if(performer == THREADS) { - - synchronized(querySupportLock) { - - if(BarrierTracing.BOOKKEEPING) { - Exception current = new Exception(); - Exception previous = BarrierTracing.tasks.put(request, current); - if(previous != null) { - previous.printStackTrace(); - current.printStackTrace(); - } - } - - freeScheduling.add(request); - - querySupportLock.notifyAll(); - - } + } - return; - -// } -// -// ReentrantLock queueLock = threadLocks[performer]; -// queueLock.lock(); -// queues[performer].add(request); -// // This thread could have been sleeping -// if(queues[performer].size() == 1) { -// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet(); -// threadConditions[performer].signalAll(); -// } -// queueLock.unlock(); -// } + return null; - } + } final int THREADS; @@ -274,23 +228,24 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public static abstract class SessionTask { - public final ReadGraphImpl graph; - private Set ancestors; + final protected ReadGraphImpl rootGraph; private int counter = 0; + protected int position = 1; private Exception trace; - public SessionTask(ReadGraphImpl graph) { - this.graph = graph; - if(graph != null) graph.asyncBarrier.inc(); - } - - public boolean hasCommonParent(Set otherAncestors) { - if(graph == null) return false; - if(ancestors == null) ancestors = graph.ancestorSet(); - return !Collections.disjoint(ancestors, otherAncestors); + public SessionTask() { + this(null); } + + public SessionTask(ReadGraphImpl rootGraph) { + this.rootGraph = rootGraph; + } + + public boolean isSubtask(ReadGraphImpl graph) { + return graph.isParent(rootGraph); + } - public abstract void run0(int thread); + public abstract void run0(int thread); public final void run(int thread) { if(counter++ > 0) { @@ -304,12 +259,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap trace = new Exception(); } run0(thread); - if(graph != null) graph.asyncBarrier.dec(); + } + + public boolean maybeReady() { + return true; } @Override public String toString() { - return "SessionTask[" + graph.parent + "]"; + if(rootGraph == null) + return "SessionTask[no graph]"; + else + return "SessionTask[" + rootGraph.parent + "]"; } } @@ -327,37 +288,13 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - long waitingTime = 0; - - static int koss = 0; - static int koss2 = 0; - public boolean resume(ReadGraphImpl graph) { return executors[0].runSynchronized(); } - - //private WeakReference garbageTracker; - - private class GarbageTracker { - - @Override - protected void finalize() throws Throwable { - -// System.err.println("GarbageTracker"); -// -// garbageTracker = new WeakReference(new GarbageTracker()); - - super.finalize(); - - } - - } public QueryProcessor(final int threads, QuerySupport core, Set threadSet) throws DatabaseException { - //garbageTracker = new WeakReference(new GarbageTracker()); - THREADS = threads; THREAD_MASK = threads - 1; @@ -368,31 +305,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap querySupportLock = core.getLock(); executors = new QueryThread[THREADS]; -// queues = new ArrayList[THREADS]; -// threadLocks = new ReentrantLock[THREADS]; -// threadConditions = new Condition[THREADS]; threadStates = new ThreadState[THREADS]; -// ownTasks = new ArrayList[THREADS]; -// ownSyncTasks = new ArrayList[THREADS]; -// delayQueues = new ArrayList[THREADS * THREADS]; - - // freeSchedule = new AtomicInteger(0); - -// for (int i = 0; i < THREADS * THREADS; i++) { -// delayQueues[i] = new ArrayList(); -// } for (int i = 0; i < THREADS; i++) { - - // tasks[i] = new ArrayList(); -// ownTasks[i] = new ArrayList(); -// ownSyncTasks[i] = new ArrayList(); -// queues[i] = new ArrayList(); -// threadLocks[i] = new ReentrantLock(); -// threadConditions[i] = threadLocks[i].newCondition(); - // limits[i] = false; threadStates[i] = ThreadState.INIT; - } for (int i = 0; i < THREADS; i++) { @@ -504,7 +420,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public void releaseWrite(ReadGraphImpl graph) { - performDirtyUpdates(graph); + propagateChangesInQueryCache(graph); modificationCounter++; } @@ -699,35 +615,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap boolean isBound(ExternalReadEntry entry) { if(entry.hasParents()) return true; - else if(hasListener(entry)) return true; + else if(listening.hasListener(entry)) return true; else return false; } - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - static class Dummy implements InternalProcedure, IntProcedure { @Override @@ -929,181 +820,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - private void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = cache.listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - cache.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - return result; - - } - - private void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = cache.listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - cache.listeners.remove(entry.entry); - } - - private boolean hasListener(CacheEntry entry) { - if(cache.listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(cache.listeners.get(entry) != null) { - ArrayList entries = cache.listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - cache.listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(cache.listeners.get(entry) != null) - return cache.listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(this)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } void processParentReport(CacheEntry entry, Map> workarea) { @@ -1176,7 +892,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap int listeners = 0; for(CacheEntry entry : workarea.keySet()) { - boolean listener = hasListenerAfterDisposing(entry); + boolean listener = listening.hasListenerAfterDisposing(entry); boolean hasParents = entry.getParents(this).iterator().hasNext(); if(listener) { // Bound @@ -1266,11 +982,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap for(CacheEntry entry : workarea.keySet()) { Class clazz = entry.getClass(); - if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass(); - else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass(); - else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass(); - else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass(); + if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass(); + else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass(); + else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass(); + else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass(); + else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass(); Integer c = counts.get(clazz); if(c == null) counts.put(clazz, -1); else counts.put(clazz, c-1); @@ -1340,7 +1056,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap b.print("// Entry parent listing\n"); for(CacheEntry entry : workarea.keySet()) { int status = flagMap.get(entry); - boolean hasListener = hasListenerAfterDisposing(entry); + boolean hasListener = listening.hasListenerAfterDisposing(entry); b.print("Q " + entry.toString()); if(hasListener) { b.print(" (L" + status + ")"); @@ -1386,22 +1102,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - class UpdateEntry { - - public CacheEntry caller; - - public CacheEntry entry; - - public int indent; - - public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { - this.caller = caller; - this.entry = entry; - this.indent = indent; - } - - }; - boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1494,11 +1194,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Query query = entry.getQuery(); int type = query.type(); - boolean hasListener = hasListener(entry); + boolean hasListener = listening.hasListener(entry); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - if(hasListener(entry)) { + if(listening.hasListener(entry)) { System.err.println(" (L)"); } else { System.err.println(""); @@ -1545,43 +1245,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // System.err.println(" => FOO " + type); if (hasListener) { - ArrayList entries = cache.listeners.get(entry); + ArrayList entries = listening.listeners.get(entry); if(entries != null) { for (ListenerEntry le : entries) { - scheduleListener(le); + listening.scheduleListener(le); } } } // If invalid, update parents if (type == RequestFlags.INVALIDATE) { - updateParents(e.indent, entry, todo); + listening.updateParents(e.indent, entry, todo); } return hasListener; } - private void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(this); - for (CacheEntry parent : oldParents) { -// System.err.println("updateParents " + entry + " => " + parent); - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - /** * @param av1 an array (guaranteed) * @param av2 any object @@ -1617,7 +1297,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap - final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { + final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { try { @@ -1681,93 +1361,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - public void performScheduledUpdates(WriteGraphImpl graph) { - - assert (!updating); - assert (!cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - -// graph.restart(); -// graph.state.barrier.inc(); - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - -// graph.state.barrier.dec(); -// graph.waitAsync(null); -// graph.state.barrier.assertReady(); - - } - - } finally { - firingListeners = false; - } - - } /** * @@ -1810,7 +1403,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); if (newValue != ListenerEntry.NOT_CHANGED) - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { @@ -1818,7 +1411,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, oldValue); if (newValue != ListenerEntry.NOT_CHANGED) { - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { // If not changed, keep the old value immediate.setResult(oldValue); @@ -1867,7 +1460,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public void performDirtyUpdates(final ReadGraphImpl graph) { + public void propagateChangesInQueryCache(final ReadGraphImpl graph) { + + // Make sure that listening has performed its work + listening.sync(); cache.dirty = false; lastInvalidate = 0; @@ -2117,6 +1713,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override protected void doDispose() { + requests.release(Integer.MAX_VALUE / 2); + for(int index = 0; index < THREADS; index++) { executors[index].dispose(); }