final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
- public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
-
- public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
enum ThreadState {
}
+ final Scheduling scheduling;
+
public ThreadState[] threadStates;
final Object querySupportLock;
public void close() {
}
- public SessionTask getSubTask(ReadGraphImpl parent) {
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.isSubtask(parent) && task.maybeReady()) {
- return freeScheduling.remove(index);
- }
- index++;
- }
- }
- return null;
- }
/*
* We are running errands while waiting for requests to complete.
* We can only run work that is part of the current root request to avoid any deadlocks
*/
public boolean performPending(ReadGraphImpl under) {
- SessionTask task = getSubTask(under);
+ SessionTask task = scheduling.getSubTask(under);
if(task != null) {
task.run(thread.get());
return true;
}
final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
-
- assert(request != null);
-
- synchronized(querySupportLock) {
-
- LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
-
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
-
- queue.addFirst(request);
- requests.release();
-
- }
-
- return null;
+
+ return scheduling.scheduleOrReturnForExecution(request);
}
THREADS = threads;
THREAD_MASK = threads - 1;
+ scheduling = new Scheduling(requests);
+
querySupport = core;
cache = new QueryCache(core, threads);
session = querySupport.getSession();
entry.prepareRecompute(querySupport);
ReadGraphImpl parentGraph = graph.forRecompute(entry);
-
+ parentGraph.asyncBarrier.inc();
query.recompute(parentGraph);
+ parentGraph.asyncBarrier.dec();
if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
+ ReadGraphImpl syncGraph = graph.forSyncExecute();
+ syncGraph.asyncBarrier.inc();
+ propagateChangesInQueryCache_(syncGraph);
+ syncGraph.asyncBarrier.dec();
+ syncGraph.asyncBarrier.waitBarrier(this, syncGraph);
+
+ }
+
+
+ public void propagateChangesInQueryCache_(final ReadGraphImpl graph) {
+
// Make sure that listening has performed its work
listening.sync();
public void invalidateResource(final int resource) {
if(lastInvalidate == resource) return;
- scheduledValueUpdates.add(resource);
+ //scheduledValueUpdates.add(resource);
+ scheduledInvalidates.add(resource);
lastInvalidate = resource;
cache.dirty = true;
}
Exception callerException = null;
- public interface AsyncBarrier {
- public void inc();
- public void dec();
- // public void inc(String debug);
- // public void dec(String debug);
- }
+ public interface AsyncBarrier {
+ public void inc();
+ public void dec();
+ public void waitBarrier(Object request, ReadGraphImpl impl);
+ public boolean isBlocking();
+ }
// final public QueryProcessor processor;
// final public QuerySupport support;
return L0;
}
+ public Layer0 getL0() {
+ return L0;
+ }
+
public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
protected Integer initialValue() {
return -1;