1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
14 * @author Toni Kalajainen
\r
16 package org.simantics.utils.threads.ua;
\r
18 import java.util.Collection;
\r
19 import java.util.EnumSet;
\r
20 import java.util.HashMap;
\r
21 import java.util.LinkedList;
\r
22 import java.util.Map;
\r
23 import java.util.concurrent.Semaphore;
\r
24 import java.util.concurrent.ThreadFactory;
\r
28 * ThreadPool contains a queue of work and a pool of threads processing them.
\r
30 * A thread can join the pool either as resident or as temporary.
\r
31 * A temporary thread works until the work queue is empty and then returns.
\r
32 * Thread work mode can be set with setMode().
\r
34 * The current thread can yield to the service of the ThreadPool by invoking work().
\r
36 * A new permanent worker thread can be spawned with newThread().
\r
37 * This thread will be released when the ThreadPool is closed or explicitly with
\r
38 * setMode(thread, ThreadMode.Interrupted);
\r
40 * Temporary threads can be spawned on-demand with setSpawnOnDemandLimit().
\r
42 * ThreadPool can be closed (or paused) with setState().
\r
44 * TODO Use ThreadFactory
\r
46 * @see SynchronizedWorker Use queue to ensure processing order of work.
\r
48 * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
\r
50 public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {
\r
53 private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");
\r
54 /** Constructs daemon threads of "ThreadPool" group */
\r
55 private final static ThreadFactory FACTORY = new ThreadFactory()
\r
58 public Thread newThread(Runnable r) {
\r
59 Thread t = new Thread(THREADGROUP, r, "WorkerThread");
\r
65 LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();
\r
66 Semaphore s = new Semaphore(0);
\r
68 Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();
\r
69 // How many threads may be spawned on demand when new work is queued
\r
70 int spawnOnDemandLimit = 0;
\r
71 static ThreadPool INSTANCE;
\r
72 ThreadFactory threadFactory = FACTORY;
\r
75 * Get singleton instance. This pool has 0 resident threads and spawns
\r
76 * temporary threads up to the number of CPUs in the system.
\r
77 * NOTE! Do not use this shared resource for blocking actions.
\r
79 * @return singleton instance that spawns at most one temporary worker thread per available processor
\r
81 public synchronized static ThreadPool getInstance() {
\r
82 if (INSTANCE == null)
\r
83 INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());
\r
87 public enum ThreadMode {
\r
88 Resident, // Works and stays resident (sleeps if no work available)
\r
89 Temporary, // Works until queue is empty
\r
90 Interrupted // Stops working right away or after current task
\r
93 public ThreadPool() {
\r
94 super(ExecutorState.Active);
\r
98 * Create thread pool with 0 initial worker threads.
\r
100 * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.
\r
102 public ThreadPool(int maxNumberOfTemporaryThreads) {
\r
103 super(ExecutorState.Active);
\r
104 setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);
\r
107 public synchronized int getWorkingThreadCount()
\r
110 for (ThreadMode m : threads.values())
\r
111 if (m != ThreadMode.Interrupted) {
\r
118 * Sets the limit for how many threads may be spawned on demand when new
\r
121 * WorkPool spawns more threads when new work is queued and
\r
122 * the number of working threads goes below this value.
\r
124 * New thread is spawned if number of threads is less than queued word and
\r
125 * the number of threads is less than <code>limit</code> value.
\r
127 * @param limit on demand thread limit
\r
129 public void setSpawnOnDemandLimit(int limit)
\r
131 this.spawnOnDemandLimit = limit;
\r
135 * Makes current thread yield to the service of the thread pool.
\r
136 * Service can be ended by invoking setMode(thread, ThreadMode.Interrupted).
\r
138 * @param stayResident if true works until interrupted, if false works until queue is empty
\r
140 public void work(boolean stayResident)
\r
142 Thread t = Thread.currentThread();
\r
143 synchronized(this) {
\r
144 threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
\r
147 synchronized(this) {
\r
148 ThreadMode mode = getThreadMode(t);
\r
149 ExecutorState state = getState();
\r
151 // Temporary threads can return if not working right now
\r
152 if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
\r
153 mode = ThreadMode.Interrupted;
\r
155 if (mode == null || mode == ThreadMode.Interrupted) break;
\r
157 // Do not do anything if the worker is shut down
\r
158 if (state==ExecutorState.Terminated) break;
\r
161 if (state==ExecutorState.Paused) {
\r
162 // Remove temporary threads if paused
\r
163 if (mode==ThreadMode.Temporary) break;
\r
164 if (mode==ThreadMode.Resident) {
\r
166 waitForState(ExecutorState.NON_PAUSED_STATES);
\r
167 } catch (InterruptedException e) {
\r
175 //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
\r
178 synchronized(this) {
\r
179 work = queue.removeFirst();
\r
182 work.setState(WorkState.Working);
\r
183 work.getRunnable().run();
\r
184 work.setState(WorkState.Complete);
\r
185 } catch (RuntimeException e) {
\r
187 work.setState(WorkState.Error);
\r
189 } catch (InterruptedException e) {
\r
192 // Go to shutdown state?
\r
193 boolean goToShutdown = false;
\r
194 synchronized(this) {
\r
196 // If the pool is shutting down, this was the last thread and the queue is empty, then go to terminated state
\r
197 goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
\r
199 // Go to terminated state.
\r
201 super.setState(ExecutorState.Terminated);
\r
204 Runnable residentRun = new Runnable()
\r
207 public void run() {
\r
212 Runnable tempRun = new Runnable()
\r
215 public void run() {
\r
221 * Spawn new thread to the service of the thread pool
\r
222 * @param stayResident
\r
223 * @return new running thread
\r
225 public Thread newThread(boolean stayResident)
\r
227 Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
\r
229 synchronized(this) {
\r
230 threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
\r
236 public synchronized ThreadMode getThreadMode(Thread t)
\r
238 return threads.get(t);
\r
241 public synchronized void setMode(Thread t, ThreadMode tm)
\r
243 if (t==null || tm==null) throw new IllegalArgumentException();
\r
244 ThreadMode oldMode = threads.get(t);
\r
245 if (oldMode == tm) return;
\r
246 threads.put(t, tm);
\r
247 if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
\r
251 if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
\r
258 * Is work queue empty
\r
259 * @return true if the work queue is empty
\r
261 public synchronized boolean isEmpty()
\r
263 return queue.isEmpty();
\r
266 void interrupt(Thread t)
\r
271 public synchronized boolean remove(WorkImpl r)
\r
273 return queue.remove(r);
\r
277 public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener)
\r
278 throws WorkerClosedException
\r
281 synchronized(this) {
\r
282 if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))
\r
283 throw new WorkerClosedException();
\r
284 w = new WorkImpl(runnable);
\r
285 if (listener!=null)
\r
286 w.addStateListener(listener);
\r
288 spawnTempThreads();
\r
295 // Raise visibility
\r
297 public boolean setState(ExecutorState state) {
\r
298 // If worker is already terminated then it must not be reactivated
\r
299 if (getState() == ExecutorState.Terminated)
\r
301 synchronized(this) {
\r
302 // Go directly to terminated (via shutdown) if queue is empty and there are no threads
\r
303 if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)
\r
305 if (threads.isEmpty() && queue.isEmpty()) {
\r
306 super.setState( ExecutorState.Shutdown );
\r
307 super.setState( ExecutorState.Terminated );
\r
309 super.setState( ExecutorState.Shutdown );
\r
314 return super.setState(state);
\r
319 * @param mayInterrupt
\r
321 public synchronized void cancelAll(boolean mayInterrupt)
\r
323 for (WorkMonitor w : queue)
\r
324 w.cancel(mayInterrupt);
\r
329 protected void onStateTransition(ExecutorState oldState,
\r
330 ExecutorState newState) {
\r
331 if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))
\r
332 spawnTempThreads();
\r
335 private synchronized void spawnTempThreads()
\r
337 if (!ExecutorState.WORKING_STATES.contains(getState())) return;
\r
338 if (spawnOnDemandLimit>0) {
\r
339 int workers = getWorkingThreadCount();
\r
340 int n = spawnOnDemandLimit - workers;
\r
341 for (int i=0; i<n; i++)
\r
346 public class WorkImpl extends AbstractWorkMonitor
\r
348 public WorkImpl(Runnable r) {
\r
353 public boolean cancel(boolean mayInterrupt) {
\r
355 throw new RuntimeException("NOT IMPLEMENTED");
\r
356 synchronized(ThreadPool.this)
\r
358 WorkState s = getState();
\r
359 if (s==WorkState.Ready)
\r
360 return remove(this);
\r
365 // Raise visibility
\r
367 public boolean setState(WorkState state) {
\r
368 return super.setState(state);
\r
373 public synchronized void getQueuedWork(Collection<WorkMonitor> result) {
\r
374 result.addAll(queue);
\r
378 public static void main(String[] args) throws InterruptedException {
\r
379 ThreadPool pool = new ThreadPool(2);
\r
381 pool.addStateListener(new StateListener<ExecutorState>() {
\r
383 public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {
\r
384 System.out.println("Worker State: "+newState);
\r
387 StateListener<WorkState> l = new StateListener<WorkState>() {
\r
389 public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {
\r
390 System.out.println("Work ("+monitor+"): "+newState);
\r
394 Runnable[] r = new Runnable[10];
\r
395 WorkMonitor[] m = new WorkMonitor[10];
\r
396 for (int i=0; i<r.length; i++)
\r
399 r[i] = new Runnable() {
\r
401 public String toString() {
\r
405 public void run() {
\r
407 Thread.sleep(1000);
\r
408 } catch (InterruptedException e) {
\r
409 //e.printStackTrace();
\r
411 System.out.println(j+" completed by "+Thread.currentThread());
\r
416 for (Runnable rr : r)
\r
417 m[i++] = pool.asyncExec(rr, l);
\r
419 for (int j=2; j<8; j++)
\r
420 m[j].cancel(false);
\r
422 pool.setState(ExecutorState.Terminated);
\r
423 m[9].waitForState(EnumSet.of(WorkState.Complete));
\r
426 System.out.println("fin");
\r
429 public void setThreadFactory(ThreadFactory factory)
\r
431 this.threadFactory = factory;
\r
435 public void execute(Runnable command) {
\r
436 asyncExec(command, null);
\r
440 * Returns the task queue used by this executor. Access to the
\r
441 * task queue is intended primarily for debugging and monitoring.
\r
442 * This queue may be in active use. Retrieving the task queue
\r
443 * does not prevent queued tasks from executing.
\r
445 * @return the task queue
\r
447 // public BlockingQueue<Runnable> getQueue() {
\r