/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ /* * * @author Toni Kalajainen */ package org.simantics.utils.threads.ua; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; /** * ThreadPool contains a queue of work and a pool of threads processing them. *

* A thread can join the pool either as resident or as temporary. * A temporary thread works until the work queue is empty and then returns. * The work mode of a thread can be set with setMode(). *

* The current thread can yield to the service of the ThreadPool by invoking work(). *

* A new permanent worker thread can be spawned with newThread(true). * This thread will be released when the ThreadPool is closed or explicitly with * setMode(thread, ThreadMode.Interrupted); *

* Temporary threads can be spawned on-demand with setSpawnOnDemandLimit(). *

* ThreadPool can be closed (or paused) with setState(). *

* TODO Use ThreadFactory * * @see SynchronizedWorker Use queue to ensure processing order of work. * @see Worker * @author Toni Kalajainen (toni.kalajainen@vtt.fi) */ public class ThreadPool extends AbstractState implements Worker, StatefulExecutor { /** Thread group given to the threads created by FACTORY and by newThread() */ private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool"); /** Constructs daemon threads of "ThreadPool" group */ private final static ThreadFactory FACTORY = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(THREADGROUP, r, "WorkerThread"); t.setDaemon(true); return t; } }; /** Queued work; asyncExec() appends to the tail and work() takes from the head */ LinkedList queue = new LinkedList(); /** Released once per queued work in asyncExec(); a worker acquires a permit before it takes work from the queue */ Semaphore s = new Semaphore(0); /** Threads registered to the pool, mapped to their current ThreadMode */ Map threads = new HashMap(); // How many threads may be spawned on demand when new work is queued int spawnOnDemandLimit = 0; /** Shared instance, created on the first call to getInstance() */ static ThreadPool INSTANCE; /** NOTE(review): assigned here but not read anywhere in the visible code; newThread() constructs its Thread directly - confirm whether this field is still needed */ ThreadFactory threadFactory = FACTORY; /** * Get singleton instance. This pool has 0 resident threads and spawns * temporary threads up to the number of CPUs in the system. * NOTE! Do not use this shared resource for blocking actions. * * @return singleton instance whose spawn-on-demand limit is the number of available processors */ public synchronized static ThreadPool getInstance() { if (INSTANCE == null) INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); return INSTANCE; } /** Working mode of a thread registered to the pool */ public enum ThreadMode { Resident, // Works and stays resident (sleeps if no work available) Temporary, // Works until queue is empty Interrupted // Stops working right away or after current task } /** * Create thread pool without changing the spawn-on-demand limit from its initial value 0. * Passes ExecutorState.Active to the superclass constructor (presumably the initial state - confirm in AbstractState). */ public ThreadPool() { super(ExecutorState.Active); } /** * Create thread pool with 0 initial worker threads. * * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads. 
*/ public ThreadPool(int maxNumberOfTemporaryThreads) { super(ExecutorState.Active); setSpawnOnDemandLimit(maxNumberOfTemporaryThreads); } /** * Counts the registered threads whose mode is not Interrupted. * * @return number of registered threads in Resident or Temporary mode */ public synchronized int getWorkingThreadCount() { int result = 0; for (ThreadMode m : threads.values()) if (m != ThreadMode.Interrupted) { result++; } return result; } /** * Sets the limit for how many threads may be spawned on demand when new * work is queued. *

* The pool spawns more threads when new work is queued and * the number of working threads is below this value. *

* New thread is spawned if number of threads is less than queued work and * the number of threads is less than limit value. * * @param limit on demand thread limit */ public void setSpawnOnDemandLimit(int limit) { this.spawnOnDemandLimit = limit; } /** * Makes current thread yield to the service of the thread pool. * The calling thread is registered to the pool for the duration of the call * and removed from it before the method returns normally. * Service can be ended by invoking setMode(thread, ThreadMode.Interrupted). * * @param stayResident if true works until interrupted, if false works until queue is empty */ public void work(boolean stayResident) { Thread t = Thread.currentThread(); synchronized(this) { threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); } for (;;) { synchronized(this) { ThreadMode mode = getThreadMode(t); ExecutorState state = getState(); // Temporary threads can return if not working right now if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) mode = ThreadMode.Interrupted; if (mode == null || mode == ThreadMode.Interrupted) break; // Do not do anything if the worker is shut down if (state==ExecutorState.Terminated) break; if (state==ExecutorState.Paused) { // Remove temporary threads if paused if (mode==ThreadMode.Temporary) break; if (mode==ThreadMode.Resident) { try { waitForState(ExecutorState.NON_PAUSED_STATES); } catch (InterruptedException e) { /* ignored: the loop re-reads mode and state on the next round */ } continue; } } } try { //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue; s.acquire(); /* NOTE(review): remove() and cancelAll() take work out of the queue without acquiring permits, so a permit may outlive its work; removeFirst() would then throw NoSuchElementException, which is not caught here - confirm */ WorkImpl work; synchronized(this) { work = queue.removeFirst(); } try { work.setState(WorkState.Working); work.getRunnable().run(); work.setState(WorkState.Complete); } catch (RuntimeException e) { work.setError(e); work.setState(WorkState.Error); } } catch (InterruptedException e) { /* ignored: setMode() interrupts the thread to wake it up; the loop re-reads the mode on the next round */ } } // Go to shutdown state? boolean goToShutdown = false; synchronized(this) { threads.remove(t); // If this was the last thread of a pool in Shutdown state and the queue is empty, then the pool becomes Terminated goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty(); } // Move from Shutdown to Terminated state. 
if (goToShutdown) super.setState(ExecutorState.Terminated); } /** Thread body that serves the pool with work(true) */ Runnable residentRun = new Runnable() { @Override public void run() { work(true); } }; /** Thread body that serves the pool with work(false) */ Runnable tempRun = new Runnable() { @Override public void run() { work(false); } }; /** * Spawn new thread to the service of the thread pool. * The thread is registered to the pool before it is started. * * @param stayResident if true the thread is registered as Resident, if false as Temporary * @return new running thread */ public Thread newThread(boolean stayResident) { Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread"); synchronized(this) { threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary); } t.start(); return t; } /** * Get the mode registered for a thread. * * @param t thread to look up * @return mode of the thread, or null if the thread is not registered to this pool */ public synchronized ThreadMode getThreadMode(Thread t) { return threads.get(t); } /** * Set the mode of a thread. Does nothing if the thread already has the given mode. * The thread is interrupted if the new mode is Interrupted, or if the new mode * is Temporary and the work queue is empty. * * @param t thread whose mode is set * @param tm new mode * @throws IllegalArgumentException if either argument is null */ public synchronized void setMode(Thread t, ThreadMode tm) { if (t==null || tm==null) throw new IllegalArgumentException(); ThreadMode oldMode = threads.get(t); if (oldMode == tm) return; threads.put(t, tm); if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/) { t.interrupt(); } if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) { t.interrupt(); } } /** * Is work queue empty * @return true if the work queue is empty */ public synchronized boolean isEmpty() { return queue.isEmpty(); } /** Interrupts the given thread */ void interrupt(Thread t) { t.interrupt(); } /** * Remove a work from the queue. * * @param r work to remove * @return true if the work was in the queue */ public synchronized boolean remove(WorkImpl r) { return queue.remove(r); } /** * Queue a runnable for execution. A new work object is created for the runnable, * added to the queue, and one semaphore permit is released for it. * * @param runnable the runnable to execute * @param listener listener added to the created work, or null for no listener * @return the work object created for the runnable * @throws WorkerClosedException if the current state is not one of ExecutorState.ACCEPTS_WORK_STATES */ @Override public WorkMonitor asyncExec(Runnable runnable, StateListener listener) throws WorkerClosedException { WorkImpl w = null; synchronized(this) { if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState())) throw new WorkerClosedException(); w = new WorkImpl(runnable); if (listener!=null) w.addStateListener(listener); queue.add(w); spawnTempThreads(); } s.release(); return w; } /** * Change the state of the pool. Returns false without changing anything if the pool is already Terminated. * A request for Terminated or Shutdown puts the pool to Shutdown, and directly on to Terminated * if there are no registered threads and the queue is empty; true is returned in both cases. * Any other state is passed on to the superclass. */ // Raise visibility @Override public boolean setState(ExecutorState state) { // If worker is already shutdown then it must not be reactivated if (getState() == ExecutorState.Terminated) return false; synchronized(this) { // Go directly to shutdown if queue 
is empty and there are no threads if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown) { if (threads.isEmpty() && queue.isEmpty()) { super.setState( ExecutorState.Shutdown ); super.setState( ExecutorState.Terminated ); } else { super.setState( ExecutorState.Shutdown ); } return true; } } return super.setState(state); } /** * Cancel all queued work and clear the queue. * * @param mayInterrupt passed on to cancel() of every queued work */ public synchronized void cancelAll(boolean mayInterrupt) { for (WorkMonitor w : queue) w.cancel(mayInterrupt); queue.clear(); } /** Spawns on-demand threads when the state moves from a non-working state to one of ExecutorState.WORKING_STATES */ @Override protected void onStateTransition(ExecutorState oldState, ExecutorState newState) { if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState)) spawnTempThreads(); } /** * Spawns on-demand threads. Does nothing unless the current state is one of * ExecutorState.WORKING_STATES and spawnOnDemandLimit is positive. * NOTE(review): this copy of the file is garbled from the loop below onwards - text that stood * between angle brackets is missing here and in main(), as are the generic type arguments * throughout the class. Restore from the original source before editing the code. */ private synchronized void spawnTempThreads() { if (!ExecutorState.WORKING_STATES.contains(getState())) return; if (spawnOnDemandLimit>0) { int workers = getWorkingThreadCount(); int n = spawnOnDemandLimit - workers; for (int i=0; i result) { result.addAll(queue); } /** Manual test driver; prints state transitions of the pool and of the work (its body is truncated in this copy) */ public static void main(String[] args) throws InterruptedException { ThreadPool pool = new ThreadPool(2); pool.addStateListener(new StateListener() { @Override public void onStateTransition(IStatefulObject monitor, ExecutorState oldState, ExecutorState newState) { System.out.println("Worker State: "+newState); }}); StateListener l = new StateListener() { @Override public void onStateTransition(IStatefulObject monitor, WorkState oldState, WorkState newState) { System.out.println("Work ("+monitor+"): "+newState); } }; Runnable[] r = new Runnable[10]; WorkMonitor[] m = new WorkMonitor[10]; for (int i=0; i getQueue() { // return queue; // } }