--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+/*\r
+ *\r
+ * @author Toni Kalajainen\r
+ */\r
+package org.simantics.utils.threads.ua;\r
+\r
+import java.util.Collection;\r
+import java.util.EnumSet;\r
+import java.util.HashMap;\r
+import java.util.LinkedList;\r
+import java.util.Map;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.ThreadFactory;\r
+\r
+\r
+/**\r
+ * ThreadPool contains a queue of work and a pool of threads processing them.\r
+ * <p>\r
+ * A thread can join the pool either as resident or as temporary.\r
+ * Temporary thread works until work queue is empty and returns. \r
+ * Thread work mode can be set with setMode().\r
+ * <p>\r
+ * Current thread can yield to the service of ThreadPool with work()-invoke.\r
+ * <p>\r
+ * A new permanent worker thread can be spawned with newThread().\r
+ * This thread will be released when the ThreadPool is closed or explicitely with\r
+ * setMode(thread, ThreadMode.Interrupted);\r
+ * <p>\r
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit(). \r
+ * <p>\r
+ * ThreadPool can be closed (or paused) with setState().\r
+ * <p>\r
+ * TODO Use ThreadFactory\r
+ * \r
+ * @see SynchronizedWorker Use queue to ensure processing order of work.\r
+ * @see Worker \r
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
+ */\r
+public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {\r
+\r
+ \r
+ private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");\r
+ /** Constructs daemon threads of "ThreadPool" group */\r
+ private final static ThreadFactory FACTORY = new ThreadFactory()\r
+ {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(THREADGROUP, r, "WorkerThread");\r
+ t.setDaemon(true);\r
+ return t; \r
+ }\r
+ };\r
+ \r
+ LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();\r
+ Semaphore s = new Semaphore(0);\r
+ \r
+ Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();\r
+ // How many threads may be spawned on demand when new work is queued\r
+ int spawnOnDemandLimit = 0;\r
+ static ThreadPool INSTANCE;\r
+ ThreadFactory threadFactory = FACTORY;\r
+ \r
+ /**\r
+ * Get singleton instance. This pool has 0 resident threads and spawns\r
+ * temporary threads up to the number of CPUs in the system.\r
+ * NOTE! Do not use this shared resource for blocking actions.\r
+ * \r
+ * @return singleton instance which has one worker for each core\r
+ */\r
+ public synchronized static ThreadPool getInstance() {\r
+ if (INSTANCE == null) \r
+ INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors()); \r
+ return INSTANCE;\r
+ }\r
+ \r
+ public enum ThreadMode {\r
+ Resident, // Works and stays resident (sleeps if no work available)\r
+ Temporary, // Works until queue is empty\r
+ Interrupted // Stops working right away or after current task\r
+ }\r
+ \r
+ public ThreadPool() {\r
+ super(ExecutorState.Active);\r
+ }\r
+ \r
+ /**\r
+ * Create thread pool with 0 initial worker threads.\r
+ * \r
+ * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.\r
+ */\r
+ public ThreadPool(int maxNumberOfTemporaryThreads) {\r
+ super(ExecutorState.Active);\r
+ setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);\r
+ }\r
+\r
+ public synchronized int getWorkingThreadCount()\r
+ {\r
+ int result = 0;\r
+ for (ThreadMode m : threads.values())\r
+ if (m != ThreadMode.Interrupted) {\r
+ result++;\r
+ }\r
+ return result;\r
+ }\r
+ \r
+ /**\r
+ * Sets the limit for how many threads may be spawned on demand when new\r
+ * work is queued.\r
+ * <p>\r
+ * WorkPool spawns more threads when new work is queued and \r
+ * the number of working threads goes below this value.\r
+ * <p>\r
+ * New thread is spawned if number of threads is less than queued word and\r
+ * the number of threads is less than <code>limit</code> value. \r
+ * \r
+ * @param limit on demand thread limit\r
+ */\r
+ public void setSpawnOnDemandLimit(int limit)\r
+ {\r
+ this.spawnOnDemandLimit = limit;\r
+ }\r
+ \r
+ /**\r
+ * Makes current thread yield to the service of the thread pool. \r
+ * Service can be ended by invoking setMode(ThreadMode.Interrupted).\r
+ * \r
+ * @param stayResident if true works until interrupted, if false works until queue is empty\r
+ */\r
+ public void work(boolean stayResident)\r
+ { \r
+ Thread t = Thread.currentThread();\r
+ synchronized(this) {\r
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
+ } \r
+ for (;;) {\r
+ synchronized(this) {\r
+ ThreadMode mode = getThreadMode(t);\r
+ ExecutorState state = getState();\r
+ \r
+ // Temporary threads can return if not working right now\r
+ if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) \r
+ mode = ThreadMode.Interrupted;\r
+ \r
+ if (mode == null || mode == ThreadMode.Interrupted) break;\r
+ \r
+ // Do not do anything if the worker is shut down\r
+ if (state==ExecutorState.Terminated) break;\r
+\r
+ \r
+ if (state==ExecutorState.Paused) {\r
+ // Remove temporary threads if paused\r
+ if (mode==ThreadMode.Temporary) break;\r
+ if (mode==ThreadMode.Resident) { \r
+ try {\r
+ waitForState(ExecutorState.NON_PAUSED_STATES);\r
+ } catch (InterruptedException e) {\r
+ }\r
+ continue;\r
+ }\r
+ }\r
+ \r
+ }\r
+ try { \r
+ //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;\r
+ s.acquire();\r
+ WorkImpl work;\r
+ synchronized(this) {\r
+ work = queue.removeFirst();\r
+ }\r
+ try {\r
+ work.setState(WorkState.Working);\r
+ work.getRunnable().run();\r
+ work.setState(WorkState.Complete);\r
+ } catch (RuntimeException e) {\r
+ work.setError(e);\r
+ work.setState(WorkState.Error);\r
+ }\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+ // Go to shutdown state?\r
+ boolean goToShutdown = false;\r
+ synchronized(this) {\r
+ threads.remove(t);\r
+ // If this is last thread and queue is empty, then go to shutdown state\r
+ goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();\r
+ }\r
+ // Go to shutdown state.\r
+ if (goToShutdown)\r
+ super.setState(ExecutorState.Terminated);\r
+ }\r
+ \r
+ Runnable residentRun = new Runnable() \r
+ {\r
+ @Override\r
+ public void run() {\r
+ work(true);\r
+ }\r
+ };\r
+\r
+ Runnable tempRun = new Runnable() \r
+ {\r
+ @Override\r
+ public void run() {\r
+ work(false);\r
+ }\r
+ };\r
+ \r
+ /**\r
+ * Spawn new thread to the service of the thread pool\r
+ * @param stayResident \r
+ * @return new running thread\r
+ */\r
+ public Thread newThread(boolean stayResident)\r
+ {\r
+ Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");\r
+ \r
+ synchronized(this) {\r
+ threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
+ }\r
+ t.start();\r
+ return t;\r
+ }\r
+\r
+ public synchronized ThreadMode getThreadMode(Thread t)\r
+ {\r
+ return threads.get(t);\r
+ }\r
+ \r
+ public synchronized void setMode(Thread t, ThreadMode tm)\r
+ {\r
+ if (t==null || tm==null) throw new IllegalArgumentException();\r
+ ThreadMode oldMode = threads.get(t);\r
+ if (oldMode == tm) return; \r
+ threads.put(t, tm);\r
+ if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)\r
+ { \r
+ t.interrupt();\r
+ }\r
+ if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) \r
+ {\r
+ t.interrupt();\r
+ }\r
+ }\r
+ \r
+ /**\r
+ * Is work queue empty\r
+ * @return true if the work queue is empty\r
+ */\r
+ public synchronized boolean isEmpty()\r
+ {\r
+ return queue.isEmpty();\r
+ }\r
+ \r
+ void interrupt(Thread t)\r
+ {\r
+ t.interrupt();\r
+ }\r
+ \r
+ public synchronized boolean remove(WorkImpl r)\r
+ {\r
+ return queue.remove(r);\r
+ }\r
+ \r
+ @Override\r
+ public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) \r
+ throws WorkerClosedException \r
+ {\r
+ WorkImpl w = null;\r
+ synchronized(this) {\r
+ if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))\r
+ throw new WorkerClosedException();\r
+ w = new WorkImpl(runnable);\r
+ if (listener!=null)\r
+ w.addStateListener(listener);\r
+ queue.add(w);\r
+ spawnTempThreads();\r
+ }\r
+ s.release();\r
+ return w;\r
+ }\r
+ \r
+\r
+ // Raise visibility\r
+ @Override\r
+ public boolean setState(ExecutorState state) {\r
+ // If worker is already shutdown then it must not be reactivated\r
+ if (getState() == ExecutorState.Terminated) \r
+ return false;\r
+ synchronized(this) {\r
+ // Go directly to shutdown if queue is empty and there are no threads\r
+ if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)\r
+ {\r
+ if (threads.isEmpty() && queue.isEmpty()) {\r
+ super.setState( ExecutorState.Shutdown );\r
+ super.setState( ExecutorState.Terminated );\r
+ } else {\r
+ super.setState( ExecutorState.Shutdown );\r
+ }\r
+ return true;\r
+ }\r
+ }\r
+ return super.setState(state);\r
+ }\r
+ \r
+ /**\r
+ * Cancel all work.\r
+ * @param mayInterrupt \r
+ */\r
+ public synchronized void cancelAll(boolean mayInterrupt) \r
+ {\r
+ for (WorkMonitor w : queue)\r
+ w.cancel(mayInterrupt);\r
+ queue.clear();\r
+ }\r
+ \r
+ @Override\r
+ protected void onStateTransition(ExecutorState oldState,\r
+ ExecutorState newState) {\r
+ if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))\r
+ spawnTempThreads();\r
+ }\r
+ \r
+ private synchronized void spawnTempThreads()\r
+ {\r
+ if (!ExecutorState.WORKING_STATES.contains(getState())) return; \r
+ if (spawnOnDemandLimit>0) {\r
+ int workers = getWorkingThreadCount();\r
+ int n = spawnOnDemandLimit - workers;\r
+ for (int i=0; i<n; i++)\r
+ newThread(false);\r
+ } \r
+ }\r
+\r
+ public class WorkImpl extends AbstractWorkMonitor \r
+ {\r
+ public WorkImpl(Runnable r) {\r
+ super(r);\r
+ } \r
+\r
+ @Override\r
+ public boolean cancel(boolean mayInterrupt) {\r
+ if (mayInterrupt) \r
+ throw new RuntimeException("NOT IMPLEMENTED");\r
+ synchronized(ThreadPool.this) \r
+ {\r
+ WorkState s = getState();\r
+ if (s==WorkState.Ready) \r
+ return remove(this); \r
+ }\r
+ return false;\r
+ } \r
+\r
+ // Raise visibility\r
+ @Override\r
+ public boolean setState(WorkState state) {\r
+ return super.setState(state);\r
+ }\r
+ }\r
+ \r
+ @Override\r
+ public synchronized void getQueuedWork(Collection<WorkMonitor> result) {\r
+ result.addAll(queue);\r
+ }\r
+ \r
+ \r
+ public static void main(String[] args) throws InterruptedException {\r
+ ThreadPool pool = new ThreadPool(2);\r
+ \r
+ pool.addStateListener(new StateListener<ExecutorState>() {\r
+ @Override\r
+ public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {\r
+ System.out.println("Worker State: "+newState);\r
+ }});\r
+ \r
+ StateListener<WorkState> l = new StateListener<WorkState>() {\r
+ @Override\r
+ public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {\r
+ System.out.println("Work ("+monitor+"): "+newState);\r
+ }\r
+ };\r
+ \r
+ Runnable[] r = new Runnable[10];\r
+ WorkMonitor[] m = new WorkMonitor[10];\r
+ for (int i=0; i<r.length; i++)\r
+ {\r
+ final int j = i;\r
+ r[i] = new Runnable() {\r
+ @Override\r
+ public String toString() {\r
+ return ""+j;\r
+ }\r
+ @Override\r
+ public void run() {\r
+ try {\r
+ Thread.sleep(1000);\r
+ } catch (InterruptedException e) {\r
+ //e.printStackTrace();\r
+ }\r
+ System.out.println(j+" completed by "+Thread.currentThread()); \r
+ }};\r
+ }\r
+ \r
+ int i=0;\r
+ for (Runnable rr : r)\r
+ m[i++] = pool.asyncExec(rr, l);\r
+ \r
+ for (int j=2; j<8; j++)\r
+ m[j].cancel(false);\r
+ \r
+ pool.setState(ExecutorState.Terminated);\r
+ m[9].waitForState(EnumSet.of(WorkState.Complete));\r
+\r
+ \r
+ System.out.println("fin");\r
+ }\r
+\r
+ public void setThreadFactory(ThreadFactory factory)\r
+ {\r
+ this.threadFactory = factory;\r
+ }\r
+\r
+ @Override\r
+ public void execute(Runnable command) {\r
+ asyncExec(command, null);\r
+ }\r
+ \r
+ /**\r
+ * Returns the task queue used by this executor. Access to the\r
+ * task queue is intended primarily for debugging and monitoring.\r
+ * This queue may be in active use. Retrieving the task queue\r
+ * does not prevent queued tasks from executing.\r
+ *\r
+ * @return the task queue\r
+ */\r
+// public BlockingQueue<Runnable> getQueue() {\r
+// return queue;\r
+// } \r
+ \r
+ \r
+}\r