X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2Fua%2FThreadPool.java;h=6738771d9eca009289e0a29149bb33254fd54572;hb=refs%2Fchanges%2F38%2F238%2F2;hp=cff196467f5871adbcdff42e86047e870e8d9368;hpb=24e2b34260f219f0d1644ca7a138894980e25b14;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java index cff196467..6738771d9 100644 --- a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java @@ -1,452 +1,452 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -/* - * - * @author Toni Kalajainen - */ -package org.simantics.utils.threads.ua; - -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; - - -/** - * ThreadPool contains a queue of work and a pool of threads processing them. - *
- * A thread can join the pool either as resident or as temporary. - * Temporary thread works until work queue is empty and returns. - * Thread work mode can be set with setMode(). - *
- * Current thread can yield to the service of ThreadPool with work()-invoke. - *
- * A new permanent worker thread can be spawned with newThread(). - * This thread will be released when the ThreadPool is closed or explicitely with - * setMode(thread, ThreadMode.Interrupted); - *
- * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). - *
- * ThreadPool can be closed (or paused) with setState(). - *
- * TODO Use ThreadFactory
- *
- * @see SynchronizedWorker Use queue to ensure processing order of work.
- * @see Worker
- * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
- */
-public class ThreadPool extends AbstractState
- * WorkPool spawns more threads when new work is queued and
- * the number of working threads goes below this value.
- *
- * New thread is spawned if number of threads is less than queued word and
- * the number of threads is less than
+ * A thread can join the pool either as resident or as temporary.
+ * Temporary thread works until work queue is empty and returns.
+ * Thread work mode can be set with setMode().
+ *
+ * Current thread can yield to the service of ThreadPool with work()-invoke.
+ *
+ * A new permanent worker thread can be spawned with newThread().
+ * This thread will be released when the ThreadPool is closed or explicitely with
+ * setMode(thread, ThreadMode.Interrupted);
+ *
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().
+ *
+ * ThreadPool can be closed (or paused) with setState().
+ *
+ * TODO Use ThreadFactory
+ *
+ * @see SynchronizedWorker Use queue to ensure processing order of work.
+ * @see Worker
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
+ */
+public class ThreadPool extends AbstractState
+ * WorkPool spawns more threads when new work is queued and
+ * the number of working threads goes below this value.
+ *
+ * New thread is spawned if number of threads is less than queued word and
+ * the number of threads is less than limit
value.
- *
- * @param limit on demand thread limit
- */
- public void setSpawnOnDemandLimit(int limit)
- {
- this.spawnOnDemandLimit = limit;
- }
-
- /**
- * Makes current thread yield to the service of the thread pool.
- * Service can be ended by invoking setMode(ThreadMode.Interrupted).
- *
- * @param stayResident if true works until interrupted, if false works until queue is empty
- */
- public void work(boolean stayResident)
- {
- Thread t = Thread.currentThread();
- synchronized(this) {
- threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
- }
- for (;;) {
- synchronized(this) {
- ThreadMode mode = getThreadMode(t);
- ExecutorState state = getState();
-
- // Temporary threads can return if not working right now
- if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
- mode = ThreadMode.Interrupted;
-
- if (mode == null || mode == ThreadMode.Interrupted) break;
-
- // Do not do anything if the worker is shut down
- if (state==ExecutorState.Terminated) break;
-
-
- if (state==ExecutorState.Paused) {
- // Remove temporary threads if paused
- if (mode==ThreadMode.Temporary) break;
- if (mode==ThreadMode.Resident) {
- try {
- waitForState(ExecutorState.NON_PAUSED_STATES);
- } catch (InterruptedException e) {
- }
- continue;
- }
- }
-
- }
- try {
- //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
- s.acquire();
- WorkImpl work;
- synchronized(this) {
- work = queue.removeFirst();
- }
- try {
- work.setState(WorkState.Working);
- work.getRunnable().run();
- work.setState(WorkState.Complete);
- } catch (RuntimeException e) {
- work.setError(e);
- work.setState(WorkState.Error);
- }
- } catch (InterruptedException e) {
- }
- }
- // Go to shutdown state?
- boolean goToShutdown = false;
- synchronized(this) {
- threads.remove(t);
- // If this is last thread and queue is empty, then go to shutdown state
- goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
- }
- // Go to shutdown state.
- if (goToShutdown)
- super.setState(ExecutorState.Terminated);
- }
-
- Runnable residentRun = new Runnable()
- {
- @Override
- public void run() {
- work(true);
- }
- };
-
- Runnable tempRun = new Runnable()
- {
- @Override
- public void run() {
- work(false);
- }
- };
-
- /**
- * Spawn new thread to the service of the thread pool
- * @param stayResident
- * @return new running thread
- */
- public Thread newThread(boolean stayResident)
- {
- Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
-
- synchronized(this) {
- threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
- }
- t.start();
- return t;
- }
-
- public synchronized ThreadMode getThreadMode(Thread t)
- {
- return threads.get(t);
- }
-
- public synchronized void setMode(Thread t, ThreadMode tm)
- {
- if (t==null || tm==null) throw new IllegalArgumentException();
- ThreadMode oldMode = threads.get(t);
- if (oldMode == tm) return;
- threads.put(t, tm);
- if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
- {
- t.interrupt();
- }
- if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
- {
- t.interrupt();
- }
- }
-
- /**
- * Is work queue empty
- * @return true if the work queue is empty
- */
- public synchronized boolean isEmpty()
- {
- return queue.isEmpty();
- }
-
- void interrupt(Thread t)
- {
- t.interrupt();
- }
-
- public synchronized boolean remove(WorkImpl r)
- {
- return queue.remove(r);
- }
-
- @Override
- public WorkMonitor asyncExec(Runnable runnable, StateListenerlimit
value.
+ *
+ * @param limit on demand thread limit
+ */
+ public void setSpawnOnDemandLimit(int limit)
+ {
+ this.spawnOnDemandLimit = limit;
+ }
+
+ /**
+ * Makes current thread yield to the service of the thread pool.
+ * Service can be ended by invoking setMode(ThreadMode.Interrupted).
+ *
+ * @param stayResident if true works until interrupted, if false works until queue is empty
+ */
+ public void work(boolean stayResident)
+ {
+ Thread t = Thread.currentThread();
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ for (;;) {
+ synchronized(this) {
+ ThreadMode mode = getThreadMode(t);
+ ExecutorState state = getState();
+
+ // Temporary threads can return if not working right now
+ if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
+ mode = ThreadMode.Interrupted;
+
+ if (mode == null || mode == ThreadMode.Interrupted) break;
+
+ // Do not do anything if the worker is shut down
+ if (state==ExecutorState.Terminated) break;
+
+
+ if (state==ExecutorState.Paused) {
+ // Remove temporary threads if paused
+ if (mode==ThreadMode.Temporary) break;
+ if (mode==ThreadMode.Resident) {
+ try {
+ waitForState(ExecutorState.NON_PAUSED_STATES);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+ }
+
+ }
+ try {
+ //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
+ s.acquire();
+ WorkImpl work;
+ synchronized(this) {
+ work = queue.removeFirst();
+ }
+ try {
+ work.setState(WorkState.Working);
+ work.getRunnable().run();
+ work.setState(WorkState.Complete);
+ } catch (RuntimeException e) {
+ work.setError(e);
+ work.setState(WorkState.Error);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ // Go to shutdown state?
+ boolean goToShutdown = false;
+ synchronized(this) {
+ threads.remove(t);
+ // If this is last thread and queue is empty, then go to shutdown state
+ goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
+ }
+ // Go to shutdown state.
+ if (goToShutdown)
+ super.setState(ExecutorState.Terminated);
+ }
+
+ Runnable residentRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(true);
+ }
+ };
+
+ Runnable tempRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(false);
+ }
+ };
+
+ /**
+ * Spawn new thread to the service of the thread pool
+ * @param stayResident
+ * @return new running thread
+ */
+ public Thread newThread(boolean stayResident)
+ {
+ Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
+
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ t.start();
+ return t;
+ }
+
+ public synchronized ThreadMode getThreadMode(Thread t)
+ {
+ return threads.get(t);
+ }
+
+ public synchronized void setMode(Thread t, ThreadMode tm)
+ {
+ if (t==null || tm==null) throw new IllegalArgumentException();
+ ThreadMode oldMode = threads.get(t);
+ if (oldMode == tm) return;
+ threads.put(t, tm);
+ if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ }
+
+ /**
+ * Is work queue empty
+ * @return true if the work queue is empty
+ */
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ void interrupt(Thread t)
+ {
+ t.interrupt();
+ }
+
+ public synchronized boolean remove(WorkImpl r)
+ {
+ return queue.remove(r);
+ }
+
+ @Override
+ public WorkMonitor asyncExec(Runnable runnable, StateListener