import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import org.simantics.db.Session;
import org.simantics.db.common.SessionThread;
-import org.simantics.db.common.utils.Logger;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.impl.query.QueryProcessor.ThreadState;
+import org.slf4j.LoggerFactory;
class QueryThread extends Thread implements SessionThread {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);
+
boolean disposed = false;
private Semaphore exited = new Semaphore(0);
private Session session;
private QuerySupport querySupport;
+ private final QueryProcessor processor;
final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
- final private ArrayList<SessionTask> own;
- final private ArrayList<SessionTask> ownSync;
- final private ArrayList<SessionTask> queue;
- final private ReentrantLock lock;
- final private Condition condition;
+// final private ArrayList<SessionTask> own;
+// final private ArrayList<SessionTask> ownSync;
+// final private ArrayList<SessionTask> queue;
+// final private ReentrantLock lock;
+// final private Condition condition;
final private Object querySupportLock;
final private int THREADS;
final private AtomicInteger sleepers;
final private ThreadState[] threadStates;
- final private ArrayList<SessionTask>[] delayQueues;
- final private QueryThread[] executors;
- final private ReentrantLock[] threadLocks;
- final private ArrayList<SessionTask>[] queues;
- final private ArrayList<SessionTask>[] ownSyncTasks;
+// final private ArrayList<SessionTask>[] delayQueues;
+// final private QueryThread[] executors;
+// final private ReentrantLock[] threadLocks;
+// final private ArrayList<SessionTask>[] queues;
+// final private ArrayList<SessionTask>[] ownSyncTasks;
public QueryThread(Session session, QueryProcessor processor, int index, String name) {
super(QueryProcessor.QueryThreadGroup, null, name);
this.session = session;
+ this.processor = processor;
this.index = index;
- own = processor.ownTasks[index];
- ownSync = processor.ownSyncTasks[index];
- queue = processor.queues[index];
- lock = processor.threadLocks[index];
- condition = processor.threadConditions[index];
+// own = processor.ownTasks[index];
+// ownSync = processor.ownSyncTasks[index];
+// queue = processor.queues[index];
+// lock = processor.threadLocks[index];
+// condition = processor.threadConditions[index];
querySupportLock = processor.querySupportLock;
THREADS = processor.THREADS;
sleepers = processor.sleepers;
querySupport = processor.querySupport;
threadStates = processor.threadStates;
- delayQueues = processor.delayQueues;
- executors = processor.executors;
- threadLocks = processor.threadLocks;
- queues = processor.queues;
- ownSyncTasks = processor.ownSyncTasks;
+// delayQueues = processor.delayQueues;
+// executors = processor.executors;
+// threadLocks = processor.threadLocks;
+// queues = processor.queues;
+// ownSyncTasks = processor.ownSyncTasks;
}
synchronized void dispose() {
// System.err.println("qt dispose");
disposed = true;
- lock.lock();
- condition.signalAll();
- lock.unlock();
+// lock.lock();
+// condition.signalAll();
+// lock.unlock();
try {
exited.acquire();
} catch (InterruptedException e) {
- Logger.defaultLogError(e);
+ LOGGER.error("dispose was interrupted", e);
}
session = null;
querySupport = null;
-
// System.err.println("qt disposed");
}
}
+ private boolean pumpTask() {
+ if(!processor.freeScheduling.isEmpty()) {
+ tasks.add(processor.freeScheduling.removeFirst());
+ return true;
+ }
+ return false;
+ }
+
ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {
try {
while(true) {
- // Perform own tasks first
- if(tasks.addAll(own)) {
- own.clear();
- } else if (doWait && !ownSync.isEmpty()) {
- tasks.add(ownSync.remove(ownSync.size()-1));
- }
-
- // Try some queued tasks
- lock.lock();
- if(tasks.addAll(queue)) {
- queue.clear();
- lock.unlock();
- return tasks;
- } else {
- lock.unlock();
- }
-
// Return tasks if some were found
if(!tasks.isEmpty()) return tasks;
+
if(!doWait) return null;
synchronized (querySupportLock) {
- lock.lock();
-
- // Just maybe someone inserted tasks and notified just before synchronized block
- if(tasks.addAll(queue)) {
- queue.clear();
- lock.unlock();
+ if(pumpTask())
return tasks;
- }
+
+// lock.lock();
// We are the last one awake
if(sleepers.incrementAndGet() == THREADS) {
if(querySupport == null) System.err.println("null qs");
querySupport.ceased(index);
- if(tasks.addAll(own)) {
- own.clear();
- }
- // System.err.println("tasks after ceased: " + tasks.size());
- if(!tasks.isEmpty()) {
- lock.unlock();
+ if(pumpTask()) {
+// lock.unlock();
return tasks;
}
// We are done
if(isDisposed()) {
threadStates[index] = ThreadState.DISPOSED;
- lock.unlock();
+// lock.unlock();
return null;
}
+
threadStates[index] = ThreadState.SLEEP;
- condition.await();
+
+ synchronized (querySupportLock) {
+ querySupportLock.wait(100);
+
+ }
+
+// boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
+// if(!woken) {
+// synchronized (querySupportLock) {
+// if(!processor.freeScheduling.isEmpty())
+// System.err.println("some tasks are available!");
+// }
+// }
+
+ sleepers.decrementAndGet();
// We are done
if(isDisposed()) {
threadStates[index] = ThreadState.DISPOSED;
- lock.unlock();
+ //lock.unlock();
return null;
}
threadStates[index] = ThreadState.RUN;
- lock.unlock();
+ //lock.unlock();
}
} catch (InterruptedException e) {
- e.printStackTrace();
-
+ LOGGER.error("Query handling (newTasks) was interrupted", e);
throw new RuntimeException("Querying was interrupted.", e);
}
boolean didExecute = false;
- for(int performer=0;performer<THREADS;performer++) {
- if(!delayQueues[index * THREADS + performer].isEmpty()) {
- synchronized(executors[performer]) {
- threadLocks[performer].lock();
- queues[performer].addAll(delayQueues[index * THREADS + performer]);
- delayQueues[index * THREADS + performer].clear();
- executors[performer].notify();
- threadLocks[performer].unlock();
- }
- }
- }
+// for(int performer=0;performer<THREADS;performer++) {
+// if(!delayQueues[index * THREADS + performer].isEmpty()) {
+// synchronized(executors[performer]) {
+// threadLocks[performer].lock();
+// queues[performer].addAll(delayQueues[index * THREADS + performer]);
+// delayQueues[index * THREADS + performer].clear();
+// executors[performer].notify();
+// threadLocks[performer].unlock();
+// }
+// }
+// }
if(tasks.isEmpty()) {
ArrayList<SessionTask> finished = newTasks(false, tasks);
SessionTask task = tasks.remove(tasks.size() - 1);
- if(task.syncCaller == index) {
- ownSyncTasks[index].add(task);
- } else {
+// if(task.syncCaller == index) {
+// ownSyncTasks[index].add(task);
+// } else {
task.run(index);
+// System.err.println("QT(s) " + index + " runs " + task);
didExecute = true;
- }
+// }
}
@Override
public void run() {
+ processor.thread.set(index);
+
QuerySupport support = this.querySupport;
try {
while(!tasks.isEmpty()) {
SessionTask task = tasks.remove(tasks.size()-1);
+// System.err.println("QT " + index + " runs " + task);
task.run(index);
}
- for(int performer=0;performer<THREADS;performer++) {
- if(!delayQueues[index * THREADS + performer].isEmpty()) {
- synchronized(executors[performer]) {
- threadLocks[performer].lock();
- queues[performer].addAll(delayQueues[index * THREADS + performer]);
- delayQueues[index * THREADS + performer].clear();
- executors[performer].notify();
- threadLocks[performer].unlock();
- }
- }
- }
+// for(int performer=0;performer<THREADS;performer++) {
+// if(!delayQueues[index * THREADS + performer].isEmpty()) {
+// synchronized(executors[performer]) {
+// threadLocks[performer].lock();
+// queues[performer].addAll(delayQueues[index * THREADS + performer]);
+// delayQueues[index * THREADS + performer].clear();
+// executors[performer].notify();
+// threadLocks[performer].unlock();
+// }
+// }
+// }
}
} catch (Throwable t) {
- Logger.defaultLogError("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
+ LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
support.exit(t);
} finally {