X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;h=b8916f3ee6de0aab7ba8076ccecd88ed8d9c44b6;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java new file mode 100644 index 000000000..b8916f3ee --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -0,0 +1,282 @@ +package org.simantics.db.impl.query; + +import java.util.ArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.simantics.db.Session; +import org.simantics.db.common.SessionThread; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; +import org.simantics.db.impl.query.QueryProcessor.ThreadState; + +class QueryThread extends Thread implements SessionThread { + + boolean disposed = false; + private Semaphore exited = new Semaphore(0); + + final int index; + + private Session session; + private QuerySupport querySupport; + + final private ArrayList tasks = new ArrayList(); + final private ArrayList own; + final private ArrayList ownSync; + final private ArrayList queue; + final private ReentrantLock lock; + final private Condition condition; + final private Object querySupportLock; + final private int THREADS; + final private AtomicInteger sleepers; + final private ThreadState[] threadStates; + final private ArrayList[] delayQueues; + final private QueryThread[] executors; + final private ReentrantLock[] threadLocks; + final private ArrayList[] queues; + final private ArrayList[] ownSyncTasks; + + public QueryThread(Session session, QueryProcessor processor, int index, String name) { + super(QueryProcessor.QueryThreadGroup, null, name); + this.session = session; + this.index = index; + own = processor.ownTasks[index]; + ownSync = processor.ownSyncTasks[index]; + queue = processor.queues[index]; + lock = processor.threadLocks[index]; + condition = processor.threadConditions[index]; + querySupportLock = processor.querySupportLock; + THREADS = processor.THREADS; + sleepers = processor.sleepers; + querySupport = processor.querySupport; + threadStates = processor.threadStates; + delayQueues = processor.delayQueues; + executors = processor.executors; + threadLocks = processor.threadLocks; + queues = processor.queues; + ownSyncTasks = processor.ownSyncTasks; + } + + synchronized void dispose() { + +// System.err.println("qt dispose"); + + disposed = true; + lock.lock(); + condition.signalAll(); + lock.unlock(); + + try { + exited.acquire(); + } catch (InterruptedException e) { + Logger.defaultLogError(e); + } + + session = null; + querySupport = null; + + +// System.err.println("qt disposed"); + + } + + boolean isDisposed() { + + return disposed; + + } + + public Session getSession() { + + return session; + + } + + ArrayList newTasks(boolean doWait, ArrayList tasks) { + + try { + + while(true) { + + // Perform own tasks first + if(tasks.addAll(own)) { + own.clear(); + } else if (doWait && !ownSync.isEmpty()) { + tasks.add(ownSync.remove(ownSync.size()-1)); + } + + // Try some queued tasks + lock.lock(); + if(tasks.addAll(queue)) { + queue.clear(); + lock.unlock(); + return tasks; + } else { + lock.unlock(); + } + + // Return tasks if some were found + if(!tasks.isEmpty()) return tasks; + if(!doWait) return null; + + synchronized (querySupportLock) { + + lock.lock(); + + // Just maybe someone inserted tasks and notified just before synchronized block + if(tasks.addAll(queue)) { + queue.clear(); + lock.unlock(); + return tasks; + } + + // We are the last one awake + if(sleepers.incrementAndGet() == THREADS) { + + // Do not indicate sleeping yet + sleepers.decrementAndGet(); + // Ceased can create new own tasks + if(querySupport == null) System.err.println("null qs"); + querySupport.ceased(index); + + if(tasks.addAll(own)) { + own.clear(); + } + // System.err.println("tasks after ceased: " + tasks.size()); + if(!tasks.isEmpty()) { + lock.unlock(); + return tasks; + } + + // OK, now we are going to sleep + sleepers.incrementAndGet(); + + } + + } + + // Nope nothing. Sleep & wait + // Whoever releases this calls sleepers.decrementAndGet() + + // We are done + if(isDisposed()) { + threadStates[index] = ThreadState.DISPOSED; + lock.unlock(); + return null; + } + + threadStates[index] = ThreadState.SLEEP; + condition.await(); + + // We are done + if(isDisposed()) { + threadStates[index] = ThreadState.DISPOSED; + lock.unlock(); + return null; + } + + threadStates[index] = ThreadState.RUN; + + lock.unlock(); + + } + + } catch (InterruptedException e) { + + e.printStackTrace(); + + throw new RuntimeException("Querying was interrupted.", e); + + } + + } + + public boolean runSynchronized() { + + boolean didExecute = false; + + for(int performer=0;performer finished = newTasks(false, tasks); + if(finished == null) return didExecute; + } + + while(!tasks.isEmpty()) { + + SessionTask task = tasks.remove(tasks.size() - 1); + + if(task.syncCaller == index) { + ownSyncTasks[index].add(task); + } else { + task.run(index); + didExecute = true; + } + + } + + return didExecute; + + } + + @Override + public void run() { + + QuerySupport support = this.querySupport; + + try { + + while (true) { + + boolean finished = newTasks(true, tasks) == null; + if(finished) { + return; + } + + while(!tasks.isEmpty()) { + + SessionTask task = tasks.remove(tasks.size()-1); + task.run(index); + + } + + for(int performer=0;performer