package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.simantics.db.Session; import org.simantics.db.common.SessionThread; import org.simantics.db.common.utils.Logger; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.impl.query.QueryProcessor.ThreadState; class QueryThread extends Thread implements SessionThread { boolean disposed = false; private Semaphore exited = new Semaphore(0); final int index; private Session session; private QuerySupport querySupport; final private ArrayList tasks = new ArrayList(); final private ArrayList own; final private ArrayList ownSync; final private ArrayList queue; final private ReentrantLock lock; final private Condition condition; final private Object querySupportLock; final private int THREADS; final private AtomicInteger sleepers; final private ThreadState[] threadStates; final private ArrayList[] delayQueues; final private QueryThread[] executors; final private ReentrantLock[] threadLocks; final private ArrayList[] queues; final private ArrayList[] ownSyncTasks; public QueryThread(Session session, QueryProcessor processor, int index, String name) { super(QueryProcessor.QueryThreadGroup, null, name); this.session = session; this.index = index; own = processor.ownTasks[index]; ownSync = processor.ownSyncTasks[index]; queue = processor.queues[index]; lock = processor.threadLocks[index]; condition = processor.threadConditions[index]; querySupportLock = processor.querySupportLock; THREADS = processor.THREADS; sleepers = processor.sleepers; querySupport = processor.querySupport; threadStates = processor.threadStates; delayQueues = processor.delayQueues; executors = processor.executors; threadLocks = processor.threadLocks; queues = processor.queues; ownSyncTasks = processor.ownSyncTasks; } synchronized void dispose() { // System.err.println("qt dispose"); disposed = true; lock.lock(); condition.signalAll(); lock.unlock(); try { exited.acquire(); } catch (InterruptedException e) { Logger.defaultLogError(e); } session = null; querySupport = null; // System.err.println("qt disposed"); } boolean isDisposed() { return disposed; } public Session getSession() { return session; } ArrayList newTasks(boolean doWait, ArrayList tasks) { try { while(true) { // Perform own tasks first if(tasks.addAll(own)) { own.clear(); } else if (doWait && !ownSync.isEmpty()) { tasks.add(ownSync.remove(ownSync.size()-1)); } // Try some queued tasks lock.lock(); if(tasks.addAll(queue)) { queue.clear(); lock.unlock(); return tasks; } else { lock.unlock(); } // Return tasks if some were found if(!tasks.isEmpty()) return tasks; if(!doWait) return null; synchronized (querySupportLock) { lock.lock(); // Just maybe someone inserted tasks and notified just before synchronized block if(tasks.addAll(queue)) { queue.clear(); lock.unlock(); return tasks; } // We are the last one awake if(sleepers.incrementAndGet() == THREADS) { // Do not indicate sleeping yet sleepers.decrementAndGet(); // Ceased can create new own tasks if(querySupport == null) System.err.println("null qs"); querySupport.ceased(index); if(tasks.addAll(own)) { own.clear(); } // System.err.println("tasks after ceased: " + tasks.size()); if(!tasks.isEmpty()) { lock.unlock(); return tasks; } // OK, now we are going to sleep sleepers.incrementAndGet(); } } // Nope nothing. Sleep & wait // Whoever releases this calls sleepers.decrementAndGet() // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; lock.unlock(); return null; } threadStates[index] = ThreadState.SLEEP; condition.await(); // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; lock.unlock(); return null; } threadStates[index] = ThreadState.RUN; lock.unlock(); } } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException("Querying was interrupted.", e); } } public boolean runSynchronized() { boolean didExecute = false; for(int performer=0;performer finished = newTasks(false, tasks); if(finished == null) return didExecute; } while(!tasks.isEmpty()) { SessionTask task = tasks.remove(tasks.size() - 1); if(task.syncCaller == index) { ownSyncTasks[index].add(task); } else { task.run(index); didExecute = true; } } return didExecute; } @Override public void run() { QuerySupport support = this.querySupport; try { while (true) { boolean finished = newTasks(true, tasks) == null; if(finished) { return; } while(!tasks.isEmpty()) { SessionTask task = tasks.remove(tasks.size()-1); task.run(index); } for(int performer=0;performer