X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;h=7aa89b3f032ea68f08e75f1d65bd0c0e87bdc030;hb=85fd9f0a35ca2d7018b316bc8bcc280513b47243;hp=a5e79244e10e16e532ad843e56a75cb64f2b763c;hpb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index a5e79244e..7aa89b3f0 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -3,17 +3,17 @@ package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.simantics.db.Session; import org.simantics.db.common.SessionThread; -import org.simantics.db.common.utils.Logger; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.impl.query.QueryProcessor.ThreadState; +import org.slf4j.LoggerFactory; class QueryThread extends Thread implements SessionThread { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class); + boolean disposed = false; private Semaphore exited = new Semaphore(0); @@ -21,42 +21,46 @@ class QueryThread extends Thread implements SessionThread { private Session session; private QuerySupport querySupport; + private final QueryProcessor processor; + private final Semaphore requests; final private ArrayList tasks = new ArrayList(); - final private ArrayList own; - final private ArrayList ownSync; - final private ArrayList queue; - final private ReentrantLock lock; - final private Condition condition; +// final private ArrayList own; +// final private ArrayList ownSync; +// final private ArrayList queue; +// final private ReentrantLock lock; +// final private Condition condition; final private Object querySupportLock; final private int THREADS; final private AtomicInteger sleepers; final private ThreadState[] threadStates; - final private ArrayList[] delayQueues; - final private QueryThread[] executors; - final private ReentrantLock[] threadLocks; - final private ArrayList[] queues; - final private ArrayList[] ownSyncTasks; +// final private ArrayList[] delayQueues; +// final private QueryThread[] executors; +// final private ReentrantLock[] threadLocks; +// final private ArrayList[] queues; +// final private ArrayList[] ownSyncTasks; public QueryThread(Session session, QueryProcessor processor, int index, String name) { super(QueryProcessor.QueryThreadGroup, null, name); this.session = session; + this.processor = processor; this.index = index; - own = processor.ownTasks[index]; - ownSync = processor.ownSyncTasks[index]; - queue = processor.queues[index]; - lock = processor.threadLocks[index]; - condition = processor.threadConditions[index]; +// own = processor.ownTasks[index]; +// ownSync = processor.ownSyncTasks[index]; +// queue = processor.queues[index]; +// lock = processor.threadLocks[index]; +// condition = processor.threadConditions[index]; querySupportLock = processor.querySupportLock; THREADS = processor.THREADS; sleepers = processor.sleepers; querySupport = processor.querySupport; threadStates = processor.threadStates; - delayQueues = processor.delayQueues; - executors = processor.executors; - threadLocks = processor.threadLocks; - queues = processor.queues; - ownSyncTasks = processor.ownSyncTasks; + requests = processor.requests; +// delayQueues = processor.delayQueues; +// executors = processor.executors; +// threadLocks = processor.threadLocks; +// queues = processor.queues; +// ownSyncTasks = processor.ownSyncTasks; } synchronized void dispose() { @@ -64,20 +68,19 @@ class QueryThread extends Thread implements SessionThread { // System.err.println("qt dispose"); disposed = true; - lock.lock(); - condition.signalAll(); - lock.unlock(); +// lock.lock(); +// condition.signalAll(); +// lock.unlock(); try { exited.acquire(); } catch (InterruptedException e) { - Logger.defaultLogError(e); + LOGGER.error("dispose was interrupted", e); } session = null; querySupport = null; - // System.err.println("qt disposed"); } @@ -94,43 +97,27 @@ class QueryThread extends Thread implements SessionThread { } + private boolean pumpTask() { + return processor.scheduling.pumpTask(tasks); + } + ArrayList newTasks(boolean doWait, ArrayList tasks) { try { while(true) { - // Perform own tasks first - if(tasks.addAll(own)) { - own.clear(); - } else if (doWait && !ownSync.isEmpty()) { - tasks.add(ownSync.remove(ownSync.size()-1)); - } - - // Try some queued tasks - lock.lock(); - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); - return tasks; - } else { - lock.unlock(); - } - // Return tasks if some were found if(!tasks.isEmpty()) return tasks; + if(!doWait) return null; synchronized (querySupportLock) { - lock.lock(); - - // Just maybe someone inserted tasks and notified just before synchronized block - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); + if(pumpTask()) return tasks; - } + +// lock.lock(); // We are the last one awake if(sleepers.incrementAndGet() == THREADS) { @@ -141,12 +128,8 @@ class QueryThread extends Thread implements SessionThread { if(querySupport == null) System.err.println("null qs"); querySupport.ceased(index); - if(tasks.addAll(own)) { - own.clear(); - } - // System.err.println("tasks after ceased: " + tasks.size()); - if(!tasks.isEmpty()) { - lock.unlock(); + if(pumpTask()) { +// lock.unlock(); return tasks; } @@ -163,30 +146,33 @@ class QueryThread extends Thread implements SessionThread { // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); +// lock.unlock(); return null; } + threadStates[index] = ThreadState.SLEEP; - condition.await(); + + requests.acquire(); + + sleepers.decrementAndGet(); // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); + //lock.unlock(); return null; } threadStates[index] = ThreadState.RUN; - lock.unlock(); + //lock.unlock(); } } catch (InterruptedException e) { - e.printStackTrace(); - + LOGGER.error("Query handling (newTasks) was interrupted", e); throw new RuntimeException("Querying was interrupted.", e); } @@ -197,17 +183,17 @@ class QueryThread extends Thread implements SessionThread { boolean didExecute = false; - for(int performer=0;performer finished = newTasks(false, tasks); @@ -218,12 +204,13 @@ class QueryThread extends Thread implements SessionThread { SessionTask task = tasks.remove(tasks.size() - 1); - if(task.syncCaller == index) { - ownSyncTasks[index].add(task); - } else { +// if(task.syncCaller == index) { +// ownSyncTasks[index].add(task); +// } else { task.run(index); +// System.err.println("QT(s) " + index + " runs " + task); didExecute = true; - } +// } } @@ -248,27 +235,15 @@ class QueryThread extends Thread implements SessionThread { while(!tasks.isEmpty()) { SessionTask task = tasks.remove(tasks.size()-1); - task.run(index); - - } + task.run(0); - for(int performer=0;performer