private Session session;
private QuerySupport querySupport;
+ final private QueryProcessor processor;
final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
- final private ArrayList<SessionTask> own;
- final private ArrayList<SessionTask> ownSync;
- final private ArrayList<SessionTask> queue;
+// final private ArrayList<SessionTask> own;
+// final private ArrayList<SessionTask> ownSync;
+// final private ArrayList<SessionTask> queue;
final private ReentrantLock lock;
final private Condition condition;
final private Object querySupportLock;
final private int THREADS;
final private AtomicInteger sleepers;
final private ThreadState[] threadStates;
- final private ArrayList<SessionTask>[] delayQueues;
+// final private ArrayList<SessionTask>[] delayQueues;
final private QueryThread[] executors;
final private ReentrantLock[] threadLocks;
- final private ArrayList<SessionTask>[] queues;
- final private ArrayList<SessionTask>[] ownSyncTasks;
+// final private ArrayList<SessionTask>[] queues;
+// final private ArrayList<SessionTask>[] ownSyncTasks;
public QueryThread(Session session, QueryProcessor processor, int index, String name) {
super(QueryProcessor.QueryThreadGroup, null, name);
this.session = session;
+ this.processor = processor;
this.index = index;
- own = processor.ownTasks[index];
- ownSync = processor.ownSyncTasks[index];
- queue = processor.queues[index];
+// own = processor.ownTasks[index];
+// ownSync = processor.ownSyncTasks[index];
+// queue = processor.queues[index];
lock = processor.threadLocks[index];
condition = processor.threadConditions[index];
querySupportLock = processor.querySupportLock;
sleepers = processor.sleepers;
querySupport = processor.querySupport;
threadStates = processor.threadStates;
- delayQueues = processor.delayQueues;
+// delayQueues = processor.delayQueues;
executors = processor.executors;
threadLocks = processor.threadLocks;
- queues = processor.queues;
- ownSyncTasks = processor.ownSyncTasks;
+// queues = processor.queues;
+// ownSyncTasks = processor.ownSyncTasks;
}
synchronized void dispose() {
while(true) {
// Perform own tasks first
- if(tasks.addAll(own)) {
- own.clear();
- } else if (doWait && !ownSync.isEmpty()) {
- tasks.add(ownSync.remove(ownSync.size()-1));
- }
+// if(tasks.addAll(own)) {
+// own.clear();
+// } else if (doWait && !ownSync.isEmpty()) {
+// tasks.add(ownSync.remove(ownSync.size()-1));
+// }
// Try some queued tasks
- lock.lock();
- if(tasks.addAll(queue)) {
- queue.clear();
- lock.unlock();
- return tasks;
- } else {
- lock.unlock();
- }
+// lock.lock();
+// if(tasks.addAll(queue)) {
+// queue.clear();
+// lock.unlock();
+// return tasks;
+// } else {
+// lock.unlock();
+// }
// Return tasks if some were found
if(!tasks.isEmpty()) return tasks;
+
if(!doWait) return null;
synchronized (querySupportLock) {
- lock.lock();
+// System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")");
- // Just maybe someone inserted tasks and notified just before synchronized block
- if(tasks.addAll(queue)) {
- queue.clear();
- lock.unlock();
+ if(!processor.freeScheduling.isEmpty()) {
+ tasks.add(processor.freeScheduling.removeFirst());
return tasks;
}
+ lock.lock();
+
+ // Just maybe someone inserted tasks and notified just before synchronized block
+// if(tasks.addAll(queue)) {
+// queue.clear();
+// lock.unlock();
+// return tasks;
+// }
+
+// System.err.println("QT " + index + ", sleepers = " + sleepers);
+
// We are the last one awake
if(sleepers.incrementAndGet() == THREADS) {
if(querySupport == null) System.err.println("null qs");
querySupport.ceased(index);
- if(tasks.addAll(own)) {
- own.clear();
- }
+// if(tasks.addAll(own)) {
+// own.clear();
+// }
// System.err.println("tasks after ceased: " + tasks.size());
if(!tasks.isEmpty()) {
lock.unlock();
threadStates[index] = ThreadState.SLEEP;
condition.await();
+ sleepers.decrementAndGet();
+
// We are done
if(isDisposed()) {
threadStates[index] = ThreadState.DISPOSED;
boolean didExecute = false;
- for(int performer=0;performer<THREADS;performer++) {
- if(!delayQueues[index * THREADS + performer].isEmpty()) {
- synchronized(executors[performer]) {
- threadLocks[performer].lock();
- queues[performer].addAll(delayQueues[index * THREADS + performer]);
- delayQueues[index * THREADS + performer].clear();
- executors[performer].notify();
- threadLocks[performer].unlock();
- }
- }
- }
+// for(int performer=0;performer<THREADS;performer++) {
+// if(!delayQueues[index * THREADS + performer].isEmpty()) {
+// synchronized(executors[performer]) {
+// threadLocks[performer].lock();
+// queues[performer].addAll(delayQueues[index * THREADS + performer]);
+// delayQueues[index * THREADS + performer].clear();
+// executors[performer].notify();
+// threadLocks[performer].unlock();
+// }
+// }
+// }
if(tasks.isEmpty()) {
ArrayList<SessionTask> finished = newTasks(false, tasks);
SessionTask task = tasks.remove(tasks.size() - 1);
- if(task.syncCaller == index) {
- ownSyncTasks[index].add(task);
- } else {
+// if(task.syncCaller == index) {
+// ownSyncTasks[index].add(task);
+// } else {
task.run(index);
+// System.err.println("QT(s) " + index + " runs " + task);
didExecute = true;
- }
+// }
}
while(!tasks.isEmpty()) {
SessionTask task = tasks.remove(tasks.size()-1);
+// System.err.println("QT " + index + " runs " + task);
task.run(index);
}
- for(int performer=0;performer<THREADS;performer++) {
- if(!delayQueues[index * THREADS + performer].isEmpty()) {
- synchronized(executors[performer]) {
- threadLocks[performer].lock();
- queues[performer].addAll(delayQueues[index * THREADS + performer]);
- delayQueues[index * THREADS + performer].clear();
- executors[performer].notify();
- threadLocks[performer].unlock();
- }
- }
- }
+// for(int performer=0;performer<THREADS;performer++) {
+// if(!delayQueues[index * THREADS + performer].isEmpty()) {
+// synchronized(executors[performer]) {
+// threadLocks[performer].lock();
+// queues[performer].addAll(delayQueues[index * THREADS + performer]);
+// delayQueues[index * THREADS + performer].clear();
+// executors[performer].notify();
+// threadLocks[performer].unlock();
+// }
+// }
+// }
}