]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / ua / ThreadPool.java
index cff196467f5871adbcdff42e86047e870e8d9368..6738771d9eca009289e0a29149bb33254fd54572 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-/*\r
- *\r
- * @author Toni Kalajainen\r
- */\r
-package org.simantics.utils.threads.ua;\r
-\r
-import java.util.Collection;\r
-import java.util.EnumSet;\r
-import java.util.HashMap;\r
-import java.util.LinkedList;\r
-import java.util.Map;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.ThreadFactory;\r
-\r
-\r
-/**\r
- * ThreadPool contains a queue of work and a pool of threads processing them.\r
- * <p>\r
- * A thread can join the pool either as resident or as temporary.\r
- * Temporary thread works until work queue is empty and returns.  \r
- * Thread work mode can be set with setMode().\r
- * <p>\r
- * Current thread can yield to the service of ThreadPool with work()-invoke.\r
- * <p>\r
- * A new permanent worker thread can be spawned with newThread().\r
- * This thread will be released when the ThreadPool is closed or explicitely with\r
- * setMode(thread, ThreadMode.Interrupted);\r
- * <p>\r
- * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().  \r
- * <p>\r
- * ThreadPool can be closed (or paused) with setState().\r
- * <p>\r
- * TODO Use ThreadFactory\r
- * \r
- * @see SynchronizedWorker Use queue to ensure processing order of work.\r
- * @see Worker \r
- * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
- */\r
-public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {\r
-\r
-       \r
-       private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");\r
-       /** Constructs daemon threads of "ThreadPool" group */\r
-       private final static ThreadFactory FACTORY = new ThreadFactory()\r
-       {\r
-               @Override\r
-               public Thread newThread(Runnable r) {\r
-                       Thread t = new Thread(THREADGROUP, r, "WorkerThread");\r
-                       t.setDaemon(true);\r
-                       return t;                       \r
-               }\r
-       };\r
-       \r
-       LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();\r
-       Semaphore s = new Semaphore(0);\r
-       \r
-       Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();\r
-       // How many threads may be spawned on demand when new work is queued\r
-       int spawnOnDemandLimit = 0;\r
-       static ThreadPool INSTANCE;\r
-       ThreadFactory threadFactory = FACTORY;\r
-       \r
-       /**\r
-        * Get singleton instance. This pool has 0 resident threads and spawns\r
-        * temporary threads up to the number of CPUs in the system.\r
-        * NOTE! Do not use this shared resource for blocking actions.\r
-        *  \r
-        * @return singleton instance which has one worker for each core\r
-        */\r
-       public synchronized static ThreadPool getInstance() {\r
-               if (INSTANCE == null) \r
-                       INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());          \r
-               return INSTANCE;\r
-       }\r
-       \r
-       public enum ThreadMode {\r
-               Resident,    // Works and stays resident (sleeps if no work available)\r
-               Temporary,   // Works until queue is empty\r
-               Interrupted  // Stops working right away or after current task\r
-       }\r
-       \r
-       public ThreadPool() {\r
-               super(ExecutorState.Active);\r
-       }\r
-       \r
-       /**\r
-        * Create thread pool with 0 initial worker threads.\r
-        * \r
-        * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.\r
-        */\r
-       public ThreadPool(int maxNumberOfTemporaryThreads) {\r
-               super(ExecutorState.Active);\r
-               setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);\r
-       }\r
-\r
-       public synchronized int getWorkingThreadCount()\r
-       {\r
-               int result = 0;\r
-               for (ThreadMode m : threads.values())\r
-                       if (m != ThreadMode.Interrupted) {\r
-                               result++;\r
-                       }\r
-               return result;\r
-       }\r
-       \r
-       /**\r
-        * Sets the limit for how many threads may be spawned on demand when new\r
-        * work is queued.\r
-        * <p>\r
-        * WorkPool spawns more threads when new work is queued and \r
-        * the number of working threads goes below this value.\r
-        * <p>\r
-        * New thread is spawned if number of threads is less than queued word and\r
-        * the number of threads is less than <code>limit</code> value.   \r
-        * \r
-        * @param limit on demand thread limit\r
-        */\r
-       public void setSpawnOnDemandLimit(int limit)\r
-       {\r
-               this.spawnOnDemandLimit = limit;\r
-       }\r
-       \r
-       /**\r
-        * Makes current thread yield to the service of the thread pool. \r
-        * Service can be ended by invoking setMode(ThreadMode.Interrupted).\r
-        * \r
-        * @param stayResident if true works until interrupted, if false works until queue is empty\r
-        */\r
-       public void work(boolean stayResident)\r
-       {                       \r
-               Thread t = Thread.currentThread();\r
-               synchronized(this) {\r
-                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
-               }               \r
-               for (;;) {\r
-                       synchronized(this) {\r
-                               ThreadMode mode = getThreadMode(t);\r
-                               ExecutorState state = getState();\r
-                               \r
-                               // Temporary threads can return if not working right now\r
-                               if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) \r
-                                       mode = ThreadMode.Interrupted;\r
-                               \r
-                               if (mode == null || mode == ThreadMode.Interrupted) break;\r
-                               \r
-                               // Do not do anything if the worker is shut down\r
-                               if (state==ExecutorState.Terminated) break;\r
-\r
-                               \r
-                               if (state==ExecutorState.Paused) {\r
-                                       // Remove temporary threads if paused\r
-                                       if (mode==ThreadMode.Temporary) break;\r
-                                       if (mode==ThreadMode.Resident) { \r
-                                               try {\r
-                                                       waitForState(ExecutorState.NON_PAUSED_STATES);\r
-                                               } catch (InterruptedException e) {\r
-                                               }\r
-                                               continue;\r
-                                       }\r
-                               }\r
-                               \r
-                       }\r
-                       try {                                                           \r
-                               //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;\r
-                               s.acquire();\r
-                               WorkImpl work;\r
-                               synchronized(this) {\r
-                                       work = queue.removeFirst();\r
-                               }\r
-                               try {\r
-                                       work.setState(WorkState.Working);\r
-                                       work.getRunnable().run();\r
-                                       work.setState(WorkState.Complete);\r
-                               } catch (RuntimeException e) {\r
-                                       work.setError(e);\r
-                                       work.setState(WorkState.Error);\r
-                               }\r
-                       } catch (InterruptedException e) {\r
-                       }\r
-               }\r
-               // Go to shutdown state?\r
-               boolean goToShutdown = false;\r
-               synchronized(this) {\r
-                       threads.remove(t);\r
-                       // If this is last thread and queue is empty, then go to shutdown state\r
-                       goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();\r
-               }\r
-               // Go to shutdown state.\r
-               if (goToShutdown)\r
-                       super.setState(ExecutorState.Terminated);\r
-       }\r
-       \r
-       Runnable residentRun = new Runnable() \r
-       {\r
-               @Override\r
-               public void run() {\r
-                       work(true);\r
-               }\r
-       };\r
-\r
-       Runnable tempRun = new Runnable() \r
-       {\r
-               @Override\r
-               public void run() {\r
-                       work(false);\r
-               }\r
-       };\r
-       \r
-       /**\r
-        * Spawn new thread to the service of the thread pool\r
-        * @param stayResident \r
-        * @return new running thread\r
-        */\r
-       public Thread newThread(boolean stayResident)\r
-       {\r
-               Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");\r
-               \r
-               synchronized(this) {\r
-                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
-               }\r
-               t.start();\r
-               return t;\r
-       }\r
-\r
-       public synchronized ThreadMode getThreadMode(Thread t)\r
-       {\r
-               return threads.get(t);\r
-       }\r
-       \r
-       public synchronized void setMode(Thread t, ThreadMode tm)\r
-       {\r
-               if (t==null || tm==null) throw new IllegalArgumentException();\r
-               ThreadMode oldMode = threads.get(t);\r
-               if (oldMode == tm) return;              \r
-               threads.put(t, tm);\r
-               if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)\r
-               {                               \r
-                       t.interrupt();\r
-               }\r
-               if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) \r
-               {\r
-                       t.interrupt();\r
-               }\r
-       }\r
-               \r
-       /**\r
-        * Is work queue empty\r
-        * @return true if the work queue is empty\r
-        */\r
-       public synchronized boolean isEmpty()\r
-       {\r
-               return queue.isEmpty();\r
-       }\r
-       \r
-       void interrupt(Thread t)\r
-       {\r
-               t.interrupt();\r
-       }\r
-       \r
-       public synchronized boolean remove(WorkImpl r)\r
-       {\r
-               return queue.remove(r);\r
-       }\r
-       \r
-       @Override\r
-       public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) \r
-       throws WorkerClosedException \r
-       {\r
-               WorkImpl w = null;\r
-               synchronized(this) {\r
-                       if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))\r
-                               throw new WorkerClosedException();\r
-                       w = new WorkImpl(runnable);\r
-                       if (listener!=null)\r
-                               w.addStateListener(listener);\r
-                       queue.add(w);\r
-                       spawnTempThreads();\r
-               }\r
-               s.release();\r
-               return w;\r
-       }\r
-       \r
-\r
-       // Raise visibility\r
-       @Override\r
-       public boolean setState(ExecutorState state) {\r
-               // If worker is already shutdown then it must not be reactivated\r
-               if (getState() == ExecutorState.Terminated) \r
-                       return false;\r
-               synchronized(this) {\r
-                       // Go directly to shutdown if queue is empty and there are no threads\r
-                       if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)\r
-                       {\r
-                               if (threads.isEmpty() && queue.isEmpty()) {\r
-                                       super.setState( ExecutorState.Shutdown );\r
-                                       super.setState( ExecutorState.Terminated );\r
-                               } else {\r
-                                       super.setState( ExecutorState.Shutdown );\r
-                               }\r
-                               return true;\r
-                       }\r
-               }\r
-               return super.setState(state);\r
-       }\r
-       \r
-       /**\r
-        * Cancel all work.\r
-        * @param mayInterrupt \r
-        */\r
-       public synchronized void cancelAll(boolean mayInterrupt) \r
-       {\r
-               for (WorkMonitor w : queue)\r
-                       w.cancel(mayInterrupt);\r
-               queue.clear();\r
-       }\r
-       \r
-       @Override\r
-       protected void onStateTransition(ExecutorState oldState,\r
-                       ExecutorState newState) {\r
-               if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))\r
-                       spawnTempThreads();\r
-       }\r
-       \r
-       private synchronized void spawnTempThreads()\r
-       {\r
-               if (!ExecutorState.WORKING_STATES.contains(getState())) return; \r
-               if (spawnOnDemandLimit>0) {\r
-                       int workers = getWorkingThreadCount();\r
-                       int n = spawnOnDemandLimit - workers;\r
-                       for (int i=0; i<n; i++)\r
-                               newThread(false);\r
-               }               \r
-       }\r
-\r
-       public class WorkImpl extends AbstractWorkMonitor \r
-       {\r
-               public WorkImpl(Runnable r) {\r
-                       super(r);\r
-               }       \r
-\r
-               @Override\r
-               public boolean cancel(boolean mayInterrupt) {\r
-                       if (mayInterrupt) \r
-                               throw new RuntimeException("NOT IMPLEMENTED");\r
-                       synchronized(ThreadPool.this) \r
-                       {\r
-                               WorkState s = getState();\r
-                               if (s==WorkState.Ready) \r
-                                       return remove(this);                             \r
-                       }\r
-                       return false;\r
-               }       \r
-\r
-               // Raise visibility\r
-               @Override\r
-               public boolean setState(WorkState state) {\r
-                       return super.setState(state);\r
-               }\r
-       }\r
-       \r
-       @Override\r
-       public synchronized void getQueuedWork(Collection<WorkMonitor> result) {\r
-               result.addAll(queue);\r
-       }\r
-       \r
-       \r
-       public static void main(String[] args) throws InterruptedException {\r
-               ThreadPool pool = new ThreadPool(2);\r
-               \r
-               pool.addStateListener(new StateListener<ExecutorState>() {\r
-                       @Override\r
-                       public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {\r
-                               System.out.println("Worker State: "+newState);\r
-                       }});\r
-               \r
-               StateListener<WorkState> l = new StateListener<WorkState>() {\r
-                       @Override\r
-                       public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {\r
-                               System.out.println("Work ("+monitor+"): "+newState);\r
-                       }\r
-               };\r
-               \r
-               Runnable[] r = new Runnable[10];\r
-               WorkMonitor[] m = new WorkMonitor[10];\r
-               for (int i=0; i<r.length; i++)\r
-               {\r
-                       final int j = i;\r
-                       r[i] = new Runnable() {\r
-                               @Override\r
-                               public String toString() {\r
-                                       return ""+j;\r
-                               }\r
-                               @Override\r
-                               public void run() {\r
-                                       try {\r
-                                               Thread.sleep(1000);\r
-                                       } catch (InterruptedException e) {\r
-                                               //e.printStackTrace();\r
-                                       }\r
-                                       System.out.println(j+" completed by "+Thread.currentThread()); \r
-                               }};\r
-               }\r
-               \r
-               int i=0;\r
-               for (Runnable rr : r)\r
-                       m[i++] = pool.asyncExec(rr, l);\r
-               \r
-               for (int j=2; j<8; j++)\r
-                       m[j].cancel(false);\r
-               \r
-               pool.setState(ExecutorState.Terminated);\r
-               m[9].waitForState(EnumSet.of(WorkState.Complete));\r
-\r
-               \r
-               System.out.println("fin");\r
-       }\r
-\r
-       public void setThreadFactory(ThreadFactory factory)\r
-       {\r
-               this.threadFactory = factory;\r
-       }\r
-\r
-       @Override\r
-       public void execute(Runnable command) {\r
-               asyncExec(command, null);\r
-       }\r
-       \r
-    /**\r
-     * Returns the task queue used by this executor. Access to the\r
-     * task queue is intended primarily for debugging and monitoring.\r
-     * This queue may be in active use.  Retrieving the task queue\r
-     * does not prevent queued tasks from executing.\r
-     *\r
-     * @return the task queue\r
-     */\r
-//    public BlockingQueue<Runnable> getQueue() {\r
-//        return queue;\r
-//    }        \r
-       \r
-       \r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+/*
+ *
+ * @author Toni Kalajainen
+ */
+package org.simantics.utils.threads.ua;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+
+
+/**
+ * ThreadPool contains a queue of work and a pool of threads processing them.
+ * <p>
+ * A thread can join the pool either as resident or as temporary.
+ * Temporary thread works until work queue is empty and returns.  
+ * Thread work mode can be set with setMode().
+ * <p>
+ * Current thread can yield to the service of ThreadPool with work()-invoke.
+ * <p>
+ * A new permanent worker thread can be spawned with newThread().
+ * This thread will be released when the ThreadPool is closed or explicitely with
+ * setMode(thread, ThreadMode.Interrupted);
+ * <p>
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().  
+ * <p>
+ * ThreadPool can be closed (or paused) with setState().
+ * <p>
+ * TODO Use ThreadFactory
+ * 
+ * @see SynchronizedWorker Use queue to ensure processing order of work.
+ * @see Worker 
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
+ */
+public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {
+
+       
+       private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");
+       /** Constructs daemon threads of "ThreadPool" group */
+       private final static ThreadFactory FACTORY = new ThreadFactory()
+       {
+               @Override
+               public Thread newThread(Runnable r) {
+                       Thread t = new Thread(THREADGROUP, r, "WorkerThread");
+                       t.setDaemon(true);
+                       return t;                       
+               }
+       };
+       
+       LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();
+       Semaphore s = new Semaphore(0);
+       
+       Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();
+       // How many threads may be spawned on demand when new work is queued
+       int spawnOnDemandLimit = 0;
+       static ThreadPool INSTANCE;
+       ThreadFactory threadFactory = FACTORY;
+       
+       /**
+        * Get singleton instance. This pool has 0 resident threads and spawns
+        * temporary threads up to the number of CPUs in the system.
+        * NOTE! Do not use this shared resource for blocking actions.
+        *  
+        * @return singleton instance which has one worker for each core
+        */
+       public synchronized static ThreadPool getInstance() {
+               if (INSTANCE == null) 
+                       INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());          
+               return INSTANCE;
+       }
+       
+       public enum ThreadMode {
+               Resident,    // Works and stays resident (sleeps if no work available)
+               Temporary,   // Works until queue is empty
+               Interrupted  // Stops working right away or after current task
+       }
+       
+       public ThreadPool() {
+               super(ExecutorState.Active);
+       }
+       
+       /**
+        * Create thread pool with 0 initial worker threads.
+        * 
+        * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.
+        */
+       public ThreadPool(int maxNumberOfTemporaryThreads) {
+               super(ExecutorState.Active);
+               setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);
+       }
+
+       public synchronized int getWorkingThreadCount()
+       {
+               int result = 0;
+               for (ThreadMode m : threads.values())
+                       if (m != ThreadMode.Interrupted) {
+                               result++;
+                       }
+               return result;
+       }
+       
+       /**
+        * Sets the limit for how many threads may be spawned on demand when new
+        * work is queued.
+        * <p>
+        * WorkPool spawns more threads when new work is queued and 
+        * the number of working threads goes below this value.
+        * <p>
+        * New thread is spawned if number of threads is less than queued word and
+        * the number of threads is less than <code>limit</code> value.   
+        * 
+        * @param limit on demand thread limit
+        */
+       public void setSpawnOnDemandLimit(int limit)
+       {
+               this.spawnOnDemandLimit = limit;
+       }
+       
+       /**
+        * Makes current thread yield to the service of the thread pool. 
+        * Service can be ended by invoking setMode(ThreadMode.Interrupted).
+        * 
+        * @param stayResident if true works until interrupted, if false works until queue is empty
+        */
+       public void work(boolean stayResident)
+       {                       
+               Thread t = Thread.currentThread();
+               synchronized(this) {
+                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+               }               
+               for (;;) {
+                       synchronized(this) {
+                               ThreadMode mode = getThreadMode(t);
+                               ExecutorState state = getState();
+                               
+                               // Temporary threads can return if not working right now
+                               if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) 
+                                       mode = ThreadMode.Interrupted;
+                               
+                               if (mode == null || mode == ThreadMode.Interrupted) break;
+                               
+                               // Do not do anything if the worker is shut down
+                               if (state==ExecutorState.Terminated) break;
+
+                               
+                               if (state==ExecutorState.Paused) {
+                                       // Remove temporary threads if paused
+                                       if (mode==ThreadMode.Temporary) break;
+                                       if (mode==ThreadMode.Resident) { 
+                                               try {
+                                                       waitForState(ExecutorState.NON_PAUSED_STATES);
+                                               } catch (InterruptedException e) {
+                                               }
+                                               continue;
+                                       }
+                               }
+                               
+                       }
+                       try {                                                           
+                               //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
+                               s.acquire();
+                               WorkImpl work;
+                               synchronized(this) {
+                                       work = queue.removeFirst();
+                               }
+                               try {
+                                       work.setState(WorkState.Working);
+                                       work.getRunnable().run();
+                                       work.setState(WorkState.Complete);
+                               } catch (RuntimeException e) {
+                                       work.setError(e);
+                                       work.setState(WorkState.Error);
+                               }
+                       } catch (InterruptedException e) {
+                       }
+               }
+               // Go to shutdown state?
+               boolean goToShutdown = false;
+               synchronized(this) {
+                       threads.remove(t);
+                       // If this is last thread and queue is empty, then go to shutdown state
+                       goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
+               }
+               // Go to shutdown state.
+               if (goToShutdown)
+                       super.setState(ExecutorState.Terminated);
+       }
+       
+       Runnable residentRun = new Runnable() 
+       {
+               @Override
+               public void run() {
+                       work(true);
+               }
+       };
+
+       Runnable tempRun = new Runnable() 
+       {
+               @Override
+               public void run() {
+                       work(false);
+               }
+       };
+       
+       /**
+        * Spawn new thread to the service of the thread pool
+        * @param stayResident 
+        * @return new running thread
+        */
+       public Thread newThread(boolean stayResident)
+       {
+               Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
+               
+               synchronized(this) {
+                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
+               }
+               t.start();
+               return t;
+       }
+
+       public synchronized ThreadMode getThreadMode(Thread t)
+       {
+               return threads.get(t);
+       }
+       
+       public synchronized void setMode(Thread t, ThreadMode tm)
+       {
+               if (t==null || tm==null) throw new IllegalArgumentException();
+               ThreadMode oldMode = threads.get(t);
+               if (oldMode == tm) return;              
+               threads.put(t, tm);
+               if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
+               {                               
+                       t.interrupt();
+               }
+               if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) 
+               {
+                       t.interrupt();
+               }
+       }
+               
+       /**
+        * Is work queue empty
+        * @return true if the work queue is empty
+        */
+       public synchronized boolean isEmpty()
+       {
+               return queue.isEmpty();
+       }
+       
+       void interrupt(Thread t)
+       {
+               t.interrupt();
+       }
+       
+       public synchronized boolean remove(WorkImpl r)
+       {
+               return queue.remove(r);
+       }
+       
+       @Override
+       public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) 
+       throws WorkerClosedException 
+       {
+               WorkImpl w = null;
+               synchronized(this) {
+                       if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))
+                               throw new WorkerClosedException();
+                       w = new WorkImpl(runnable);
+                       if (listener!=null)
+                               w.addStateListener(listener);
+                       queue.add(w);
+                       spawnTempThreads();
+               }
+               s.release();
+               return w;
+       }
+       
+
+       // Raise visibility
+       @Override
+       public boolean setState(ExecutorState state) {
+               // If worker is already shutdown then it must not be reactivated
+               if (getState() == ExecutorState.Terminated) 
+                       return false;
+               synchronized(this) {
+                       // Go directly to shutdown if queue is empty and there are no threads
+                       if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)
+                       {
+                               if (threads.isEmpty() && queue.isEmpty()) {
+                                       super.setState( ExecutorState.Shutdown );
+                                       super.setState( ExecutorState.Terminated );
+                               } else {
+                                       super.setState( ExecutorState.Shutdown );
+                               }
+                               return true;
+                       }
+               }
+               return super.setState(state);
+       }
+       
+       /**
+        * Cancel all work.
+        * @param mayInterrupt 
+        */
+       public synchronized void cancelAll(boolean mayInterrupt) 
+       {
+               for (WorkMonitor w : queue)
+                       w.cancel(mayInterrupt);
+               queue.clear();
+       }
+       
+       @Override
+       protected void onStateTransition(ExecutorState oldState,
+                       ExecutorState newState) {
+               if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))
+                       spawnTempThreads();
+       }
+       
+       private synchronized void spawnTempThreads()
+       {
+               if (!ExecutorState.WORKING_STATES.contains(getState())) return; 
+               if (spawnOnDemandLimit>0) {
+                       int workers = getWorkingThreadCount();
+                       int n = spawnOnDemandLimit - workers;
+                       for (int i=0; i<n; i++)
+                               newThread(false);
+               }               
+       }
+
+       public class WorkImpl extends AbstractWorkMonitor 
+       {
+               public WorkImpl(Runnable r) {
+                       super(r);
+               }       
+
+               @Override
+               public boolean cancel(boolean mayInterrupt) {
+                       if (mayInterrupt) 
+                               throw new RuntimeException("NOT IMPLEMENTED");
+                       synchronized(ThreadPool.this) 
+                       {
+                               WorkState s = getState();
+                               if (s==WorkState.Ready) 
+                                       return remove(this);                             
+                       }
+                       return false;
+               }       
+
+               // Raise visibility
+               @Override
+               public boolean setState(WorkState state) {
+                       return super.setState(state);
+               }
+       }
+       
+       @Override
+       public synchronized void getQueuedWork(Collection<WorkMonitor> result) {
+               result.addAll(queue);
+       }
+       
+       
+       public static void main(String[] args) throws InterruptedException {
+               ThreadPool pool = new ThreadPool(2);
+               
+               pool.addStateListener(new StateListener<ExecutorState>() {
+                       @Override
+                       public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {
+                               System.out.println("Worker State: "+newState);
+                       }});
+               
+               StateListener<WorkState> l = new StateListener<WorkState>() {
+                       @Override
+                       public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {
+                               System.out.println("Work ("+monitor+"): "+newState);
+                       }
+               };
+               
+               Runnable[] r = new Runnable[10];
+               WorkMonitor[] m = new WorkMonitor[10];
+               for (int i=0; i<r.length; i++)
+               {
+                       final int j = i;
+                       r[i] = new Runnable() {
+                               @Override
+                               public String toString() {
+                                       return ""+j;
+                               }
+                               @Override
+                               public void run() {
+                                       try {
+                                               Thread.sleep(1000);
+                                       } catch (InterruptedException e) {
+                                               //e.printStackTrace();
+                                       }
+                                       System.out.println(j+" completed by "+Thread.currentThread()); 
+                               }};
+               }
+               
+               int i=0;
+               for (Runnable rr : r)
+                       m[i++] = pool.asyncExec(rr, l);
+               
+               for (int j=2; j<8; j++)
+                       m[j].cancel(false);
+               
+               pool.setState(ExecutorState.Terminated);
+               m[9].waitForState(EnumSet.of(WorkState.Complete));
+
+               
+               System.out.println("fin");
+       }
+
+       public void setThreadFactory(ThreadFactory factory)
+       {
+               this.threadFactory = factory;
+       }
+
+       @Override
+       public void execute(Runnable command) {
+               asyncExec(command, null);
+       }
+       
+    /**
+     * Returns the task queue used by this executor. Access to the
+     * task queue is intended primarily for debugging and monitoring.
+     * This queue may be in active use.  Retrieving the task queue
+     * does not prevent queued tasks from executing.
+     *
+     * @return the task queue
+     */
+//    public BlockingQueue<Runnable> getQueue() {
+//        return queue;
+//    }        
+       
+       
+}