X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;fp=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryThread.java;h=f7c56a213d83ba92844fae162b002d713fe8931d;hb=1f8b50d81a1aa1bbd67a77f7cbc1060f2eb805d4;hp=07a390a51eafa254965f919fb8f03839b56576ff;hpb=86bb6157fdcb4a11caee57487cf8fa894eaae2dd;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java index 07a390a51..f7c56a213 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java @@ -2,6 +2,7 @@ package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -27,15 +28,15 @@ class QueryThread extends Thread implements SessionThread { // final private ArrayList own; // final private ArrayList ownSync; // final private ArrayList queue; - final private ReentrantLock lock; - final private Condition condition; +// final private ReentrantLock lock; +// final private Condition condition; final private Object querySupportLock; final private int THREADS; final private AtomicInteger sleepers; final private ThreadState[] threadStates; // final private ArrayList[] delayQueues; - final private QueryThread[] executors; - final private ReentrantLock[] threadLocks; +// final private QueryThread[] executors; +// final private ReentrantLock[] threadLocks; // final private ArrayList[] queues; // final private ArrayList[] ownSyncTasks; @@ -47,16 +48,16 @@ class QueryThread extends Thread implements SessionThread { // own = processor.ownTasks[index]; // ownSync = processor.ownSyncTasks[index]; // queue = processor.queues[index]; - lock = processor.threadLocks[index]; - condition = processor.threadConditions[index]; +// lock = processor.threadLocks[index]; +// condition = processor.threadConditions[index]; querySupportLock = processor.querySupportLock; THREADS = processor.THREADS; sleepers = processor.sleepers; querySupport = processor.querySupport; threadStates = processor.threadStates; // delayQueues = processor.delayQueues; - executors = processor.executors; - threadLocks = processor.threadLocks; +// executors = processor.executors; +// threadLocks = processor.threadLocks; // queues = processor.queues; // ownSyncTasks = processor.ownSyncTasks; } @@ -66,9 +67,9 @@ class QueryThread extends Thread implements SessionThread { // System.err.println("qt dispose"); disposed = true; - lock.lock(); - condition.signalAll(); - lock.unlock(); +// lock.lock(); +// condition.signalAll(); +// lock.unlock(); try { exited.acquire(); @@ -96,29 +97,20 @@ class QueryThread extends Thread implements SessionThread { } + private boolean pumpTask() { + if(!processor.freeScheduling.isEmpty()) { + tasks.add(processor.freeScheduling.removeFirst()); + return true; + } + return false; + } + ArrayList newTasks(boolean doWait, ArrayList tasks) { try { while(true) { - // Perform own tasks first -// if(tasks.addAll(own)) { -// own.clear(); -// } else if (doWait && !ownSync.isEmpty()) { -// tasks.add(ownSync.remove(ownSync.size()-1)); -// } - - // Try some queued tasks -// lock.lock(); -// if(tasks.addAll(queue)) { -// queue.clear(); -// lock.unlock(); -// return tasks; -// } else { -// lock.unlock(); -// } - // Return tasks if some were found if(!tasks.isEmpty()) return tasks; @@ -126,23 +118,10 @@ class QueryThread extends Thread implements SessionThread { synchronized (querySupportLock) { -// System.err.println("check free tasks for QT " + index + " (" + processor.freeScheduling + ")"); - - if(!processor.freeScheduling.isEmpty()) { - tasks.add(processor.freeScheduling.removeFirst()); + if(pumpTask()) return tasks; - } - lock.lock(); - - // Just maybe someone inserted tasks and notified just before synchronized block -// if(tasks.addAll(queue)) { -// queue.clear(); -// lock.unlock(); -// return tasks; -// } - -// System.err.println("QT " + index + ", sleepers = " + sleepers); +// lock.lock(); // We are the last one awake if(sleepers.incrementAndGet() == THREADS) { @@ -153,12 +132,8 @@ class QueryThread extends Thread implements SessionThread { if(querySupport == null) System.err.println("null qs"); querySupport.ceased(index); -// if(tasks.addAll(own)) { -// own.clear(); -// } - // System.err.println("tasks after ceased: " + tasks.size()); - if(!tasks.isEmpty()) { - lock.unlock(); + if(pumpTask()) { +// lock.unlock(); return tasks; } @@ -175,25 +150,38 @@ class QueryThread extends Thread implements SessionThread { // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); +// lock.unlock(); return null; } + threadStates[index] = ThreadState.SLEEP; - condition.await(); + + synchronized (querySupportLock) { + querySupportLock.wait(100); + + } + +// boolean woken = condition.await(10, TimeUnit.MILLISECONDS); +// if(!woken) { +// synchronized (querySupportLock) { +// if(!processor.freeScheduling.isEmpty()) +// System.err.println("some tasks are available!"); +// } +// } sleepers.decrementAndGet(); // We are done if(isDisposed()) { threadStates[index] = ThreadState.DISPOSED; - lock.unlock(); + //lock.unlock(); return null; } threadStates[index] = ThreadState.RUN; - lock.unlock(); + //lock.unlock(); } @@ -249,6 +237,8 @@ class QueryThread extends Thread implements SessionThread { @Override public void run() { + processor.thread.set(index); + QuerySupport support = this.querySupport; try {