]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / ua / ThreadPool.java
diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java
new file mode 100644 (file)
index 0000000..cff1964
--- /dev/null
@@ -0,0 +1,452 @@
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+/*\r
+ *\r
+ * @author Toni Kalajainen\r
+ */\r
+package org.simantics.utils.threads.ua;\r
+\r
+import java.util.Collection;\r
+import java.util.EnumSet;\r
+import java.util.HashMap;\r
+import java.util.LinkedList;\r
+import java.util.Map;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.ThreadFactory;\r
+\r
+\r
+/**\r
+ * ThreadPool contains a queue of work and a pool of threads processing them.\r
+ * <p>\r
+ * A thread can join the pool either as resident or as temporary.\r
+ * Temporary thread works until work queue is empty and returns.  \r
+ * Thread work mode can be set with setMode().\r
+ * <p>\r
+ * Current thread can yield to the service of ThreadPool with work()-invoke.\r
+ * <p>\r
+ * A new permanent worker thread can be spawned with newThread().\r
+ * This thread will be released when the ThreadPool is closed or explicitely with\r
+ * setMode(thread, ThreadMode.Interrupted);\r
+ * <p>\r
+ * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().  \r
+ * <p>\r
+ * ThreadPool can be closed (or paused) with setState().\r
+ * <p>\r
+ * TODO Use ThreadFactory\r
+ * \r
+ * @see SynchronizedWorker Use queue to ensure processing order of work.\r
+ * @see Worker \r
+ * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
+ */\r
+public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {\r
+\r
+       \r
+       private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");\r
+       /** Constructs daemon threads of "ThreadPool" group */\r
+       private final static ThreadFactory FACTORY = new ThreadFactory()\r
+       {\r
+               @Override\r
+               public Thread newThread(Runnable r) {\r
+                       Thread t = new Thread(THREADGROUP, r, "WorkerThread");\r
+                       t.setDaemon(true);\r
+                       return t;                       \r
+               }\r
+       };\r
+       \r
+       LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();\r
+       Semaphore s = new Semaphore(0);\r
+       \r
+       Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();\r
+       // How many threads may be spawned on demand when new work is queued\r
+       int spawnOnDemandLimit = 0;\r
+       static ThreadPool INSTANCE;\r
+       ThreadFactory threadFactory = FACTORY;\r
+       \r
+       /**\r
+        * Get singleton instance. This pool has 0 resident threads and spawns\r
+        * temporary threads up to the number of CPUs in the system.\r
+        * NOTE! Do not use this shared resource for blocking actions.\r
+        *  \r
+        * @return singleton instance which has one worker for each core\r
+        */\r
+       public synchronized static ThreadPool getInstance() {\r
+               if (INSTANCE == null) \r
+                       INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());          \r
+               return INSTANCE;\r
+       }\r
+       \r
+       public enum ThreadMode {\r
+               Resident,    // Works and stays resident (sleeps if no work available)\r
+               Temporary,   // Works until queue is empty\r
+               Interrupted  // Stops working right away or after current task\r
+       }\r
+       \r
+       public ThreadPool() {\r
+               super(ExecutorState.Active);\r
+       }\r
+       \r
+       /**\r
+        * Create thread pool with 0 initial worker threads.\r
+        * \r
+        * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.\r
+        */\r
+       public ThreadPool(int maxNumberOfTemporaryThreads) {\r
+               super(ExecutorState.Active);\r
+               setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);\r
+       }\r
+\r
+       public synchronized int getWorkingThreadCount()\r
+       {\r
+               int result = 0;\r
+               for (ThreadMode m : threads.values())\r
+                       if (m != ThreadMode.Interrupted) {\r
+                               result++;\r
+                       }\r
+               return result;\r
+       }\r
+       \r
+       /**\r
+        * Sets the limit for how many threads may be spawned on demand when new\r
+        * work is queued.\r
+        * <p>\r
+        * WorkPool spawns more threads when new work is queued and \r
+        * the number of working threads goes below this value.\r
+        * <p>\r
+        * New thread is spawned if number of threads is less than queued word and\r
+        * the number of threads is less than <code>limit</code> value.   \r
+        * \r
+        * @param limit on demand thread limit\r
+        */\r
+       public void setSpawnOnDemandLimit(int limit)\r
+       {\r
+               this.spawnOnDemandLimit = limit;\r
+       }\r
+       \r
+       /**\r
+        * Makes current thread yield to the service of the thread pool. \r
+        * Service can be ended by invoking setMode(ThreadMode.Interrupted).\r
+        * \r
+        * @param stayResident if true works until interrupted, if false works until queue is empty\r
+        */\r
+       public void work(boolean stayResident)\r
+       {                       \r
+               Thread t = Thread.currentThread();\r
+               synchronized(this) {\r
+                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
+               }               \r
+               for (;;) {\r
+                       synchronized(this) {\r
+                               ThreadMode mode = getThreadMode(t);\r
+                               ExecutorState state = getState();\r
+                               \r
+                               // Temporary threads can return if not working right now\r
+                               if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) \r
+                                       mode = ThreadMode.Interrupted;\r
+                               \r
+                               if (mode == null || mode == ThreadMode.Interrupted) break;\r
+                               \r
+                               // Do not do anything if the worker is shut down\r
+                               if (state==ExecutorState.Terminated) break;\r
+\r
+                               \r
+                               if (state==ExecutorState.Paused) {\r
+                                       // Remove temporary threads if paused\r
+                                       if (mode==ThreadMode.Temporary) break;\r
+                                       if (mode==ThreadMode.Resident) { \r
+                                               try {\r
+                                                       waitForState(ExecutorState.NON_PAUSED_STATES);\r
+                                               } catch (InterruptedException e) {\r
+                                               }\r
+                                               continue;\r
+                                       }\r
+                               }\r
+                               \r
+                       }\r
+                       try {                                                           \r
+                               //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;\r
+                               s.acquire();\r
+                               WorkImpl work;\r
+                               synchronized(this) {\r
+                                       work = queue.removeFirst();\r
+                               }\r
+                               try {\r
+                                       work.setState(WorkState.Working);\r
+                                       work.getRunnable().run();\r
+                                       work.setState(WorkState.Complete);\r
+                               } catch (RuntimeException e) {\r
+                                       work.setError(e);\r
+                                       work.setState(WorkState.Error);\r
+                               }\r
+                       } catch (InterruptedException e) {\r
+                       }\r
+               }\r
+               // Go to shutdown state?\r
+               boolean goToShutdown = false;\r
+               synchronized(this) {\r
+                       threads.remove(t);\r
+                       // If this is last thread and queue is empty, then go to shutdown state\r
+                       goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();\r
+               }\r
+               // Go to shutdown state.\r
+               if (goToShutdown)\r
+                       super.setState(ExecutorState.Terminated);\r
+       }\r
+       \r
+       Runnable residentRun = new Runnable() \r
+       {\r
+               @Override\r
+               public void run() {\r
+                       work(true);\r
+               }\r
+       };\r
+\r
+       Runnable tempRun = new Runnable() \r
+       {\r
+               @Override\r
+               public void run() {\r
+                       work(false);\r
+               }\r
+       };\r
+       \r
+       /**\r
+        * Spawn new thread to the service of the thread pool\r
+        * @param stayResident \r
+        * @return new running thread\r
+        */\r
+       public Thread newThread(boolean stayResident)\r
+       {\r
+               Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");\r
+               \r
+               synchronized(this) {\r
+                       threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
+               }\r
+               t.start();\r
+               return t;\r
+       }\r
+\r
+       public synchronized ThreadMode getThreadMode(Thread t)\r
+       {\r
+               return threads.get(t);\r
+       }\r
+       \r
+       public synchronized void setMode(Thread t, ThreadMode tm)\r
+       {\r
+               if (t==null || tm==null) throw new IllegalArgumentException();\r
+               ThreadMode oldMode = threads.get(t);\r
+               if (oldMode == tm) return;              \r
+               threads.put(t, tm);\r
+               if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)\r
+               {                               \r
+                       t.interrupt();\r
+               }\r
+               if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) \r
+               {\r
+                       t.interrupt();\r
+               }\r
+       }\r
+               \r
+       /**\r
+        * Is work queue empty\r
+        * @return true if the work queue is empty\r
+        */\r
+       public synchronized boolean isEmpty()\r
+       {\r
+               return queue.isEmpty();\r
+       }\r
+       \r
+       void interrupt(Thread t)\r
+       {\r
+               t.interrupt();\r
+       }\r
+       \r
+       public synchronized boolean remove(WorkImpl r)\r
+       {\r
+               return queue.remove(r);\r
+       }\r
+       \r
+       @Override\r
+       public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) \r
+       throws WorkerClosedException \r
+       {\r
+               WorkImpl w = null;\r
+               synchronized(this) {\r
+                       if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))\r
+                               throw new WorkerClosedException();\r
+                       w = new WorkImpl(runnable);\r
+                       if (listener!=null)\r
+                               w.addStateListener(listener);\r
+                       queue.add(w);\r
+                       spawnTempThreads();\r
+               }\r
+               s.release();\r
+               return w;\r
+       }\r
+       \r
+\r
+       // Raise visibility\r
+       @Override\r
+       public boolean setState(ExecutorState state) {\r
+               // If worker is already shutdown then it must not be reactivated\r
+               if (getState() == ExecutorState.Terminated) \r
+                       return false;\r
+               synchronized(this) {\r
+                       // Go directly to shutdown if queue is empty and there are no threads\r
+                       if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)\r
+                       {\r
+                               if (threads.isEmpty() && queue.isEmpty()) {\r
+                                       super.setState( ExecutorState.Shutdown );\r
+                                       super.setState( ExecutorState.Terminated );\r
+                               } else {\r
+                                       super.setState( ExecutorState.Shutdown );\r
+                               }\r
+                               return true;\r
+                       }\r
+               }\r
+               return super.setState(state);\r
+       }\r
+       \r
+       /**\r
+        * Cancel all work.\r
+        * @param mayInterrupt \r
+        */\r
+       public synchronized void cancelAll(boolean mayInterrupt) \r
+       {\r
+               for (WorkMonitor w : queue)\r
+                       w.cancel(mayInterrupt);\r
+               queue.clear();\r
+       }\r
+       \r
+       @Override\r
+       protected void onStateTransition(ExecutorState oldState,\r
+                       ExecutorState newState) {\r
+               if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))\r
+                       spawnTempThreads();\r
+       }\r
+       \r
+       private synchronized void spawnTempThreads()\r
+       {\r
+               if (!ExecutorState.WORKING_STATES.contains(getState())) return; \r
+               if (spawnOnDemandLimit>0) {\r
+                       int workers = getWorkingThreadCount();\r
+                       int n = spawnOnDemandLimit - workers;\r
+                       for (int i=0; i<n; i++)\r
+                               newThread(false);\r
+               }               \r
+       }\r
+\r
+       public class WorkImpl extends AbstractWorkMonitor \r
+       {\r
+               public WorkImpl(Runnable r) {\r
+                       super(r);\r
+               }       \r
+\r
+               @Override\r
+               public boolean cancel(boolean mayInterrupt) {\r
+                       if (mayInterrupt) \r
+                               throw new RuntimeException("NOT IMPLEMENTED");\r
+                       synchronized(ThreadPool.this) \r
+                       {\r
+                               WorkState s = getState();\r
+                               if (s==WorkState.Ready) \r
+                                       return remove(this);                             \r
+                       }\r
+                       return false;\r
+               }       \r
+\r
+               // Raise visibility\r
+               @Override\r
+               public boolean setState(WorkState state) {\r
+                       return super.setState(state);\r
+               }\r
+       }\r
+       \r
+       @Override\r
+       public synchronized void getQueuedWork(Collection<WorkMonitor> result) {\r
+               result.addAll(queue);\r
+       }\r
+       \r
+       \r
+       public static void main(String[] args) throws InterruptedException {\r
+               ThreadPool pool = new ThreadPool(2);\r
+               \r
+               pool.addStateListener(new StateListener<ExecutorState>() {\r
+                       @Override\r
+                       public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {\r
+                               System.out.println("Worker State: "+newState);\r
+                       }});\r
+               \r
+               StateListener<WorkState> l = new StateListener<WorkState>() {\r
+                       @Override\r
+                       public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {\r
+                               System.out.println("Work ("+monitor+"): "+newState);\r
+                       }\r
+               };\r
+               \r
+               Runnable[] r = new Runnable[10];\r
+               WorkMonitor[] m = new WorkMonitor[10];\r
+               for (int i=0; i<r.length; i++)\r
+               {\r
+                       final int j = i;\r
+                       r[i] = new Runnable() {\r
+                               @Override\r
+                               public String toString() {\r
+                                       return ""+j;\r
+                               }\r
+                               @Override\r
+                               public void run() {\r
+                                       try {\r
+                                               Thread.sleep(1000);\r
+                                       } catch (InterruptedException e) {\r
+                                               //e.printStackTrace();\r
+                                       }\r
+                                       System.out.println(j+" completed by "+Thread.currentThread()); \r
+                               }};\r
+               }\r
+               \r
+               int i=0;\r
+               for (Runnable rr : r)\r
+                       m[i++] = pool.asyncExec(rr, l);\r
+               \r
+               for (int j=2; j<8; j++)\r
+                       m[j].cancel(false);\r
+               \r
+               pool.setState(ExecutorState.Terminated);\r
+               m[9].waitForState(EnumSet.of(WorkState.Complete));\r
+\r
+               \r
+               System.out.println("fin");\r
+       }\r
+\r
+       public void setThreadFactory(ThreadFactory factory)\r
+       {\r
+               this.threadFactory = factory;\r
+       }\r
+\r
+       @Override\r
+       public void execute(Runnable command) {\r
+               asyncExec(command, null);\r
+       }\r
+       \r
+    /**\r
+     * Returns the task queue used by this executor. Access to the\r
+     * task queue is intended primarily for debugging and monitoring.\r
+     * This queue may be in active use.  Retrieving the task queue\r
+     * does not prevent queued tasks from executing.\r
+     *\r
+     * @return the task queue\r
+     */\r
+//    public BlockingQueue<Runnable> getQueue() {\r
+//        return queue;\r
+//    }        \r
+       \r
+       \r
+}\r