private Session session;
private QuerySupport querySupport;
private final QueryProcessor processor;
+ private final Semaphore requests;
final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
// final private ArrayList<SessionTask> own;
sleepers = processor.sleepers;
querySupport = processor.querySupport;
threadStates = processor.threadStates;
+ requests = processor.requests;
// delayQueues = processor.delayQueues;
// executors = processor.executors;
// threadLocks = processor.threadLocks;
}
private boolean pumpTask() {
- if(!processor.freeScheduling.isEmpty()) {
- tasks.add(processor.freeScheduling.removeFirst());
- return true;
- }
- return false;
+ return processor.scheduling.pumpTask(tasks);
}
ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
threadStates[index] = ThreadState.SLEEP;
- synchronized (querySupportLock) {
- querySupportLock.wait(100);
-
- }
-
-// boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
-// if(!woken) {
-// synchronized (querySupportLock) {
-// if(!processor.freeScheduling.isEmpty())
-// System.err.println("some tasks are available!");
-// }
-// }
+ requests.acquire();
sleepers.decrementAndGet();
@Override
public void run() {
- processor.thread.set(index);
-
QuerySupport support = this.querySupport;
try {
while(!tasks.isEmpty()) {
SessionTask task = tasks.remove(tasks.size()-1);
-// System.err.println("QT " + index + " runs " + task);
- task.run(index);
+ task.run(0);
}
-// for(int performer=0;performer<THREADS;performer++) {
-// if(!delayQueues[index * THREADS + performer].isEmpty()) {
-// synchronized(executors[performer]) {
-// threadLocks[performer].lock();
-// queues[performer].addAll(delayQueues[index * THREADS + performer]);
-// delayQueues[index * THREADS + performer].clear();
-// executors[performer].notify();
-// threadLocks[performer].unlock();
-// }
-// }
-// }
-
}
} catch (Throwable t) {