/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ /* * * @author Toni Kalajainen */ package org.simantics.utils.threads.ua; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; /** * ThreadPool contains a queue of work and a pool of threads processing them. *
* A thread can join the pool either as resident or as temporary. * Temporary thread works until work queue is empty and returns. * Thread work mode can be set with setMode(). *
* Current thread can yield to the service of ThreadPool with work()-invoke. *
* A new permanent worker thread can be spawned with newThread(). * This thread will be released when the ThreadPool is closed or explicitely with * setMode(thread, ThreadMode.Interrupted); *
* Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). *
* ThreadPool can be closed (or paused) with setState(). *
* TODO Use ThreadFactory
*
* @see SynchronizedWorker Use queue to ensure processing order of work.
* @see Worker
* @author Toni Kalajainen (toni.kalajainen@vtt.fi)
*/
public class ThreadPool extends AbstractState
* WorkPool spawns more threads when new work is queued and
* the number of working threads goes below this value.
*
* New thread is spawned if number of threads is less than queued word and
* the number of threads is less than limit
value.
*
* @param limit on demand thread limit
*/
public void setSpawnOnDemandLimit(int limit)
{
this.spawnOnDemandLimit = limit;
}
/**
* Makes current thread yield to the service of the thread pool.
* Service can be ended by invoking setMode(ThreadMode.Interrupted).
*
* @param stayResident if true works until interrupted, if false works until queue is empty
*/
public void work(boolean stayResident)
{
Thread t = Thread.currentThread();
synchronized(this) {
threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
}
for (;;) {
synchronized(this) {
ThreadMode mode = getThreadMode(t);
ExecutorState state = getState();
// Temporary threads can return if not working right now
if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
mode = ThreadMode.Interrupted;
if (mode == null || mode == ThreadMode.Interrupted) break;
// Do not do anything if the worker is shut down
if (state==ExecutorState.Terminated) break;
if (state==ExecutorState.Paused) {
// Remove temporary threads if paused
if (mode==ThreadMode.Temporary) break;
if (mode==ThreadMode.Resident) {
try {
waitForState(ExecutorState.NON_PAUSED_STATES);
} catch (InterruptedException e) {
}
continue;
}
}
}
try {
//s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
s.acquire();
WorkImpl work;
synchronized(this) {
work = queue.removeFirst();
}
try {
work.setState(WorkState.Working);
work.getRunnable().run();
work.setState(WorkState.Complete);
} catch (RuntimeException e) {
work.setError(e);
work.setState(WorkState.Error);
}
} catch (InterruptedException e) {
}
}
// Go to shutdown state?
boolean goToShutdown = false;
synchronized(this) {
threads.remove(t);
// If this is last thread and queue is empty, then go to shutdown state
goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
}
// Go to shutdown state.
if (goToShutdown)
super.setState(ExecutorState.Terminated);
}
Runnable residentRun = new Runnable()
{
@Override
public void run() {
work(true);
}
};
Runnable tempRun = new Runnable()
{
@Override
public void run() {
work(false);
}
};
/**
* Spawn new thread to the service of the thread pool
* @param stayResident
* @return new running thread
*/
public Thread newThread(boolean stayResident)
{
Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
synchronized(this) {
threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
}
t.start();
return t;
}
public synchronized ThreadMode getThreadMode(Thread t)
{
return threads.get(t);
}
public synchronized void setMode(Thread t, ThreadMode tm)
{
if (t==null || tm==null) throw new IllegalArgumentException();
ThreadMode oldMode = threads.get(t);
if (oldMode == tm) return;
threads.put(t, tm);
if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
{
t.interrupt();
}
if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
{
t.interrupt();
}
}
/**
* Is work queue empty
* @return true if the work queue is empty
*/
public synchronized boolean isEmpty()
{
return queue.isEmpty();
}
void interrupt(Thread t)
{
t.interrupt();
}
public synchronized boolean remove(WorkImpl r)
{
return queue.remove(r);
}
@Override
public WorkMonitor asyncExec(Runnable runnable, StateListener