X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryProcessor.java;h=5a37257e2531c051c39de0bea04af271e1adccce;hb=691db89ccddae3006529c229c829dd2fd027c35c;hp=9c60691fad7a0a850228997e85ffb47168ca1aae;hpb=9f0fd59be54719b1fe9322d8fd37e4950857308c;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java index 9c60691fa..5a37257e2 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java @@ -51,6 +51,7 @@ import org.simantics.db.exception.NoInverseException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.impl.graph.BarrierTracing; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.ReadGraphSupport; import org.simantics.db.impl.graph.WriteGraphImpl; @@ -171,23 +172,40 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap public void close() { } - SessionTask getOwnTask(int thread) { + public SessionTask getOwnTask(ReadGraphImpl impl) { + Set ancestors = impl.ancestorSet(); synchronized(querySupportLock) { int index = 0; while(index < freeScheduling.size()) { SessionTask task = freeScheduling.get(index); - if(task.thread == thread && !task.systemCall) + if(task.hasCommonParent(ancestors)) { return freeScheduling.remove(index); + } index++; } } return null; } - - public boolean performPending(int thread) { - SessionTask task = getOwnTask(thread); + + public SessionTask getSubTask(ReadGraphImpl impl) { + Set onlyThis = Collections.singleton(impl); + synchronized(querySupportLock) { + int index = 0; + while(index < freeScheduling.size()) { + SessionTask task = freeScheduling.get(index); + if(task.hasCommonParent(onlyThis)) { + return freeScheduling.remove(index); + } + index++; + } + } + return null; + } + + public boolean performPending(ReadGraphImpl graph) { + SessionTask task = getOwnTask(graph); if(task != null) { - task.run(thread); + task.run(QueryProcessor.thread.get()); return true; } else { return false; @@ -199,11 +217,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // } final public void schedule(SessionTask request) { + + //int performer = request.thread; - int performer = request.thread; - - if(DebugPolicy.SCHEDULE) - System.out.println("schedule " + request + " " + " -> " + performer); +// if(DebugPolicy.SCHEDULE) +// System.out.println("schedule " + request + " " + " -> " + performer); //assert(performer >= 0); @@ -216,24 +234,20 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap // if(performer == THREADS) { synchronized(querySupportLock) { - - //new Exception().printStackTrace(); - + + if(BarrierTracing.BOOKKEEPING) { + Exception current = new Exception(); + Exception previous = BarrierTracing.tasks.put(request, current); + if(previous != null) { + previous.printStackTrace(); + current.printStackTrace(); + } + } + freeScheduling.add(request); querySupportLock.notifyAll(); - //System.err.println("schedule free task " + request + " => " + freeScheduling.size()); - -// for(int i=0;i ancestors; + private int counter = 0; + private Exception trace; - public SessionTask(boolean systemCall) { - this.thread = QueryProcessor.thread.get(); - this.systemCall = systemCall; -// this.syncCaller = -1; - //this.object = object; + public SessionTask(ReadGraphImpl graph) { + this.graph = graph; + if(graph != null) graph.asyncBarrier.inc(); } -// public SessionTask(Object object, int syncCaller) { -// this.thread = QueryProcessor.thread.get(); -// this.syncCaller = syncCaller; -// this.object = object; -// } + public boolean hasCommonParent(Set otherAncestors) { + if(graph == null) return false; + if(ancestors == null) ancestors = graph.ancestorSet(); + return !Collections.disjoint(ancestors, otherAncestors); + } - public abstract void run(int thread); + public abstract void run0(int thread); + + public final void run(int thread) { + if(counter++ > 0) { + if(BarrierTracing.BOOKKEEPING) { + trace.printStackTrace(); + new Exception().printStackTrace(); + } + throw new IllegalStateException("Multiple invocations of SessionTask!"); + } + if(BarrierTracing.BOOKKEEPING) { + trace = new Exception(); + } + run0(thread); + if(graph != null) graph.asyncBarrier.dec(); + } @Override public String toString() { - return "SessionTask[" + super.toString() + "]"; + return "SessionTask[" + graph.parent + "]"; } } @@ -294,7 +321,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap final public DataContainer throwable; public SessionRead(DataContainer throwable, Semaphore notify) { - super(true); + super(null); this.throwable = throwable; this.notify = notify; } @@ -1593,7 +1620,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap entry.prepareRecompute(querySupport); - ReadGraphImpl parentGraph = graph.withParent(entry); + ReadGraphImpl parentGraph = graph.forRecompute(entry); query.recompute(parentGraph); @@ -1765,6 +1792,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap } else { // If not changed, keep the old value immediate.setResult(oldValue); + immediate.setReady(); listenersUnknown = true; } @@ -2268,67 +2296,16 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap @Override final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure procedure) { - throw new UnsupportedOperationException(); + try { -// assert(subject != null); -// assert(procedure != null); -// -// final ListenerBase listener = getListenerBase(procedure); -// -// IntProcedure ip = new IntProcedure() { -// -// AtomicBoolean first = new AtomicBoolean(true); -// -// @Override -// public void execute(ReadGraphImpl graph, int i) { -// try { -// if(first.get()) { -// procedure.execute(graph, querySupport.getResource(i)); -// } else { -// procedure.execute(impl.newRestart(graph), querySupport.getResource(i)); -// } -// } catch (Throwable t2) { -// Logger.defaultLogError(t2); -// } -// } -// -// @Override -// public void finished(ReadGraphImpl graph) { -// try { -// if(first.compareAndSet(true, false)) { -// procedure.finished(graph); -//// impl.state.barrier.dec(this); -// } else { -// procedure.finished(impl.newRestart(graph)); -// } -// -// } catch (Throwable t2) { -// Logger.defaultLogError(t2); -// } -// } -// -// @Override -// public void exception(ReadGraphImpl graph, Throwable t) { -// try { -// if(first.compareAndSet(true, false)) { -// procedure.exception(graph, t); -// } else { -// procedure.exception(impl.newRestart(graph), t); -// } -// } catch (Throwable t2) { -// Logger.defaultLogError(t2); -// } -// } -// -// }; -// -// int sId = querySupport.getId(subject); -// -// try { -// QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip); -// } catch (DatabaseException e) { -// Logger.defaultLogError(e); -// } + for(Resource predicate : getPredicates(impl, subject)) + procedure.execute(impl, predicate); + + procedure.finished(impl); + + } catch (Throwable e) { + procedure.exception(impl, e); + } }