-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-/*\r
- *\r
- * @author Toni Kalajainen\r
- */\r
-package org.simantics.utils.threads.ua;\r
-\r
-import java.util.Collection;\r
-import java.util.EnumSet;\r
-import java.util.HashMap;\r
-import java.util.LinkedList;\r
-import java.util.Map;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.ThreadFactory;\r
-\r
-\r
-/**\r
- * ThreadPool contains a queue of work and a pool of threads processing them.\r
- * <p>\r
- * A thread can join the pool either as resident or as temporary.\r
- * Temporary thread works until work queue is empty and returns. \r
- * Thread work mode can be set with setMode().\r
- * <p>\r
- * Current thread can yield to the service of ThreadPool with work()-invoke.\r
- * <p>\r
- * A new permanent worker thread can be spawned with newThread().\r
- * This thread will be released when the ThreadPool is closed or explicitely with\r
- * setMode(thread, ThreadMode.Interrupted);\r
- * <p>\r
- * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). \r
- * <p>\r
- * ThreadPool can be closed (or paused) with setState().\r
- * <p>\r
- * TODO Use ThreadFactory\r
- * \r
- * @see SynchronizedWorker Use queue to ensure processing order of work.\r
- * @see Worker \r
- * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
- */\r
-public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {\r
-\r
- \r
- private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");\r
- /** Constructs daemon threads of "ThreadPool" group */\r
- private final static ThreadFactory FACTORY = new ThreadFactory()\r
- {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(THREADGROUP, r, "WorkerThread");\r
- t.setDaemon(true);\r
- return t; \r
- }\r
- };\r
- \r
- LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();\r
- Semaphore s = new Semaphore(0);\r
- \r
- Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();\r
- // How many threads may be spawned on demand when new work is queued\r
- int spawnOnDemandLimit = 0;\r
- static ThreadPool INSTANCE;\r
- ThreadFactory threadFactory = FACTORY;\r
- \r
- /**\r
- * Get singleton instance. This pool has 0 resident threads and spawns\r
- * temporary threads up to the number of CPUs in the system.\r
- * NOTE! Do not use this shared resource for blocking actions.\r
- * \r
- * @return singleton instance which has one worker for each core\r
- */\r
- public synchronized static ThreadPool getInstance() {\r
- if (INSTANCE == null) \r
- INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); \r
- return INSTANCE;\r
- }\r
- \r
- public enum ThreadMode {\r
- Resident, // Works and stays resident (sleeps if no work available)\r
- Temporary, // Works until queue is empty\r
- Interrupted // Stops working right away or after current task\r
- }\r
- \r
- public ThreadPool() {\r
- super(ExecutorState.Active);\r
- }\r
- \r
- /**\r
- * Create thread pool with 0 initial worker threads.\r
- * \r
- * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.\r
- */\r
- public ThreadPool(int maxNumberOfTemporaryThreads) {\r
- super(ExecutorState.Active);\r
- setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);\r
- }\r
-\r
- public synchronized int getWorkingThreadCount()\r
- {\r
- int result = 0;\r
- for (ThreadMode m : threads.values())\r
- if (m != ThreadMode.Interrupted) {\r
- result++;\r
- }\r
- return result;\r
- }\r
- \r
- /**\r
- * Sets the limit for how many threads may be spawned on demand when new\r
- * work is queued.\r
- * <p>\r
- * WorkPool spawns more threads when new work is queued and \r
- * the number of working threads goes below this value.\r
- * <p>\r
- * New thread is spawned if number of threads is less than queued word and\r
- * the number of threads is less than <code>limit</code> value. \r
- * \r
- * @param limit on demand thread limit\r
- */\r
- public void setSpawnOnDemandLimit(int limit)\r
- {\r
- this.spawnOnDemandLimit = limit;\r
- }\r
- \r
- /**\r
- * Makes current thread yield to the service of the thread pool. \r
- * Service can be ended by invoking setMode(ThreadMode.Interrupted).\r
- * \r
- * @param stayResident if true works until interrupted, if false works until queue is empty\r
- */\r
- public void work(boolean stayResident)\r
- { \r
- Thread t = Thread.currentThread();\r
- synchronized(this) {\r
- threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
- } \r
- for (;;) {\r
- synchronized(this) {\r
- ThreadMode mode = getThreadMode(t);\r
- ExecutorState state = getState();\r
- \r
- // Temporary threads can return if not working right now\r
- if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) \r
- mode = ThreadMode.Interrupted;\r
- \r
- if (mode == null || mode == ThreadMode.Interrupted) break;\r
- \r
- // Do not do anything if the worker is shut down\r
- if (state==ExecutorState.Terminated) break;\r
-\r
- \r
- if (state==ExecutorState.Paused) {\r
- // Remove temporary threads if paused\r
- if (mode==ThreadMode.Temporary) break;\r
- if (mode==ThreadMode.Resident) { \r
- try {\r
- waitForState(ExecutorState.NON_PAUSED_STATES);\r
- } catch (InterruptedException e) {\r
- }\r
- continue;\r
- }\r
- }\r
- \r
- }\r
- try { \r
- //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;\r
- s.acquire();\r
- WorkImpl work;\r
- synchronized(this) {\r
- work = queue.removeFirst();\r
- }\r
- try {\r
- work.setState(WorkState.Working);\r
- work.getRunnable().run();\r
- work.setState(WorkState.Complete);\r
- } catch (RuntimeException e) {\r
- work.setError(e);\r
- work.setState(WorkState.Error);\r
- }\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
- // Go to shutdown state?\r
- boolean goToShutdown = false;\r
- synchronized(this) {\r
- threads.remove(t);\r
- // If this is last thread and queue is empty, then go to shutdown state\r
- goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();\r
- }\r
- // Go to shutdown state.\r
- if (goToShutdown)\r
- super.setState(ExecutorState.Terminated);\r
- }\r
- \r
- Runnable residentRun = new Runnable() \r
- {\r
- @Override\r
- public void run() {\r
- work(true);\r
- }\r
- };\r
-\r
- Runnable tempRun = new Runnable() \r
- {\r
- @Override\r
- public void run() {\r
- work(false);\r
- }\r
- };\r
- \r
- /**\r
- * Spawn new thread to the service of the thread pool\r
- * @param stayResident \r
- * @return new running thread\r
- */\r
- public Thread newThread(boolean stayResident)\r
- {\r
- Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");\r
- \r
- synchronized(this) {\r
- threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
- }\r
- t.start();\r
- return t;\r
- }\r
-\r
- public synchronized ThreadMode getThreadMode(Thread t)\r
- {\r
- return threads.get(t);\r
- }\r
- \r
- public synchronized void setMode(Thread t, ThreadMode tm)\r
- {\r
- if (t==null || tm==null) throw new IllegalArgumentException();\r
- ThreadMode oldMode = threads.get(t);\r
- if (oldMode == tm) return; \r
- threads.put(t, tm);\r
- if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)\r
- { \r
- t.interrupt();\r
- }\r
- if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) \r
- {\r
- t.interrupt();\r
- }\r
- }\r
- \r
- /**\r
- * Is work queue empty\r
- * @return true if the work queue is empty\r
- */\r
- public synchronized boolean isEmpty()\r
- {\r
- return queue.isEmpty();\r
- }\r
- \r
- void interrupt(Thread t)\r
- {\r
- t.interrupt();\r
- }\r
- \r
- public synchronized boolean remove(WorkImpl r)\r
- {\r
- return queue.remove(r);\r
- }\r
- \r
- @Override\r
- public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) \r
- throws WorkerClosedException \r
- {\r
- WorkImpl w = null;\r
- synchronized(this) {\r
- if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))\r
- throw new WorkerClosedException();\r
- w = new WorkImpl(runnable);\r
- if (listener!=null)\r
- w.addStateListener(listener);\r
- queue.add(w);\r
- spawnTempThreads();\r
- }\r
- s.release();\r
- return w;\r
- }\r
- \r
-\r
- // Raise visibility\r
- @Override\r
- public boolean setState(ExecutorState state) {\r
- // If worker is already shutdown then it must not be reactivated\r
- if (getState() == ExecutorState.Terminated) \r
- return false;\r
- synchronized(this) {\r
- // Go directly to shutdown if queue is empty and there are no threads\r
- if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)\r
- {\r
- if (threads.isEmpty() && queue.isEmpty()) {\r
- super.setState( ExecutorState.Shutdown );\r
- super.setState( ExecutorState.Terminated );\r
- } else {\r
- super.setState( ExecutorState.Shutdown );\r
- }\r
- return true;\r
- }\r
- }\r
- return super.setState(state);\r
- }\r
- \r
- /**\r
- * Cancel all work.\r
- * @param mayInterrupt \r
- */\r
- public synchronized void cancelAll(boolean mayInterrupt) \r
- {\r
- for (WorkMonitor w : queue)\r
- w.cancel(mayInterrupt);\r
- queue.clear();\r
- }\r
- \r
- @Override\r
- protected void onStateTransition(ExecutorState oldState,\r
- ExecutorState newState) {\r
- if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))\r
- spawnTempThreads();\r
- }\r
- \r
- private synchronized void spawnTempThreads()\r
- {\r
- if (!ExecutorState.WORKING_STATES.contains(getState())) return; \r
- if (spawnOnDemandLimit>0) {\r
- int workers = getWorkingThreadCount();\r
- int n = spawnOnDemandLimit - workers;\r
- for (int i=0; i<n; i++)\r
- newThread(false);\r
- } \r
- }\r
-\r
- public class WorkImpl extends AbstractWorkMonitor \r
- {\r
- public WorkImpl(Runnable r) {\r
- super(r);\r
- } \r
-\r
- @Override\r
- public boolean cancel(boolean mayInterrupt) {\r
- if (mayInterrupt) \r
- throw new RuntimeException("NOT IMPLEMENTED");\r
- synchronized(ThreadPool.this) \r
- {\r
- WorkState s = getState();\r
- if (s==WorkState.Ready) \r
- return remove(this); \r
- }\r
- return false;\r
- } \r
-\r
- // Raise visibility\r
- @Override\r
- public boolean setState(WorkState state) {\r
- return super.setState(state);\r
- }\r
- }\r
- \r
- @Override\r
- public synchronized void getQueuedWork(Collection<WorkMonitor> result) {\r
- result.addAll(queue);\r
- }\r
- \r
- \r
- public static void main(String[] args) throws InterruptedException {\r
- ThreadPool pool = new ThreadPool(2);\r
- \r
- pool.addStateListener(new StateListener<ExecutorState>() {\r
- @Override\r
- public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {\r
- System.out.println("Worker State: "+newState);\r
- }});\r
- \r
- StateListener<WorkState> l = new StateListener<WorkState>() {\r
- @Override\r
- public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {\r
- System.out.println("Work ("+monitor+"): "+newState);\r
- }\r
- };\r
- \r
- Runnable[] r = new Runnable[10];\r
- WorkMonitor[] m = new WorkMonitor[10];\r
- for (int i=0; i<r.length; i++)\r
- {\r
- final int j = i;\r
- r[i] = new Runnable() {\r
- @Override\r
- public String toString() {\r
- return ""+j;\r
- }\r
- @Override\r
- public void run() {\r
- try {\r
- Thread.sleep(1000);\r
- } catch (InterruptedException e) {\r
- //e.printStackTrace();\r
- }\r
- System.out.println(j+" completed by "+Thread.currentThread()); \r
- }};\r
- }\r
- \r
- int i=0;\r
- for (Runnable rr : r)\r
- m[i++] = pool.asyncExec(rr, l);\r
- \r
- for (int j=2; j<8; j++)\r
- m[j].cancel(false);\r
- \r
- pool.setState(ExecutorState.Terminated);\r
- m[9].waitForState(EnumSet.of(WorkState.Complete));\r
-\r
- \r
- System.out.println("fin");\r
- }\r
-\r
- public void setThreadFactory(ThreadFactory factory)\r
- {\r
- this.threadFactory = factory;\r
- }\r
-\r
- @Override\r
- public void execute(Runnable command) {\r
- asyncExec(command, null);\r
- }\r
- \r
- /**\r
- * Returns the task queue used by this executor. Access to the\r
- * task queue is intended primarily for debugging and monitoring.\r
- * This queue may be in active use. Retrieving the task queue\r
- * does not prevent queued tasks from executing.\r
- *\r
- * @return the task queue\r
- */\r
-// public BlockingQueue<Runnable> getQueue() {\r
-// return queue;\r
-// } \r
- \r
- \r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+/*
+ *
+ * @author Toni Kalajainen
+ */
+package org.simantics.utils.threads.ua;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+
+
+/**
+ * ThreadPool contains a queue of work and a pool of threads processing them.
+ * <p>
+ * A thread can join the pool either as resident or as temporary.
+ * Temporary thread works until work queue is empty and returns.
+ * Thread work mode can be set with setMode().
+ * <p>
+ * Current thread can yield to the service of ThreadPool with work()-invoke.
+ * <p>
+ * A new permanent worker thread can be spawned with newThread().
+ * This thread will be released when the ThreadPool is closed or explicitely with
+ * setMode(thread, ThreadMode.Interrupted);
+ * <p>
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().
+ * <p>
+ * ThreadPool can be closed (or paused) with setState().
+ * <p>
+ * TODO Use ThreadFactory
+ *
+ * @see SynchronizedWorker Use queue to ensure processing order of work.
+ * @see Worker
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
+ */
+public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {
+
+
+ private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");
+ /** Constructs daemon threads of "ThreadPool" group */
+ private final static ThreadFactory FACTORY = new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(THREADGROUP, r, "WorkerThread");
+ t.setDaemon(true);
+ return t;
+ }
+ };
+
+ LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();
+ Semaphore s = new Semaphore(0);
+
+ Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();
+ // How many threads may be spawned on demand when new work is queued
+ int spawnOnDemandLimit = 0;
+ static ThreadPool INSTANCE;
+ ThreadFactory threadFactory = FACTORY;
+
+ /**
+ * Get singleton instance. This pool has 0 resident threads and spawns
+ * temporary threads up to the number of CPUs in the system.
+ * NOTE! Do not use this shared resource for blocking actions.
+ *
+ * @return singleton instance which has one worker for each core
+ */
+ public synchronized static ThreadPool getInstance() {
+ if (INSTANCE == null)
+ INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());
+ return INSTANCE;
+ }
+
+ public enum ThreadMode {
+ Resident, // Works and stays resident (sleeps if no work available)
+ Temporary, // Works until queue is empty
+ Interrupted // Stops working right away or after current task
+ }
+
+ public ThreadPool() {
+ super(ExecutorState.Active);
+ }
+
+ /**
+ * Create thread pool with 0 initial worker threads.
+ *
+ * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.
+ */
+ public ThreadPool(int maxNumberOfTemporaryThreads) {
+ super(ExecutorState.Active);
+ setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);
+ }
+
+ public synchronized int getWorkingThreadCount()
+ {
+ int result = 0;
+ for (ThreadMode m : threads.values())
+ if (m != ThreadMode.Interrupted) {
+ result++;
+ }
+ return result;
+ }
+
+ /**
+ * Sets the limit for how many threads may be spawned on demand when new
+ * work is queued.
+ * <p>
+ * WorkPool spawns more threads when new work is queued and
+ * the number of working threads goes below this value.
+ * <p>
+ * New thread is spawned if number of threads is less than queued word and
+ * the number of threads is less than <code>limit</code> value.
+ *
+ * @param limit on demand thread limit
+ */
+ public void setSpawnOnDemandLimit(int limit)
+ {
+ this.spawnOnDemandLimit = limit;
+ }
+
+ /**
+ * Makes current thread yield to the service of the thread pool.
+ * Service can be ended by invoking setMode(ThreadMode.Interrupted).
+ *
+ * @param stayResident if true works until interrupted, if false works until queue is empty
+ */
+ public void work(boolean stayResident)
+ {
+ Thread t = Thread.currentThread();
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ for (;;) {
+ synchronized(this) {
+ ThreadMode mode = getThreadMode(t);
+ ExecutorState state = getState();
+
+ // Temporary threads can return if not working right now
+ if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state)))
+ mode = ThreadMode.Interrupted;
+
+ if (mode == null || mode == ThreadMode.Interrupted) break;
+
+ // Do not do anything if the worker is shut down
+ if (state==ExecutorState.Terminated) break;
+
+
+ if (state==ExecutorState.Paused) {
+ // Remove temporary threads if paused
+ if (mode==ThreadMode.Temporary) break;
+ if (mode==ThreadMode.Resident) {
+ try {
+ waitForState(ExecutorState.NON_PAUSED_STATES);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+ }
+
+ }
+ try {
+ //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
+ s.acquire();
+ WorkImpl work;
+ synchronized(this) {
+ work = queue.removeFirst();
+ }
+ try {
+ work.setState(WorkState.Working);
+ work.getRunnable().run();
+ work.setState(WorkState.Complete);
+ } catch (RuntimeException e) {
+ work.setError(e);
+ work.setState(WorkState.Error);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ // Go to shutdown state?
+ boolean goToShutdown = false;
+ synchronized(this) {
+ threads.remove(t);
+ // If this is last thread and queue is empty, then go to shutdown state
+ goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
+ }
+ // Go to shutdown state.
+ if (goToShutdown)
+ super.setState(ExecutorState.Terminated);
+ }
+
+ Runnable residentRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(true);
+ }
+ };
+
+ Runnable tempRun = new Runnable()
+ {
+ @Override
+ public void run() {
+ work(false);
+ }
+ };
+
+ /**
+ * Spawn new thread to the service of the thread pool
+ * @param stayResident
+ * @return new running thread
+ */
+ public Thread newThread(boolean stayResident)
+ {
+ Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
+
+ synchronized(this) {
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+ }
+ t.start();
+ return t;
+ }
+
+ public synchronized ThreadMode getThreadMode(Thread t)
+ {
+ return threads.get(t);
+ }
+
+ public synchronized void setMode(Thread t, ThreadMode tm)
+ {
+ if (t==null || tm==null) throw new IllegalArgumentException();
+ ThreadMode oldMode = threads.get(t);
+ if (oldMode == tm) return;
+ threads.put(t, tm);
+ if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/)
+ {
+ t.interrupt();
+ }
+ }
+
+ /**
+ * Is work queue empty
+ * @return true if the work queue is empty
+ */
+ public synchronized boolean isEmpty()
+ {
+ return queue.isEmpty();
+ }
+
+ void interrupt(Thread t)
+ {
+ t.interrupt();
+ }
+
+ public synchronized boolean remove(WorkImpl r)
+ {
+ return queue.remove(r);
+ }
+
+ @Override
+ public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener)
+ throws WorkerClosedException
+ {
+ WorkImpl w = null;
+ synchronized(this) {
+ if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))
+ throw new WorkerClosedException();
+ w = new WorkImpl(runnable);
+ if (listener!=null)
+ w.addStateListener(listener);
+ queue.add(w);
+ spawnTempThreads();
+ }
+ s.release();
+ return w;
+ }
+
+
+ // Raise visibility
+ @Override
+ public boolean setState(ExecutorState state) {
+ // If worker is already shutdown then it must not be reactivated
+ if (getState() == ExecutorState.Terminated)
+ return false;
+ synchronized(this) {
+ // Go directly to shutdown if queue is empty and there are no threads
+ if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)
+ {
+ if (threads.isEmpty() && queue.isEmpty()) {
+ super.setState( ExecutorState.Shutdown );
+ super.setState( ExecutorState.Terminated );
+ } else {
+ super.setState( ExecutorState.Shutdown );
+ }
+ return true;
+ }
+ }
+ return super.setState(state);
+ }
+
+ /**
+ * Cancel all work.
+ * @param mayInterrupt
+ */
+ public synchronized void cancelAll(boolean mayInterrupt)
+ {
+ for (WorkMonitor w : queue)
+ w.cancel(mayInterrupt);
+ queue.clear();
+ }
+
+ @Override
+ protected void onStateTransition(ExecutorState oldState,
+ ExecutorState newState) {
+ if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))
+ spawnTempThreads();
+ }
+
+ private synchronized void spawnTempThreads()
+ {
+ if (!ExecutorState.WORKING_STATES.contains(getState())) return;
+ if (spawnOnDemandLimit>0) {
+ int workers = getWorkingThreadCount();
+ int n = spawnOnDemandLimit - workers;
+ for (int i=0; i<n; i++)
+ newThread(false);
+ }
+ }
+
+ public class WorkImpl extends AbstractWorkMonitor
+ {
+ public WorkImpl(Runnable r) {
+ super(r);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterrupt) {
+ if (mayInterrupt)
+ throw new RuntimeException("NOT IMPLEMENTED");
+ synchronized(ThreadPool.this)
+ {
+ WorkState s = getState();
+ if (s==WorkState.Ready)
+ return remove(this);
+ }
+ return false;
+ }
+
+ // Raise visibility
+ @Override
+ public boolean setState(WorkState state) {
+ return super.setState(state);
+ }
+ }
+
+ @Override
+ public synchronized void getQueuedWork(Collection<WorkMonitor> result) {
+ result.addAll(queue);
+ }
+
+
+ public static void main(String[] args) throws InterruptedException {
+ ThreadPool pool = new ThreadPool(2);
+
+ pool.addStateListener(new StateListener<ExecutorState>() {
+ @Override
+ public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {
+ System.out.println("Worker State: "+newState);
+ }});
+
+ StateListener<WorkState> l = new StateListener<WorkState>() {
+ @Override
+ public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {
+ System.out.println("Work ("+monitor+"): "+newState);
+ }
+ };
+
+ Runnable[] r = new Runnable[10];
+ WorkMonitor[] m = new WorkMonitor[10];
+ for (int i=0; i<r.length; i++)
+ {
+ final int j = i;
+ r[i] = new Runnable() {
+ @Override
+ public String toString() {
+ return ""+j;
+ }
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ //e.printStackTrace();
+ }
+ System.out.println(j+" completed by "+Thread.currentThread());
+ }};
+ }
+
+ int i=0;
+ for (Runnable rr : r)
+ m[i++] = pool.asyncExec(rr, l);
+
+ for (int j=2; j<8; j++)
+ m[j].cancel(false);
+
+ pool.setState(ExecutorState.Terminated);
+ m[9].waitForState(EnumSet.of(WorkState.Complete));
+
+
+ System.out.println("fin");
+ }
+
+ public void setThreadFactory(ThreadFactory factory)
+ {
+ this.threadFactory = factory;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ asyncExec(command, null);
+ }
+
+ /**
+ * Returns the task queue used by this executor. Access to the
+ * task queue is intended primarily for debugging and monitoring.
+ * This queue may be in active use. Retrieving the task queue
+ * does not prevent queued tasks from executing.
+ *
+ * @return the task queue
+ */
+// public BlockingQueue<Runnable> getQueue() {
+// return queue;
+// }
+
+
+}