QueryThread[] executors;
public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
+
+ public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
enum ThreadState {
public void close() {
}
- public SessionTask getSubTask(ReadGraphImpl impl) {
+ public SessionTask getSubTask(ReadGraphImpl parent) {
synchronized(querySupportLock) {
int index = 0;
while(index < freeScheduling.size()) {
SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
- queueLength.decrementAndGet();
+ if(task.isSubtask(parent) && task.maybeReady()) {
return freeScheduling.remove(index);
}
index++;
* We are running errands while waiting for requests to complete.
* We can only run work that is part of the current root request to avoid any deadlocks
*/
- public boolean performPending(ReadGraphImpl graph) {
- SessionTask task = getSubTask(graph);
+ public boolean performPending(ReadGraphImpl under) {
+ SessionTask task = getSubTask(under);
if(task != null) {
- task.run(QueryProcessor.thread.get());
+ task.run(thread.get());
return true;
}
return false;
}
-
+
final public void scheduleNow(SessionTask request) {
- schedule(request, false);
+ SessionTask toExecute = scheduleOrReturnForExecution(request);
+ if(toExecute != null)
+ toExecute.run(thread.get());
}
- final public void scheduleLater(SessionTask request) {
- schedule(request, true);
- }
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
- AtomicInteger queueLength = new AtomicInteger(0);
-
- final public void schedule(SessionTask request, boolean late) {
-
- int queueLengthEstimate = queueLength.get();
- if(!late && queueLengthEstimate > 80) {
- request.run(thread.get());
- return;
- }
-
- assert(request != null);
-
- synchronized(querySupportLock) {
+ assert(request != null);
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
+ synchronized(querySupportLock) {
- if(late) {
- int pos = request.position - 1;
- if(pos < freeScheduling.size()) {
- freeScheduling.add(pos, request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- freeScheduling.addLast(request);
- queueLength.incrementAndGet();
- requests.release();
- }
- }
- else {
- if(request.getLevel() < 4) {
- if(freeScheduling.size() < 100) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- } else {
- if(freeScheduling.size() < 20) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- }
- }
+ LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
+
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
+ queue.addFirst(request);
+ requests.release();
- }
+ }
- }
+ return null;
+
+ }
final int THREADS;
public static abstract class SessionTask {
- public final ReadGraphImpl graph;
+ final protected ReadGraphImpl rootGraph;
private int counter = 0;
protected int position = 1;
private Exception trace;
- public SessionTask(ReadGraphImpl graph) {
- this.graph = graph;
- if(graph != null) graph.asyncBarrier.inc();
+ public SessionTask() {
+ this(null);
}
-
- public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
- if(r1 == null || r2 == null) return false;
- return r1.getTopLevelGraph() == r2.getTopLevelGraph();
- }
+
+ public SessionTask(ReadGraphImpl rootGraph) {
+ this.rootGraph = rootGraph;
+ }
+
+ public boolean isSubtask(ReadGraphImpl graph) {
+ return graph.isParent(rootGraph);
+ }
public abstract void run0(int thread);
trace = new Exception();
}
run0(thread);
- if(graph != null) graph.asyncBarrier.dec();
+ //if(graph != null && graph.asyncBarrier != null) graph.asyncBarrier.dec();
}
public boolean maybeReady() {
@Override
public String toString() {
- if(graph == null)
+// if(graph == null)
return "SessionTask[no graph]";
- else
- return "SessionTask[" + graph.parent + "]";
+// else
+// return "SessionTask[" + graph.parent + "]";
}
- public int getLevel() {
- if(graph == null) return 0;
- else return graph.getLevel();
- }
+// public int getLevel() {
+// if(graph == null) return 0;
+// else return graph.getLevel();
+// }
}