final public Session session;
final public ResourceSupport resourceSupport;
+ final public Semaphore requests = new Semaphore(1);
+
final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
-// public ArrayList<SessionTask>[] queues;
public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
}
public ThreadState[] threadStates;
-// public ReentrantLock[] threadLocks;
-// public Condition[] threadConditions;
-
- //public ArrayList<SessionTask>[] ownTasks;
-
- //public ArrayList<SessionTask>[] ownSyncTasks;
-
- //ArrayList<SessionTask>[] delayQueues;
final Object querySupportLock;
public void close() {
}
- public SessionTask getOwnTask(ReadGraphImpl impl) {
- Set<ReadGraphImpl> ancestors = impl.ancestorSet();
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(ancestors)) {
- return freeScheduling.remove(index);
- }
- index++;
- }
- }
- return null;
- }
-
public SessionTask getSubTask(ReadGraphImpl impl) {
- Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
synchronized(querySupportLock) {
int index = 0;
while(index < freeScheduling.size()) {
SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(onlyThis)) {
+ if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
+ queueLength.decrementAndGet();
return freeScheduling.remove(index);
}
index++;
return null;
}
- public boolean performPending(ReadGraphImpl graph) {
- SessionTask task = getOwnTask(graph);
+ /*
+ * We are running errands while waiting for requests to complete.
+ * We can only run work that is part of the current root request to avoid any deadlocks
+ */
+ public boolean performPending(ReadGraphImpl graph) {
+ SessionTask task = getSubTask(graph);
if(task != null) {
task.run(QueryProcessor.thread.get());
return true;
- } else {
- return false;
}
+ return false;
}
-// final public void scheduleOwn(int caller, SessionTask request) {
-// ownTasks[caller].add(request);
-// }
-
- final public void schedule(SessionTask request) {
-
- //int performer = request.thread;
-
-// if(DebugPolicy.SCHEDULE)
-// System.out.println("schedule " + request + " " + " -> " + performer);
+ final public void scheduleNow(SessionTask request) {
+ schedule(request, false);
+ }
- //assert(performer >= 0);
+ final public void scheduleLater(SessionTask request) {
+ schedule(request, true);
+ }
+ AtomicInteger queueLength = new AtomicInteger(0);
+
+ final public void schedule(SessionTask request, boolean late) {
+
+ int queueLengthEstimate = queueLength.get();
+ if(!late && queueLengthEstimate > 80) {
+ request.run(thread.get());
+ return;
+ }
+
assert(request != null);
+
+ synchronized(querySupportLock) {
-// if(caller == performer) {
-// request.run(caller);
-// } else {
-
-// if(performer == THREADS) {
-
- synchronized(querySupportLock) {
-
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
-
- freeScheduling.add(request);
-
- querySupportLock.notifyAll();
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
- }
+ if(late) {
+ int pos = request.position - 1;
+ if(pos < freeScheduling.size()) {
+ freeScheduling.add(pos, request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ freeScheduling.addLast(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ }
+ }
+ else {
+ if(request.getLevel() < 4) {
+ if(freeScheduling.size() < 100) {
+ freeScheduling.addFirst(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ request.run(thread.get());
+ }
+ } else {
+ if(freeScheduling.size() < 20) {
+ freeScheduling.addFirst(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ request.run(thread.get());
+ }
+ }
+ }
- return;
-
-// }
-//
-// ReentrantLock queueLock = threadLocks[performer];
-// queueLock.lock();
-// queues[performer].add(request);
-// // This thread could have been sleeping
-// if(queues[performer].size() == 1) {
-// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-// threadConditions[performer].signalAll();
-// }
-// queueLock.unlock();
-// }
+
+ }
}
public static abstract class SessionTask {
public final ReadGraphImpl graph;
- private Set<ReadGraphImpl> ancestors;
private int counter = 0;
+ protected int position = 1;
private Exception trace;
public SessionTask(ReadGraphImpl graph) {
if(graph != null) graph.asyncBarrier.inc();
}
- public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
- if(graph == null) return false;
- if(ancestors == null) ancestors = graph.ancestorSet();
- return !Collections.disjoint(ancestors, otherAncestors);
- }
+ public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
+ if(r1 == null || r2 == null) return false;
+ return r1.getTopLevelGraph() == r2.getTopLevelGraph();
+ }
- public abstract void run0(int thread);
+ public abstract void run0(int thread);
public final void run(int thread) {
if(counter++ > 0) {
run0(thread);
if(graph != null) graph.asyncBarrier.dec();
}
+
+ public boolean maybeReady() {
+ return true;
+ }
@Override
public String toString() {
- return "SessionTask[" + graph.parent + "]";
+ if(graph == null)
+ return "SessionTask[no graph]";
+ else
+ return "SessionTask[" + graph.parent + "]";
+ }
+
+ public int getLevel() {
+ if(graph == null) return 0;
+ else return graph.getLevel();
}
}
@Override
protected void doDispose() {
+ requests.release(Integer.MAX_VALUE / 2);
+
for(int index = 0; index < THREADS; index++) {
executors[index].dispose();
}