X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;h=ab10efa0baab252a1ff5ef8b8485c313b2b32db1;hp=a5e79244e10e16e532ad843e56a75cb64f2b763c;hb=0d9b90834ce56b292c00b1a39850ed842c3e4d42;hpb=e5db6157fd8722c946613d4e46d7aaf6bfa92609 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index a5e79244e..ab10efa0b 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -3,17 +3,17 @@ package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.simantics.db.Session; import org.simantics.db.common.SessionThread; -import org.simantics.db.common.utils.Logger; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.impl.query.QueryProcessor.ThreadState; +import org.slf4j.LoggerFactory; class QueryThread extends Thread implements SessionThread { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QueryThread.class); + boolean disposed = false; private Semaphore exited = new Semaphore(0); @@ -21,42 +21,44 @@ class QueryThread extends Thread implements SessionThread { private Session session; private QuerySupport querySupport; + private final QueryProcessor processor; final private ArrayList tasks = new ArrayList(); - final private ArrayList own; - final private ArrayList ownSync; - final private ArrayList queue; - final private ReentrantLock lock; - final private Condition condition; +// final private ArrayList own; +// final private ArrayList ownSync; +// final private ArrayList queue; +// final private ReentrantLock lock; +// final private Condition condition; final private Object querySupportLock; final private int THREADS; final private AtomicInteger sleepers; final private ThreadState[] threadStates; - final private ArrayList[] delayQueues; - final private QueryThread[] executors; - final private ReentrantLock[] threadLocks; - final private ArrayList[] queues; - final private ArrayList[] ownSyncTasks; +// final private ArrayList[] delayQueues; +// final private QueryThread[] executors; +// final private ReentrantLock[] threadLocks; +// final private ArrayList[] queues; +// final private ArrayList[] ownSyncTasks; public QueryThread(Session session, QueryProcessor processor, int index, String name) { super(QueryProcessor.QueryThreadGroup, null, name); this.session = session; + this.processor = processor; this.index = index; - own = processor.ownTasks[index]; - ownSync = processor.ownSyncTasks[index]; - queue = processor.queues[index]; - lock = processor.threadLocks[index]; - condition = processor.threadConditions[index]; +// own = processor.ownTasks[index]; +// ownSync = processor.ownSyncTasks[index]; +// queue = processor.queues[index]; +// lock = processor.threadLocks[index]; +// condition = processor.threadConditions[index]; querySupportLock = processor.querySupportLock; THREADS = processor.THREADS; sleepers = processor.sleepers; querySupport = processor.querySupport; threadStates = processor.threadStates; - delayQueues = processor.delayQueues; - executors = processor.executors; - threadLocks = processor.threadLocks; - queues = processor.queues; - ownSyncTasks = processor.ownSyncTasks; +// delayQueues = processor.delayQueues; +// executors = processor.executors; +// threadLocks = processor.threadLocks; +// queues = processor.queues; +// ownSyncTasks = processor.ownSyncTasks; } synchronized void dispose() { @@ -64,20 +66,19 @@ class QueryThread extends Thread implements SessionThread { // System.err.println("qt dispose"); disposed = true; - lock.lock(); - condition.signalAll(); - lock.unlock(); +// lock.lock(); +// condition.signalAll(); +// lock.unlock(); try { exited.acquire(); } catch (InterruptedException e) { - Logger.defaultLogError(e); + LOGGER.error("dispose was interrupted", e); } session = null; querySupport = null; - // System.err.println("qt disposed"); } @@ -94,43 +95,31 @@ class QueryThread extends Thread implements SessionThread { } + private boolean pumpTask() { + if(!processor.freeScheduling.isEmpty()) { + tasks.add(processor.freeScheduling.removeFirst()); + return true; + } + return false; + } + ArrayList newTasks(boolean doWait, ArrayList tasks) { try { while(true) { - // Perform own tasks first - if(tasks.addAll(own)) { - own.clear(); - } else if (doWait && !ownSync.isEmpty()) { - tasks.add(ownSync.remove(ownSync.size()-1)); - } - - // Try some queued tasks - lock.lock(); - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); - return tasks; - } else { - lock.unlock(); - } - // Return tasks if some were found if(!tasks.isEmpty()) return tasks; + if(!doWait) return null; synchronized (querySupportLock) { - lock.lock(); - - // Just maybe someone inserted tasks and notified just before synchronized block - if(tasks.addAll(queue)) { - queue.clear(); - lock.unlock(); + if(pumpTask()) return tasks; - } + +// lock.lock(); // We are the last one awake if(sleepers.incrementAndGet() == THREADS) { @@ -141,12 +130,8 @@ class QueryThread extends Thread implements SessionThread { if(querySupport == null) System.err.println("null qs"); querySupport.ceased(index); - if(tasks.addAll(own)) { - own.clear(); - } - // System.err.println("tasks after ceased: " + tasks.size()); - if(!tasks.isEmpty()) { - lock.unlock(); + if(pumpTask()) { +// lock.unlock(); return tasks; } @@ -163,30 +148,44 @@ class QueryThread extends Thread implements SessionThread { // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); +// lock.unlock(); return null; } + threadStates[index] = ThreadState.SLEEP; - condition.await(); + + synchronized (querySupportLock) { + querySupportLock.wait(100); + + } + +// boolean woken = condition.await(10, TimeUnit.MILLISECONDS); +// if(!woken) { +// synchronized (querySupportLock) { +// if(!processor.freeScheduling.isEmpty()) +// System.err.println("some tasks are available!"); +// } +// } + + sleepers.decrementAndGet(); // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); + //lock.unlock(); return null; } threadStates[index] = ThreadState.RUN; - lock.unlock(); + //lock.unlock(); } } catch (InterruptedException e) { - e.printStackTrace(); - + LOGGER.error("Query handling (newTasks) was interrupted", e); throw new RuntimeException("Querying was interrupted.", e); } @@ -197,17 +196,17 @@ class QueryThread extends Thread implements SessionThread { boolean didExecute = false; - for(int performer=0;performer finished = newTasks(false, tasks); @@ -218,12 +217,13 @@ class QueryThread extends Thread implements SessionThread { SessionTask task = tasks.remove(tasks.size() - 1); - if(task.syncCaller == index) { - ownSyncTasks[index].add(task); - } else { +// if(task.syncCaller == index) { +// ownSyncTasks[index].add(task); +// } else { task.run(index); +// System.err.println("QT(s) " + index + " runs " + task); didExecute = true; - } +// } } @@ -234,6 +234,8 @@ class QueryThread extends Thread implements SessionThread { @Override public void run() { + processor.thread.set(index); + QuerySupport support = this.querySupport; try { @@ -248,27 +250,28 @@ class QueryThread extends Thread implements SessionThread { while(!tasks.isEmpty()) { SessionTask task = tasks.remove(tasks.size()-1); +// System.err.println("QT " + index + " runs " + task); task.run(index); } - for(int performer=0;performer