package org.simantics.db.impl.query;

import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.simantics.db.Session;
import org.simantics.db.common.SessionThread;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.impl.query.QueryProcessor.ThreadState;
import org.slf4j.LoggerFactory;

/**
 * Worker thread of a {@link QueryProcessor}.
 *
 * <p>The thread loops in {@link #run()}: it obtains {@link SessionTask}s through
 * {@code processor.scheduling.pumpTask(tasks)}, runs them, and blocks on the shared
 * {@code requests} semaphore when nothing is available. The loop ends once
 * {@link #dispose()} has been called and the thread observes the {@code disposed} flag.
 */
class QueryThread extends Thread implements SessionThread {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class);

    /**
     * Set by {@link #dispose()} and polled by the worker loop. Volatile because it is
     * written and read by different threads without a common lock.
     */
    volatile boolean disposed = false;

    /** Released once when {@link #run()} terminates; {@link #dispose()} waits on it. */
    private final Semaphore exited = new Semaphore(0);

    /** Index of this thread; used to address {@code threadStates} and passed to {@code ceased}. */
    final int index;

    private Session session;
    private QuerySupport querySupport;
    private final QueryProcessor processor;
    private final Semaphore requests;

    /** Tasks fetched for this thread but not yet executed. */
    final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();

    // The fields below are copies of the corresponding QueryProcessor members taken in the constructor.
    final private Object querySupportLock;
    final private int THREADS;
    final private AtomicInteger sleepers;
    final private ThreadState[] threadStates;

    /**
     * Creates a worker bound to the given processor.
     *
     * @param session   session returned by {@link #getSession()} until the thread is disposed
     * @param processor processor whose shared scheduling state this thread uses
     * @param index     index of this thread
     * @param name      thread name
     */
    public QueryThread(Session session, QueryProcessor processor, int index, String name) {
        super(QueryProcessor.QueryThreadGroup, null, name);
        this.session = session;
        this.processor = processor;
        this.index = index;
        querySupportLock = processor.querySupportLock;
        THREADS = processor.THREADS;
        sleepers = processor.sleepers;
        querySupport = processor.querySupport;
        threadStates = processor.threadStates;
        requests = processor.requests;
    }

    /**
     * Marks this thread as disposed, waits until {@link #run()} has finished and then
     * drops the references to the session and the query support.
     *
     * <p>If the wait is interrupted, the interruption is logged, the interrupt flag is
     * restored for the caller, and the references are still cleared.
     */
    synchronized void dispose() {
        disposed = true;

        try {
            exited.acquire();
        } catch (InterruptedException e) {
            LOGGER.error("dispose was interrupted", e);
            // Do not swallow the interruption: let callers further up observe it.
            Thread.currentThread().interrupt();
        }

        session = null;
        querySupport = null;
    }

    /** @return {@code true} once {@link #dispose()} has been called */
    boolean isDisposed() {
        return disposed;
    }

    /** @return the session given at construction, or {@code null} after {@link #dispose()} has completed */
    public Session getSession() {
        return session;
    }

    /** Delegates to {@code processor.scheduling.pumpTask} with this thread's task list. */
    private boolean pumpTask() {
        return processor.scheduling.pumpTask(tasks);
    }

    /**
     * Obtains tasks to execute.
     *
     * @param doWait if {@code false}, never blocks: returns {@code tasks} when it is
     *               non-empty and {@code null} otherwise
     * @param tasks  list checked for already available tasks
     * @return the task list when work is available, or {@code null} when no work is
     *         available without waiting or the thread has been disposed
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *                          flag is restored before throwing
     */
    ArrayList<SessionTask> newTasks(boolean doWait, ArrayList<SessionTask> tasks) {

        try {

            while (true) {

                // Return tasks if some were found
                if (!tasks.isEmpty()) {
                    return tasks;
                }

                if (!doWait) {
                    return null;
                }

                synchronized (querySupportLock) {

                    if (pumpTask()) {
                        return tasks;
                    }

                    // We are the last one awake
                    if (sleepers.incrementAndGet() == THREADS) {

                        // Do not indicate sleeping yet
                        sleepers.decrementAndGet();

                        // Ceased can create new own tasks
                        QuerySupport support = querySupport;
                        if (support == null) {
                            // Logged for diagnosis only; the call below still fails with a
                            // NullPointerException exactly as before.
                            LOGGER.error("null qs: querySupport is null in query thread {}", index);
                        }
                        support.ceased(index);

                        if (pumpTask()) {
                            return tasks;
                        }

                        // OK, now we are going to sleep
                        sleepers.incrementAndGet();
                    }
                }

                // Nothing to do: sleep until a permit is released on 'requests'.

                // We are done
                if (isDisposed()) {
                    threadStates[index] = ThreadState.DISPOSED;
                    return null;
                }

                threadStates[index] = ThreadState.SLEEP;

                requests.acquire();

                // NOTE(review): an older comment stated that whoever releases the permit
                // decrements 'sleepers'; the visible code decrements here after waking.
                // Confirm the releaser does not decrement as well.
                sleepers.decrementAndGet();

                // We are done
                if (isDisposed()) {
                    threadStates[index] = ThreadState.DISPOSED;
                    return null;
                }

                threadStates[index] = ThreadState.RUN;
            }

        } catch (InterruptedException e) {
            LOGGER.error("Query handling (newTasks) was interrupted", e);
            // Preserve the interrupt status for code that handles the RuntimeException.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Querying was interrupted.", e);
        }
    }

    /**
     * Runs every task currently held in this thread's task list on the calling thread,
     * without waiting for new work. Tasks are taken from the end of the list.
     *
     * @return {@code true} if at least one task was run
     */
    public boolean runSynchronized() {

        boolean didExecute = false;

        // With doWait == false, newTasks returns null exactly when the list is empty,
        // so this returns early when there is nothing to run.
        if (tasks.isEmpty()) {
            ArrayList<SessionTask> finished = newTasks(false, tasks);
            if (finished == null) {
                return didExecute;
            }
        }

        while (!tasks.isEmpty()) {
            SessionTask task = tasks.remove(tasks.size() - 1);
            task.run(index);
            didExecute = true;
        }

        return didExecute;
    }

    /**
     * Main loop: waits for tasks and runs them until {@link #newTasks(boolean, ArrayList)}
     * returns {@code null}. Any throwable escaping task processing is logged and handed
     * to {@code QuerySupport.exit}. The {@code exited} semaphore is always released on
     * the way out so that {@link #dispose()} can return.
     */
    @Override
    public void run() {

        // Captured up front because dispose() clears the field.
        QuerySupport support = this.querySupport;

        try {

            while (true) {

                boolean finished = newTasks(true, tasks) == null;
                if (finished) {
                    return;
                }

                while (!tasks.isEmpty()) {
                    SessionTask task = tasks.remove(tasks.size() - 1);
                    // NOTE(review): runSynchronized() passes 'index' here while this loop
                    // passes 0 — confirm that the difference is intended.
                    task.run(0);
                }
            }

        } catch (Throwable t) {
            LOGGER.error("FATAL BUG: QueryThread task processing caused unexpected exception.", t);
            support.exit(t);
        } finally {
            exited.release();
        }
    }
}