]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / ua / ThreadPool.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 /*\r
13  *\r
14  * @author Toni Kalajainen\r
15  */\r
16 package org.simantics.utils.threads.ua;\r
17 \r
18 import java.util.Collection;\r
19 import java.util.EnumSet;\r
20 import java.util.HashMap;\r
21 import java.util.LinkedList;\r
22 import java.util.Map;\r
23 import java.util.concurrent.Semaphore;\r
24 import java.util.concurrent.ThreadFactory;\r
25 \r
26 \r
27 /**\r
28  * ThreadPool contains a queue of work and a pool of threads processing them.\r
29  * <p>\r
30  * A thread can join the pool either as resident or as temporary.\r
31  * Temporary thread works until work queue is empty and returns.  \r
32  * Thread work mode can be set with setMode().\r
33  * <p>\r
34  * Current thread can yield to the service of ThreadPool with work()-invoke.\r
35  * <p>\r
36  * A new permanent worker thread can be spawned with newThread().\r
37  * This thread will be released when the ThreadPool is closed or explicitely with\r
38  * setMode(thread, ThreadMode.Interrupted);\r
39  * <p>\r
40  * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().  \r
41  * <p>\r
42  * ThreadPool can be closed (or paused) with setState().\r
43  * <p>\r
44  * TODO Use ThreadFactory\r
45  * \r
46  * @see SynchronizedWorker Use queue to ensure processing order of work.\r
47  * @see Worker \r
48  * @author Toni Kalajainen (toni.kalajainen@vtt.fi)\r
49  */\r
50 public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {\r
51 \r
52         \r
53         private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");\r
54         /** Constructs daemon threads of "ThreadPool" group */\r
55         private final static ThreadFactory FACTORY = new ThreadFactory()\r
56         {\r
57                 @Override\r
58                 public Thread newThread(Runnable r) {\r
59                         Thread t = new Thread(THREADGROUP, r, "WorkerThread");\r
60                         t.setDaemon(true);\r
61                         return t;                       \r
62                 }\r
63         };\r
64         \r
65         LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();\r
66         Semaphore s = new Semaphore(0);\r
67         \r
68         Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();\r
69         // How many threads may be spawned on demand when new work is queued\r
70         int spawnOnDemandLimit = 0;\r
71         static ThreadPool INSTANCE;\r
72         ThreadFactory threadFactory = FACTORY;\r
73         \r
74         /**\r
75          * Get singleton instance. This pool has 0 resident threads and spawns\r
76          * temporary threads up to the number of CPUs in the system.\r
77          * NOTE! Do not use this shared resource for blocking actions.\r
78          *  \r
79          * @return singleton instance which has one worker for each core\r
80          */\r
81         public synchronized static ThreadPool getInstance() {\r
82                 if (INSTANCE == null) \r
83                         INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());          \r
84                 return INSTANCE;\r
85         }\r
86         \r
87         public enum ThreadMode {\r
88                 Resident,    // Works and stays resident (sleeps if no work available)\r
89                 Temporary,   // Works until queue is empty\r
90                 Interrupted  // Stops working right away or after current task\r
91         }\r
92         \r
93         public ThreadPool() {\r
94                 super(ExecutorState.Active);\r
95         }\r
96         \r
97         /**\r
98          * Create thread pool with 0 initial worker threads.\r
99          * \r
100          * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.\r
101          */\r
102         public ThreadPool(int maxNumberOfTemporaryThreads) {\r
103                 super(ExecutorState.Active);\r
104                 setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);\r
105         }\r
106 \r
107         public synchronized int getWorkingThreadCount()\r
108         {\r
109                 int result = 0;\r
110                 for (ThreadMode m : threads.values())\r
111                         if (m != ThreadMode.Interrupted) {\r
112                                 result++;\r
113                         }\r
114                 return result;\r
115         }\r
116         \r
117         /**\r
118          * Sets the limit for how many threads may be spawned on demand when new\r
119          * work is queued.\r
120          * <p>\r
121          * WorkPool spawns more threads when new work is queued and \r
122          * the number of working threads goes below this value.\r
123          * <p>\r
124          * New thread is spawned if number of threads is less than queued word and\r
125          * the number of threads is less than <code>limit</code> value.   \r
126          * \r
127          * @param limit on demand thread limit\r
128          */\r
129         public void setSpawnOnDemandLimit(int limit)\r
130         {\r
131                 this.spawnOnDemandLimit = limit;\r
132         }\r
133         \r
134         /**\r
135          * Makes current thread yield to the service of the thread pool. \r
136          * Service can be ended by invoking setMode(ThreadMode.Interrupted).\r
137          * \r
138          * @param stayResident if true works until interrupted, if false works until queue is empty\r
139          */\r
140         public void work(boolean stayResident)\r
141         {                       \r
142                 Thread t = Thread.currentThread();\r
143                 synchronized(this) {\r
144                         threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
145                 }               \r
146                 for (;;) {\r
147                         synchronized(this) {\r
148                                 ThreadMode mode = getThreadMode(t);\r
149                                 ExecutorState state = getState();\r
150                                 \r
151                                 // Temporary threads can return if not working right now\r
152                                 if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) \r
153                                         mode = ThreadMode.Interrupted;\r
154                                 \r
155                                 if (mode == null || mode == ThreadMode.Interrupted) break;\r
156                                 \r
157                                 // Do not do anything if the worker is shut down\r
158                                 if (state==ExecutorState.Terminated) break;\r
159 \r
160                                 \r
161                                 if (state==ExecutorState.Paused) {\r
162                                         // Remove temporary threads if paused\r
163                                         if (mode==ThreadMode.Temporary) break;\r
164                                         if (mode==ThreadMode.Resident) { \r
165                                                 try {\r
166                                                         waitForState(ExecutorState.NON_PAUSED_STATES);\r
167                                                 } catch (InterruptedException e) {\r
168                                                 }\r
169                                                 continue;\r
170                                         }\r
171                                 }\r
172                                 \r
173                         }\r
174                         try {                                                           \r
175                                 //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;\r
176                                 s.acquire();\r
177                                 WorkImpl work;\r
178                                 synchronized(this) {\r
179                                         work = queue.removeFirst();\r
180                                 }\r
181                                 try {\r
182                                         work.setState(WorkState.Working);\r
183                                         work.getRunnable().run();\r
184                                         work.setState(WorkState.Complete);\r
185                                 } catch (RuntimeException e) {\r
186                                         work.setError(e);\r
187                                         work.setState(WorkState.Error);\r
188                                 }\r
189                         } catch (InterruptedException e) {\r
190                         }\r
191                 }\r
192                 // Go to shutdown state?\r
193                 boolean goToShutdown = false;\r
194                 synchronized(this) {\r
195                         threads.remove(t);\r
196                         // If this is last thread and queue is empty, then go to shutdown state\r
197                         goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();\r
198                 }\r
199                 // Go to shutdown state.\r
200                 if (goToShutdown)\r
201                         super.setState(ExecutorState.Terminated);\r
202         }\r
203         \r
204         Runnable residentRun = new Runnable() \r
205         {\r
206                 @Override\r
207                 public void run() {\r
208                         work(true);\r
209                 }\r
210         };\r
211 \r
212         Runnable tempRun = new Runnable() \r
213         {\r
214                 @Override\r
215                 public void run() {\r
216                         work(false);\r
217                 }\r
218         };\r
219         \r
220         /**\r
221          * Spawn new thread to the service of the thread pool\r
222          * @param stayResident \r
223          * @return new running thread\r
224          */\r
225         public Thread newThread(boolean stayResident)\r
226         {\r
227                 Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");\r
228                 \r
229                 synchronized(this) {\r
230                         threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);\r
231                 }\r
232                 t.start();\r
233                 return t;\r
234         }\r
235 \r
236         public synchronized ThreadMode getThreadMode(Thread t)\r
237         {\r
238                 return threads.get(t);\r
239         }\r
240         \r
241         public synchronized void setMode(Thread t, ThreadMode tm)\r
242         {\r
243                 if (t==null || tm==null) throw new IllegalArgumentException();\r
244                 ThreadMode oldMode = threads.get(t);\r
245                 if (oldMode == tm) return;              \r
246                 threads.put(t, tm);\r
247                 if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)\r
248                 {                               \r
249                         t.interrupt();\r
250                 }\r
251                 if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) \r
252                 {\r
253                         t.interrupt();\r
254                 }\r
255         }\r
256                 \r
257         /**\r
258          * Is work queue empty\r
259          * @return true if the work queue is empty\r
260          */\r
261         public synchronized boolean isEmpty()\r
262         {\r
263                 return queue.isEmpty();\r
264         }\r
265         \r
266         void interrupt(Thread t)\r
267         {\r
268                 t.interrupt();\r
269         }\r
270         \r
271         public synchronized boolean remove(WorkImpl r)\r
272         {\r
273                 return queue.remove(r);\r
274         }\r
275         \r
276         @Override\r
277         public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) \r
278         throws WorkerClosedException \r
279         {\r
280                 WorkImpl w = null;\r
281                 synchronized(this) {\r
282                         if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))\r
283                                 throw new WorkerClosedException();\r
284                         w = new WorkImpl(runnable);\r
285                         if (listener!=null)\r
286                                 w.addStateListener(listener);\r
287                         queue.add(w);\r
288                         spawnTempThreads();\r
289                 }\r
290                 s.release();\r
291                 return w;\r
292         }\r
293         \r
294 \r
295         // Raise visibility\r
296         @Override\r
297         public boolean setState(ExecutorState state) {\r
298                 // If worker is already shutdown then it must not be reactivated\r
299                 if (getState() == ExecutorState.Terminated) \r
300                         return false;\r
301                 synchronized(this) {\r
302                         // Go directly to shutdown if queue is empty and there are no threads\r
303                         if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)\r
304                         {\r
305                                 if (threads.isEmpty() && queue.isEmpty()) {\r
306                                         super.setState( ExecutorState.Shutdown );\r
307                                         super.setState( ExecutorState.Terminated );\r
308                                 } else {\r
309                                         super.setState( ExecutorState.Shutdown );\r
310                                 }\r
311                                 return true;\r
312                         }\r
313                 }\r
314                 return super.setState(state);\r
315         }\r
316         \r
317         /**\r
318          * Cancel all work.\r
319          * @param mayInterrupt \r
320          */\r
321         public synchronized void cancelAll(boolean mayInterrupt) \r
322         {\r
323                 for (WorkMonitor w : queue)\r
324                         w.cancel(mayInterrupt);\r
325                 queue.clear();\r
326         }\r
327         \r
328         @Override\r
329         protected void onStateTransition(ExecutorState oldState,\r
330                         ExecutorState newState) {\r
331                 if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))\r
332                         spawnTempThreads();\r
333         }\r
334         \r
335         private synchronized void spawnTempThreads()\r
336         {\r
337                 if (!ExecutorState.WORKING_STATES.contains(getState())) return; \r
338                 if (spawnOnDemandLimit>0) {\r
339                         int workers = getWorkingThreadCount();\r
340                         int n = spawnOnDemandLimit - workers;\r
341                         for (int i=0; i<n; i++)\r
342                                 newThread(false);\r
343                 }               \r
344         }\r
345 \r
346         public class WorkImpl extends AbstractWorkMonitor \r
347         {\r
348                 public WorkImpl(Runnable r) {\r
349                         super(r);\r
350                 }       \r
351 \r
352                 @Override\r
353                 public boolean cancel(boolean mayInterrupt) {\r
354                         if (mayInterrupt) \r
355                                 throw new RuntimeException("NOT IMPLEMENTED");\r
356                         synchronized(ThreadPool.this) \r
357                         {\r
358                                 WorkState s = getState();\r
359                                 if (s==WorkState.Ready) \r
360                                         return remove(this);                             \r
361                         }\r
362                         return false;\r
363                 }       \r
364 \r
365                 // Raise visibility\r
366                 @Override\r
367                 public boolean setState(WorkState state) {\r
368                         return super.setState(state);\r
369                 }\r
370         }\r
371         \r
372         @Override\r
373         public synchronized void getQueuedWork(Collection<WorkMonitor> result) {\r
374                 result.addAll(queue);\r
375         }\r
376         \r
377         \r
378         public static void main(String[] args) throws InterruptedException {\r
379                 ThreadPool pool = new ThreadPool(2);\r
380                 \r
381                 pool.addStateListener(new StateListener<ExecutorState>() {\r
382                         @Override\r
383                         public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {\r
384                                 System.out.println("Worker State: "+newState);\r
385                         }});\r
386                 \r
387                 StateListener<WorkState> l = new StateListener<WorkState>() {\r
388                         @Override\r
389                         public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {\r
390                                 System.out.println("Work ("+monitor+"): "+newState);\r
391                         }\r
392                 };\r
393                 \r
394                 Runnable[] r = new Runnable[10];\r
395                 WorkMonitor[] m = new WorkMonitor[10];\r
396                 for (int i=0; i<r.length; i++)\r
397                 {\r
398                         final int j = i;\r
399                         r[i] = new Runnable() {\r
400                                 @Override\r
401                                 public String toString() {\r
402                                         return ""+j;\r
403                                 }\r
404                                 @Override\r
405                                 public void run() {\r
406                                         try {\r
407                                                 Thread.sleep(1000);\r
408                                         } catch (InterruptedException e) {\r
409                                                 //e.printStackTrace();\r
410                                         }\r
411                                         System.out.println(j+" completed by "+Thread.currentThread()); \r
412                                 }};\r
413                 }\r
414                 \r
415                 int i=0;\r
416                 for (Runnable rr : r)\r
417                         m[i++] = pool.asyncExec(rr, l);\r
418                 \r
419                 for (int j=2; j<8; j++)\r
420                         m[j].cancel(false);\r
421                 \r
422                 pool.setState(ExecutorState.Terminated);\r
423                 m[9].waitForState(EnumSet.of(WorkState.Complete));\r
424 \r
425                 \r
426                 System.out.println("fin");\r
427         }\r
428 \r
429         public void setThreadFactory(ThreadFactory factory)\r
430         {\r
431                 this.threadFactory = factory;\r
432         }\r
433 \r
434         @Override\r
435         public void execute(Runnable command) {\r
436                 asyncExec(command, null);\r
437         }\r
438         \r
439     /**\r
440      * Returns the task queue used by this executor. Access to the\r
441      * task queue is intended primarily for debugging and monitoring.\r
442      * This queue may be in active use.  Retrieving the task queue\r
443      * does not prevent queued tasks from executing.\r
444      *\r
445      * @return the task queue\r
446      */\r
447 //    public BlockingQueue<Runnable> getQueue() {\r
448 //        return queue;\r
449 //    } \r
450         \r
451         \r
452 }\r