1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
14 * @author Toni Kalajainen
16 package org.simantics.utils.threads.ua;
18 import java.util.Collection;
19 import java.util.EnumSet;
20 import java.util.HashMap;
21 import java.util.LinkedList;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.ThreadFactory;
28 * ThreadPool contains a queue of work and a pool of threads processing them.
30 * A thread can join the pool either as resident or as temporary.
31 * Temporary thread works until work queue is empty and returns.
32 * Thread work mode can be set with setMode().
34 * Current thread can yield to the service of ThreadPool with work()-invoke.
36 * A new permanent worker thread can be spawned with newThread().
37 * This thread will be released when the ThreadPool is closed or explicitely with
38 * setMode(thread, ThreadMode.Interrupted);
40 * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().
42 * ThreadPool can be closed (or paused) with setState().
44 * TODO Use ThreadFactory
46 * @see SynchronizedWorker Use queue to ensure processing order of work.
48 * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
50 public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {
53 private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");
54 /** Constructs daemon threads of "ThreadPool" group */
55 private final static ThreadFactory FACTORY = new ThreadFactory()
58 public Thread newThread(Runnable r) {
59 Thread t = new Thread(THREADGROUP, r, "WorkerThread");
65 LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();
66 Semaphore s = new Semaphore(0);
68 Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();
69 // How many threads may be spawned on demand when new work is queued
70 int spawnOnDemandLimit = 0;
71 static ThreadPool INSTANCE;
72 ThreadFactory threadFactory = FACTORY;
75 * Get singleton instance. This pool has 0 resident threads and spawns
76 * temporary threads up to the number of CPUs in the system.
77 * NOTE! Do not use this shared resource for blocking actions.
79 * @return singleton instance which has one worker for each core
81 public synchronized static ThreadPool getInstance() {
83 INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());
87 public enum ThreadMode {
88 Resident, // Works and stays resident (sleeps if no work available)
89 Temporary, // Works until queue is empty
90 Interrupted // Stops working right away or after current task
94 super(ExecutorState.Active);
98 * Create thread pool with 0 initial worker threads.
100 * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.
102 public ThreadPool(int maxNumberOfTemporaryThreads) {
103 super(ExecutorState.Active);
104 setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);
107 public synchronized int getWorkingThreadCount()
110 for (ThreadMode m : threads.values())
111 if (m != ThreadMode.Interrupted) {
118 * Sets the limit for how many threads may be spawned on demand when new
121 * WorkPool spawns more threads when new work is queued and
122 * the number of working threads goes below this value.
124 * New thread is spawned if number of threads is less than queued word and
125 * the number of threads is less than <code>limit</code> value.
127 * @param limit on demand thread limit
129 public void setSpawnOnDemandLimit(int limit)
131 this.spawnOnDemandLimit = limit;
135 * Makes current thread yield to the service of the thread pool.
136 * Service can be ended by invoking setMode(ThreadMode.Interrupted).
138 * @param stayResident if true works until interrupted, if false works until queue is empty
140 public void work(boolean stayResident)
142 Thread t = Thread.currentThread();
144 threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
148 ThreadMode mode = getThreadMode(t);
149 ExecutorState state = getState();
151 // Temporary threads can return if not working right now
152 if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
153 mode = ThreadMode.Interrupted;
155 if (mode == null || mode == ThreadMode.Interrupted) break;
157 // Do not do anything if the worker is shut down
158 if (state==ExecutorState.Terminated) break;
161 if (state==ExecutorState.Paused) {
162 // Remove temporary threads if paused
163 if (mode==ThreadMode.Temporary) break;
164 if (mode==ThreadMode.Resident) {
166 waitForState(ExecutorState.NON_PAUSED_STATES);
167 } catch (InterruptedException e) {
175 //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
179 work = queue.removeFirst();
182 work.setState(WorkState.Working);
183 work.getRunnable().run();
184 work.setState(WorkState.Complete);
185 } catch (RuntimeException e) {
187 work.setState(WorkState.Error);
189 } catch (InterruptedException e) {
192 // Go to shutdown state?
193 boolean goToShutdown = false;
196 // If this is last thread and queue is empty, then go to shutdown state
197 goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
199 // Go to shutdown state.
201 super.setState(ExecutorState.Terminated);
204 Runnable residentRun = new Runnable()
212 Runnable tempRun = new Runnable()
221 * Spawn new thread to the service of the thread pool
222 * @param stayResident
223 * @return new running thread
225 public Thread newThread(boolean stayResident)
227 Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
230 threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
236 public synchronized ThreadMode getThreadMode(Thread t)
238 return threads.get(t);
241 public synchronized void setMode(Thread t, ThreadMode tm)
243 if (t==null || tm==null) throw new IllegalArgumentException();
244 ThreadMode oldMode = threads.get(t);
245 if (oldMode == tm) return;
247 if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
251 if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
258 * Is work queue empty
259 * @return true if the work queue is empty
261 public synchronized boolean isEmpty()
263 return queue.isEmpty();
266 void interrupt(Thread t)
271 public synchronized boolean remove(WorkImpl r)
273 return queue.remove(r);
277 public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener)
278 throws WorkerClosedException
282 if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))
283 throw new WorkerClosedException();
284 w = new WorkImpl(runnable);
286 w.addStateListener(listener);
297 public boolean setState(ExecutorState state) {
298 // If worker is already shutdown then it must not be reactivated
299 if (getState() == ExecutorState.Terminated)
302 // Go directly to shutdown if queue is empty and there are no threads
303 if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)
305 if (threads.isEmpty() && queue.isEmpty()) {
306 super.setState( ExecutorState.Shutdown );
307 super.setState( ExecutorState.Terminated );
309 super.setState( ExecutorState.Shutdown );
314 return super.setState(state);
319 * @param mayInterrupt
321 public synchronized void cancelAll(boolean mayInterrupt)
323 for (WorkMonitor w : queue)
324 w.cancel(mayInterrupt);
329 protected void onStateTransition(ExecutorState oldState,
330 ExecutorState newState) {
331 if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))
335 private synchronized void spawnTempThreads()
337 if (!ExecutorState.WORKING_STATES.contains(getState())) return;
338 if (spawnOnDemandLimit>0) {
339 int workers = getWorkingThreadCount();
340 int n = spawnOnDemandLimit - workers;
341 for (int i=0; i<n; i++)
346 public class WorkImpl extends AbstractWorkMonitor
348 public WorkImpl(Runnable r) {
353 public boolean cancel(boolean mayInterrupt) {
355 throw new RuntimeException("NOT IMPLEMENTED");
356 synchronized(ThreadPool.this)
358 WorkState s = getState();
359 if (s==WorkState.Ready)
367 public boolean setState(WorkState state) {
368 return super.setState(state);
373 public synchronized void getQueuedWork(Collection<WorkMonitor> result) {
374 result.addAll(queue);
378 public static void main(String[] args) throws InterruptedException {
379 ThreadPool pool = new ThreadPool(2);
381 pool.addStateListener(new StateListener<ExecutorState>() {
383 public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {
384 System.out.println("Worker State: "+newState);
387 StateListener<WorkState> l = new StateListener<WorkState>() {
389 public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {
390 System.out.println("Work ("+monitor+"): "+newState);
394 Runnable[] r = new Runnable[10];
395 WorkMonitor[] m = new WorkMonitor[10];
396 for (int i=0; i<r.length; i++)
399 r[i] = new Runnable() {
401 public String toString() {
408 } catch (InterruptedException e) {
409 //e.printStackTrace();
411 System.out.println(j+" completed by "+Thread.currentThread());
416 for (Runnable rr : r)
417 m[i++] = pool.asyncExec(rr, l);
419 for (int j=2; j<8; j++)
422 pool.setState(ExecutorState.Terminated);
423 m[9].waitForState(EnumSet.of(WorkState.Complete));
426 System.out.println("fin");
429 public void setThreadFactory(ThreadFactory factory)
431 this.threadFactory = factory;
435 public void execute(Runnable command) {
436 asyncExec(command, null);
440 * Returns the task queue used by this executor. Access to the
441 * task queue is intended primarily for debugging and monitoring.
442 * This queue may be in active use. Retrieving the task queue
443 * does not prevent queued tasks from executing.
445 * @return the task queue
447 // public BlockingQueue<Runnable> getQueue() {