import java.util.ArrayList;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// final private ArrayList<SessionTask> own;
// final private ArrayList<SessionTask> ownSync;
// final private ArrayList<SessionTask> queue;
- final private ReentrantLock lock;
- final private Condition condition;
+// final private ReentrantLock lock;
+// final private Condition condition;
final private Object querySupportLock;
final private int THREADS;
final private AtomicInteger sleepers;
final private ThreadState[] threadStates;
// final private ArrayList<SessionTask>[] delayQueues;
- final private QueryThread[] executors;
- final private ReentrantLock[] threadLocks;
+// final private QueryThread[] executors;
+// final private ReentrantLock[] threadLocks;
// final private ArrayList<SessionTask>[] queues;
// final private ArrayList<SessionTask>[] ownSyncTasks;
// own = processor.ownTasks[index];
// ownSync = processor.ownSyncTasks[index];
// queue = processor.queues[index];
- lock = processor.threadLocks[index];
- condition = processor.threadConditions[index];
+// lock = processor.threadLocks[index];
+// condition = processor.threadConditions[index];
querySupportLock = processor.querySupportLock;
THREADS = processor.THREADS;
sleepers = processor.sleepers;
querySupport = processor.querySupport;
threadStates = processor.threadStates;
// delayQueues = processor.delayQueues;
- executors = processor.executors;
- threadLocks = processor.threadLocks;
+// executors = processor.executors;
+// threadLocks = processor.threadLocks;
// queues = processor.queues;
// ownSyncTasks = processor.ownSyncTasks;
}
// System.err.println("qt dispose");
disposed = true;
- lock.lock();
- condition.signalAll();
- lock.unlock();
+// lock.lock();
+// condition.signalAll();
+// lock.unlock();
try {
exited.acquire();
}
+ private boolean pumpTask() {
+ if(!processor.freeScheduling.isEmpty()) {
+ tasks.add(processor.freeScheduling.removeFirst());
+ return true;
+ }
+ return false;
+ }
+
ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
try {
while(true) {
- // Perform own tasks first
-// if(tasks.addAll(own)) {
-// own.clear();
-// } else if (doWait && !ownSync.isEmpty()) {
-// tasks.add(ownSync.remove(ownSync.size()-1));
-// }
-
- // Try some queued tasks
-// lock.lock();
-// if(tasks.addAll(queue)) {
-// queue.clear();
-// lock.unlock();
-// return tasks;
-// } else {
-// lock.unlock();
-// }
-
// Return tasks if some were found
if(!tasks.isEmpty()) return tasks;
synchronized (querySupportLock) {
-// System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")");
-
- if(!processor.freeScheduling.isEmpty()) {
- tasks.add(processor.freeScheduling.removeFirst());
+ if(pumpTask())
return tasks;
- }
- lock.lock();
-
- // Just maybe someone inserted tasks and notified just before synchronized block
-// if(tasks.addAll(queue)) {
-// queue.clear();
-// lock.unlock();
-// return tasks;
-// }
-
-// System.err.println("QT " + index + ", sleepers = " + sleepers);
+// lock.lock();
// We are the last one awake
if(sleepers.incrementAndGet() == THREADS) {
if(querySupport == null) System.err.println("null qs");
querySupport.ceased(index);
-// if(tasks.addAll(own)) {
-// own.clear();
-// }
- // System.err.println("tasks after ceased: " + tasks.size());
- if(!tasks.isEmpty()) {
- lock.unlock();
+ if(pumpTask()) {
+// lock.unlock();
return tasks;
}
// We are done
if(isDisposed()) {
threadStates[index] = ThreadState.DISPOSED;
- lock.unlock();
+// lock.unlock();
return null;
}
+
threadStates[index] = ThreadState.SLEEP;
- condition.await();
+
+ synchronized (querySupportLock) {
+ querySupportLock.wait(100);
+
+ }
+
+// boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
+// if(!woken) {
+// synchronized (querySupportLock) {
+// if(!processor.freeScheduling.isEmpty())
+// System.err.println("some tasks are available!");
+// }
+// }
sleepers.decrementAndGet();
// We are done
if(isDisposed()) {
threadStates[index] = ThreadState.DISPOSED;
- lock.unlock();
+ //lock.unlock();
return null;
}
threadStates[index] = ThreadState.RUN;
- lock.unlock();
+ //lock.unlock();
}
@Override
public void run() {
+ processor.thread.set(index);
+
QuerySupport support = this.querySupport;
try {