import org.simantics.db.exception.ResourceNotFoundException;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.ReadGraphSupport;
import org.simantics.db.impl.graph.WriteGraphImpl;
public void close() {
}
- SessionTask getOwnTask(int thread) {
+ public SessionTask getOwnTask(ReadGraphImpl impl) {
+ Set<ReadGraphImpl> ancestors = impl.ancestorSet();
synchronized(querySupportLock) {
int index = 0;
while(index < freeScheduling.size()) {
SessionTask task = freeScheduling.get(index);
- if(task.thread == thread && !task.systemCall)
+ if(task.hasCommonParent(ancestors)) {
return freeScheduling.remove(index);
+ }
index++;
}
}
return null;
}
-
- public boolean performPending(int thread) {
- SessionTask task = getOwnTask(thread);
+
+ public SessionTask getSubTask(ReadGraphImpl impl) {
+ Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
+ synchronized(querySupportLock) {
+ int index = 0;
+ while(index < freeScheduling.size()) {
+ SessionTask task = freeScheduling.get(index);
+ if(task.hasCommonParent(onlyThis)) {
+ return freeScheduling.remove(index);
+ }
+ index++;
+ }
+ }
+ return null;
+ }
+
+ public boolean performPending(ReadGraphImpl graph) {
+ SessionTask task = getOwnTask(graph);
if(task != null) {
- task.run(thread);
+ task.run(QueryProcessor.thread.get());
return true;
} else {
return false;
// }
final public void schedule(SessionTask request) {
+
+ //int performer = request.thread;
- int performer = request.thread;
-
- if(DebugPolicy.SCHEDULE)
- System.out.println("schedule " + request + " " + " -> " + performer);
+// if(DebugPolicy.SCHEDULE)
+// System.out.println("schedule " + request + " " + " -> " + performer);
//assert(performer >= 0);
// if(performer == THREADS) {
synchronized(querySupportLock) {
-
- //new Exception().printStackTrace();
-
+
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
+
freeScheduling.add(request);
querySupportLock.notifyAll();
- //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
-
-// for(int i=0;i<THREADS;i++) {
-// ReentrantLock queueLock = threadLocks[i];
-// queueLock.lock();
-// //queues[performer].add(request);
-// //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
-// threadConditions[i].signalAll();
-// queueLock.unlock();
-// }
-
}
return;
final int THREADS;
final public int THREAD_MASK;
-
- final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
+
+ final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
public static abstract class SessionTask {
- final public int thread;
- final public boolean systemCall;
-// final public int syncCaller;
- //final public Object object;
+ public final ReadGraphImpl graph;
+ private Set<ReadGraphImpl> ancestors;
+ private int counter = 0;
+ private Exception trace;
- public SessionTask(boolean systemCall) {
- this.thread = QueryProcessor.thread.get();
- this.systemCall = systemCall;
-// this.syncCaller = -1;
- //this.object = object;
+ public SessionTask(ReadGraphImpl graph) {
+ this.graph = graph;
+ if(graph != null) graph.asyncBarrier.inc();
}
-// public SessionTask(Object object, int syncCaller) {
-// this.thread = QueryProcessor.thread.get();
-// this.syncCaller = syncCaller;
-// this.object = object;
-// }
+ public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
+ if(graph == null) return false;
+ if(ancestors == null) ancestors = graph.ancestorSet();
+ return !Collections.disjoint(ancestors, otherAncestors);
+ }
- public abstract void run(int thread);
+ public abstract void run0(int thread);
+
+ public final void run(int thread) {
+ if(counter++ > 0) {
+ if(BarrierTracing.BOOKKEEPING) {
+ trace.printStackTrace();
+ new Exception().printStackTrace();
+ }
+ throw new IllegalStateException("Multiple invocations of SessionTask!");
+ }
+ if(BarrierTracing.BOOKKEEPING) {
+ trace = new Exception();
+ }
+ run0(thread);
+ if(graph != null) graph.asyncBarrier.dec();
+ }
@Override
public String toString() {
- return "SessionTask[" + super.toString() + "]";
+ return "SessionTask[" + graph.parent + "]";
}
}
final public DataContainer<Throwable> throwable;
public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
- super(true);
+ super(null);
this.throwable = throwable;
this.notify = notify;
}
entry.prepareRecompute(querySupport);
- ReadGraphImpl parentGraph = graph.withParent(entry);
+ ReadGraphImpl parentGraph = graph.forRecompute(entry);
query.recompute(parentGraph);
} else {
// If not changed, keep the old value
immediate.setResult(oldValue);
+ immediate.setReady();
listenersUnknown = true;
}