X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;h=ab10efa0baab252a1ff5ef8b8485c313b2b32db1;hp=b8916f3ee6de0aab7ba8076ccecd88ed8d9c44b6;hb=0d9b90834ce56b292c00b1a39850ed842c3e4d42;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index b8916f3ee..ab10efa0b 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -1,282 +1,285 @@ -package org.simantics.db.impl.query; - -import java.util.ArrayList; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.simantics.db.Session; -import org.simantics.db.common.SessionThread; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.impl.query.QueryProcessor.SessionTask; -import org.simantics.db.impl.query.QueryProcessor.ThreadState; - -class QueryThread extends Thread implements SessionThread { - - boolean disposed = false; - private Semaphore exited = new Semaphore(0); - - final int index; - - private Session session; - private QuerySupport querySupport; - - final private ArrayList tasks = new ArrayList(); - final private ArrayList own; - final private ArrayList ownSync; - final private ArrayList queue; - final private ReentrantLock lock; - final private Condition condition; - final private Object querySupportLock; - final private int THREADS; - final private AtomicInteger sleepers; - final private ThreadState[] threadStates; - final private ArrayList[] delayQueues; - final private QueryThread[] executors; - final private ReentrantLock[] threadLocks; - final private ArrayList[] queues; - final private ArrayList[] ownSyncTasks; - - public QueryThread(Session session, QueryProcessor processor, int index, String name) { - super(QueryProcessor.QueryThreadGroup, null, name); - this.session = session; - this.index = index; - own = processor.ownTasks[index]; - ownSync = processor.ownSyncTasks[index]; - queue = processor.queues[index]; - lock = processor.threadLocks[index]; - condition = processor.threadConditions[index]; - querySupportLock = processor.querySupportLock; - THREADS = processor.THREADS; - sleepers = processor.sleepers; - querySupport = processor.querySupport; - threadStates = processor.threadStates; - delayQueues = processor.delayQueues; - executors = processor.executors; - threadLocks = processor.threadLocks; - queues = processor.queues; - ownSyncTasks = processor.ownSyncTasks; - } - - synchronized void dispose() { - -// System.err.println("qt dispose"); - - disposed = true; - lock.lock(); - condition.signalAll(); - lock.unlock(); - - try { - exited.acquire(); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - - session = null; - querySupport = null; - - -// System.err.println("qt disposed"); - - } - - boolean isDisposed() { - - return disposed; - - } - - public Session getSession() { - - return session; - - } - - ArrayList newTasks(boolean doWait, ArrayList tasks) { - - try { - - while(true) { - - // Perform own tasks first - if(tasks.addAll(own)) { - own.clear(); - } else if (doWait && !ownSync.isEmpty()) { - tasks.add(ownSync.remove(ownSync.size()-1)); - } - - // Try some queued tasks - lock.lock(); - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); - return tasks; - } else { - lock.unlock(); - } - - // Return tasks if some were found - if(!tasks.isEmpty()) return tasks; - if(!doWait) return null; - - synchronized (querySupportLock) { - - lock.lock(); - - // Just maybe someone inserted tasks and notified just before synchronized block - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); - return tasks; - } - - // We are the last one awake - if(sleepers.incrementAndGet() == THREADS) { - - // Do not indicate sleeping yet - sleepers.decrementAndGet(); - // Ceased can create new own tasks - if(querySupport == null) System.err.println("null qs"); - querySupport.ceased(index); - - if(tasks.addAll(own)) { - own.clear(); - } - // System.err.println("tasks after ceased: " + tasks.size()); - if(!tasks.isEmpty()) { - lock.unlock(); - return tasks; - } - - // OK, now we are going to sleep - sleepers.incrementAndGet(); - - } - - } - - // Nope nothing. Sleep & wait - // Whoever releases this calls sleepers.decrementAndGet() - - // We are done - if(isDisposed()) { - threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); - return null; - } - - threadStates[index] = ThreadState.SLEEP; - condition.await(); - - // We are done - if(isDisposed()) { - threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); - return null; - } - - threadStates[index] = ThreadState.RUN; - - lock.unlock(); - - } - - } catch (InterruptedException e) { - - e.printStackTrace(); - - throw new RuntimeException("Querying was interrupted.", e); - - } - - } - - public boolean runSynchronized() { - - boolean didExecute = false; - - for(int performer=0;performer finished = newTasks(false, tasks); - if(finished == null) return didExecute; - } - - while(!tasks.isEmpty()) { - - SessionTask task = tasks.remove(tasks.size() - 1); - - if(task.syncCaller == index) { - ownSyncTasks[index].add(task); - } else { - task.run(index); - didExecute = true; - } - - } - - return didExecute; - - } - - @Override - public void run() { - - QuerySupport support = this.querySupport; - - try { - - while (true) { - - boolean finished = newTasks(true, tasks) == null; - if(finished) { - return; - } - - while(!tasks.isEmpty()) { - - SessionTask task = tasks.remove(tasks.size()-1); - task.run(index); - - } - - for(int performer=0;performer tasks = new ArrayList(); +// final private ArrayList own; +// final private ArrayList ownSync; +// final private ArrayList queue; +// final private ReentrantLock lock; +// final private Condition condition; + final private Object querySupportLock; + final private int THREADS; + final private AtomicInteger sleepers; + final private ThreadState[] threadStates; +// final private ArrayList[] delayQueues; +// final private QueryThread[] executors; +// final private ReentrantLock[] threadLocks; +// final private ArrayList[] queues; +// final private ArrayList[] ownSyncTasks; + + public QueryThread(Session session, QueryProcessor processor, int index, String name) { + super(QueryProcessor.QueryThreadGroup, null, name); + this.session = session; + this.processor = processor; + this.index = index; +// own = processor.ownTasks[index]; +// ownSync = processor.ownSyncTasks[index]; +// queue = processor.queues[index]; +// lock = processor.threadLocks[index]; +// condition = processor.threadConditions[index]; + querySupportLock = processor.querySupportLock; + THREADS = processor.THREADS; + sleepers = processor.sleepers; + querySupport = processor.querySupport; + threadStates = processor.threadStates; +// delayQueues = processor.delayQueues; +// executors = processor.executors; +// threadLocks = processor.threadLocks; +// queues = processor.queues; +// ownSyncTasks = processor.ownSyncTasks; + } + + synchronized void dispose() { + +// System.err.println("qt dispose"); + + disposed = true; +// lock.lock(); +// condition.signalAll(); +// lock.unlock(); + + try { + exited.acquire(); + } catch (InterruptedException e) { + LOGGER.error("dispose was interrupted", e); + } + + session = null; + querySupport = null; + +// System.err.println("qt disposed"); + + } + + boolean isDisposed() { + + return disposed; + + } + + public Session getSession() { + + return session; + + } + + private boolean pumpTask() { + if(!processor.freeScheduling.isEmpty()) { + tasks.add(processor.freeScheduling.removeFirst()); + return true; + } + return false; + } + + ArrayList newTasks(boolean doWait, ArrayList tasks) { + + try { + + while(true) { + + // Return tasks if some were found + if(!tasks.isEmpty()) return tasks; + + if(!doWait) return null; + + synchronized (querySupportLock) { + + if(pumpTask()) + return tasks; + +// lock.lock(); + + // We are the last one awake + if(sleepers.incrementAndGet() == THREADS) { + + // Do not indicate sleeping yet + sleepers.decrementAndGet(); + // Ceased can create new own tasks + if(querySupport == null) System.err.println("null qs"); + querySupport.ceased(index); + + if(pumpTask()) { +// lock.unlock(); + return tasks; + } + + // OK, now we are going to sleep + sleepers.incrementAndGet(); + + } + + } + + // Nope nothing. Sleep & wait + // Whoever releases this calls sleepers.decrementAndGet() + + // We are done + if(isDisposed()) { + threadStates[index] = ThreadState.DISPOSED; +// lock.unlock(); + return null; + } + + + threadStates[index] = ThreadState.SLEEP; + + synchronized (querySupportLock) { + querySupportLock.wait(100); + + } + +// boolean woken = condition.await(10, TimeUnit.MILLISECONDS); +// if(!woken) { +// synchronized (querySupportLock) { +// if(!processor.freeScheduling.isEmpty()) +// System.err.println("some tasks are available!"); +// } +// } + + sleepers.decrementAndGet(); + + // We are done + if(isDisposed()) { + threadStates[index] = ThreadState.DISPOSED; + //lock.unlock(); + return null; + } + + threadStates[index] = ThreadState.RUN; + + //lock.unlock(); + + } + + } catch (InterruptedException e) { + + LOGGER.error("Query handling (newTasks) was interrupted", e); + throw new RuntimeException("Querying was interrupted.", e); + + } + + } + + public boolean runSynchronized() { + + boolean didExecute = false; + +// for(int performer=0;performer finished = newTasks(false, tasks); + if(finished == null) return didExecute; + } + + while(!tasks.isEmpty()) { + + SessionTask task = tasks.remove(tasks.size() - 1); + +// if(task.syncCaller == index) { +// ownSyncTasks[index].add(task); +// } else { + task.run(index); +// System.err.println("QT(s) " + index + " runs " + task); + didExecute = true; +// } + + } + + return didExecute; + + } + + @Override + public void run() { + + processor.thread.set(index); + + QuerySupport support = this.querySupport; + + try { + + while (true) { + + boolean finished = newTasks(true, tasks) == null; + if(finished) { + return; + } + + while(!tasks.isEmpty()) { + + SessionTask task = tasks.remove(tasks.size()-1); +// System.err.println("QT " + index + " runs " + task); + task.run(index); + + } + +// for(int performer=0;performer