X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=7bfa0f4c6195659c21efdb0e6bcf99fed0dfb7bb;hp=9b54d15f6996fbf708a5fce9fc5ea00fd467e0ba;hb=0db1af75cd14f99f16c42c920562708abf37be93;hpb=b000e272429e157638c0384878b07b8dcd758472 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 9b54d15f6..7bfa0f4c6 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -44,7 +44,6 @@ import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.utils.Logger; -import org.simantics.db.debug.ListenerReport; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoInverseException; @@ -53,7 +52,6 @@ import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; -import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.procedure.IntProcedureAdapter; import org.simantics.db.impl.procedure.InternalProcedure; import org.simantics.db.impl.procedure.TripleIntProcedureAdapter; @@ -130,17 +128,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap AtomicInteger sleepers = new AtomicInteger(0); - private boolean updating = false; + boolean updating = false; - private boolean firingListeners = false; - final public QueryCache cache; final public QuerySupport querySupport; final public Session session; final public ResourceSupport resourceSupport; - - private THashSet scheduledListeners = new THashSet(); + + final public QueryListening listening = new QueryListening(this); QueryThread[] executors; @@ -504,7 +500,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } final public void releaseWrite(ReadGraphImpl graph) { - performDirtyUpdates(graph); + propagateChangesInQueryCache(graph); modificationCounter++; } @@ -699,35 +695,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap boolean isBound(ExternalReadEntry entry) { if(entry.hasParents()) return true; - else if(hasListener(entry)) return true; + else if(listening.hasListener(entry)) return true; else return false; } - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { - - if (parent != null && !inferred) { - try { - if(!child.isImmutable(graph)) - child.addParent(parent); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) { - System.out.println(child + " -> " + parent); - } - } - } - - if (listener != null) { - return registerListener(child, listener, procedure); - } else { - return null; - } - - } - - static class Dummy implements InternalProcedure, IntProcedure { @Override @@ -929,181 +900,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { - - assert (entry != null); - - if (base.isDisposed()) - return null; - - return addListener(entry, base, procedure); - - } - - private void primeListenerEntry(final ListenerEntry entry, final Object result) { - entry.setLastKnown(result); - } - - private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) { - - assert (entry != null); - assert (procedure != null); - - ArrayList list = cache.listeners.get(entry); - if (list == null) { - list = new ArrayList(1); - cache.listeners.put(entry, list); - } - - ListenerEntry result = new ListenerEntry(entry, base, procedure); - int currentIndex = list.indexOf(result); - // There was already a listener - if(currentIndex > -1) { - ListenerEntry current = list.get(currentIndex); - if(!current.base.isDisposed()) return null; - list.set(currentIndex, result); - } else { - list.add(result); - } - - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure); - } - } - - return result; - - } - - private void scheduleListener(ListenerEntry entry) { - assert (entry != null); - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Scheduled " + entry.procedure); - } - } - scheduledListeners.add(entry); - } - - private void removeListener(ListenerEntry entry) { - assert (entry != null); - ArrayList list = cache.listeners.get(entry.entry); - if(list == null) return; - boolean success = list.remove(entry); - assert (success); - if (list.isEmpty()) - cache.listeners.remove(entry.entry); - } - - private boolean hasListener(CacheEntry entry) { - if(cache.listeners.get(entry) != null) return true; - return false; - } - - boolean hasListenerAfterDisposing(CacheEntry entry) { - if(cache.listeners.get(entry) != null) { - ArrayList entries = cache.listeners.get(entry); - ArrayList list = null; - for (ListenerEntry e : entries) { - if (e.base.isDisposed()) { - if(list == null) list = new ArrayList(); - list.add(e); - } - } - if(list != null) { - for (ListenerEntry e : list) { - entries.remove(e); - } - } - if (entries.isEmpty()) { - cache.listeners.remove(entry); - return false; - } - return true; - } - return false; - } - - List getListenerEntries(CacheEntry entry) { - hasListenerAfterDisposing(entry); - if(cache.listeners.get(entry) != null) - return cache.listeners.get(entry); - else - return Collections.emptyList(); - } - - void processListenerReport(CacheEntry entry, Map> workarea) { - - if(!workarea.containsKey(entry)) { - - HashSet ls = new HashSet(); - for(ListenerEntry e : getListenerEntries(entry)) - ls.add(e.base); - - workarea.put(entry, ls); - - for(CacheEntry parent : entry.getParents(this)) { - processListenerReport(parent, workarea); - ls.addAll(workarea.get(parent)); - } - - } - - } - - public synchronized ListenerReport getListenerReport() throws IOException { - - class ListenerReportImpl implements ListenerReport { - - Map> workarea = new HashMap>(); - - @Override - public void print(PrintStream b) { - Map hist = new HashMap(); - for(Map.Entry> e : workarea.entrySet()) { - for(ListenerBase l : e.getValue()) { - Integer i = hist.get(l); - hist.put(l, i != null ? i-1 : -1); - } - } - - for(Pair p : CollectionUtils.valueSortedEntries(hist)) { - b.print("" + -p.second + " " + p.first + "\n"); - } - - b.flush(); - } - - } - - ListenerReportImpl result = new ListenerReportImpl(); - - Collection all = allCaches(new CacheCollectionResult()).toCollection(); - for(CacheEntryBase entry : all) { - hasListenerAfterDisposing(entry); - } - for(CacheEntryBase entry : all) { - processListenerReport(entry, result.workarea); - } - - return result; - - } - - public synchronized String reportListeners(File file) throws IOException { - - if (!isAlive()) - return "Disposed!"; - - PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file))); - ListenerReport report = getListenerReport(); - report.print(b); - - return "Done reporting listeners."; - - } void processParentReport(CacheEntry entry, Map> workarea) { @@ -1176,7 +972,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap int listeners = 0; for(CacheEntry entry : workarea.keySet()) { - boolean listener = hasListenerAfterDisposing(entry); + boolean listener = listening.hasListenerAfterDisposing(entry); boolean hasParents = entry.getParents(this).iterator().hasNext(); if(listener) { // Bound @@ -1340,7 +1136,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap b.print("// Entry parent listing\n"); for(CacheEntry entry : workarea.keySet()) { int status = flagMap.get(entry); - boolean hasListener = hasListenerAfterDisposing(entry); + boolean hasListener = listening.hasListenerAfterDisposing(entry); b.print("Q " + entry.toString()); if(hasListener) { b.print(" (L" + status + ")"); @@ -1386,22 +1182,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - class UpdateEntry { - - public CacheEntry caller; - - public CacheEntry entry; - - public int indent; - - public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) { - this.caller = caller; - this.entry = entry; - this.indent = indent; - } - - }; - boolean removeQuery(CacheEntry entry) { // This entry has been removed before. No need to do anything here. @@ -1494,11 +1274,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Query query = entry.getQuery(); int type = query.type(); - boolean hasListener = hasListener(entry); + boolean hasListener = listening.hasListener(entry); if (Development.DEVELOPMENT) { if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) { - if(hasListener(entry)) { + if(listening.hasListener(entry)) { System.err.println(" (L)"); } else { System.err.println(""); @@ -1545,43 +1325,23 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // System.err.println(" => FOO " + type); if (hasListener) { - ArrayList entries = cache.listeners.get(entry); + ArrayList entries = listening.listeners.get(entry); if(entries != null) { for (ListenerEntry le : entries) { - scheduleListener(le); + listening.scheduleListener(le); } } } // If invalid, update parents if (type == RequestFlags.INVALIDATE) { - updateParents(e.indent, entry, todo); + listening.updateParents(e.indent, entry, todo); } return hasListener; } - private void updateParents(int indent, CacheEntry entry, LinkedList todo) { - - Iterable oldParents = entry.getParents(this); - for (CacheEntry parent : oldParents) { -// System.err.println("updateParents " + entry + " => " + parent); - if(!parent.isDiscarded()) - todo.push(new UpdateEntry(entry, parent, indent + 2)); - } - - } - - private boolean pruneListener(ListenerEntry entry) { - if (entry.base.isDisposed()) { - removeListener(entry); - return true; - } else { - return false; - } - } - /** * @param av1 an array (guaranteed) * @param av2 any object @@ -1617,7 +1377,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap - final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { + final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) { try { @@ -1681,93 +1441,6 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public boolean hasScheduledUpdates() { - return !scheduledListeners.isEmpty(); - } - - public void performScheduledUpdates(WriteGraphImpl graph) { - - assert (!updating); - assert (!cache.collecting); - assert (!firingListeners); - - firingListeners = true; - - try { - - // Performing may cause further events to be scheduled. - while (!scheduledListeners.isEmpty()) { - -// graph.restart(); -// graph.state.barrier.inc(); - - // Clone current events to make new entries possible during - // firing. - THashSet entries = scheduledListeners; - scheduledListeners = new THashSet(); - - ArrayList schedule = new ArrayList(); - - for (ListenerEntry listenerEntry : entries) { - - if (pruneListener(listenerEntry)) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Pruned " + listenerEntry.procedure); - } - } - continue; - } - - final CacheEntry entry = listenerEntry.entry; - assert (entry != null); - - Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown()); - - if (newValue != ListenerEntry.NOT_CHANGED) { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - new Exception().printStackTrace(); - System.err.println("Add to schedule " + listenerEntry.procedure + " with " + newValue); - } - } - schedule.add(listenerEntry); - listenerEntry.setLastKnown(entry.getResult()); - } - - } - - for(ListenerEntry listenerEntry : schedule) { - final CacheEntry entry = listenerEntry.entry; - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure); - } - } - try { - if (Development.DEVELOPMENT) { - if(Development.getProperty(DevelopmentKeys.QUERYPROCESSOR_LISTENERS, Bindings.BOOLEAN)) { - System.err.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry); - } - } - entry.performFromCache(graph, listenerEntry.procedure); - } catch (Throwable t) { - t.printStackTrace(); - } - } - -// graph.state.barrier.dec(); -// graph.waitAsync(null); -// graph.state.barrier.assertReady(); - - } - - } finally { - firingListeners = false; - } - - } /** * @@ -1810,7 +1483,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE); if (newValue != ListenerEntry.NOT_CHANGED) - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { @@ -1818,7 +1491,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap Object newValue = compareTo(graph, immediate, oldValue); if (newValue != ListenerEntry.NOT_CHANGED) { - updateParents(0, immediate, todo); + listening.updateParents(0, immediate, todo); } else { // If not changed, keep the old value immediate.setResult(oldValue); @@ -1867,7 +1540,10 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } - public void performDirtyUpdates(final ReadGraphImpl graph) { + public void propagateChangesInQueryCache(final ReadGraphImpl graph) { + + // Make sure that listening has performed its work + listening.sync(); cache.dirty = false; lastInvalidate = 0;