]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ua/ThreadPool.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / ua / ThreadPool.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 /*
13  *
14  * @author Toni Kalajainen
15  */
16 package org.simantics.utils.threads.ua;
17
18 import java.util.Collection;
19 import java.util.EnumSet;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.Map;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.ThreadFactory;
25
26
27 /**
28  * ThreadPool contains a queue of work and a pool of threads processing them.
29  * <p>
30  * A thread can join the pool either as resident or as temporary.
31  * Temporary thread works until work queue is empty and returns.  
32  * Thread work mode can be set with setMode().
33  * <p>
34  * Current thread can yield to the service of ThreadPool with work()-invoke.
35  * <p>
36  * A new permanent worker thread can be spawned with newThread().
37  * This thread will be released when the ThreadPool is closed or explicitely with
38  * setMode(thread, ThreadMode.Interrupted);
39  * <p>
40  * Temporaray threads can be spawned on-demand with setSpawnOnDemandLimit().  
41  * <p>
42  * ThreadPool can be closed (or paused) with setState().
43  * <p>
44  * TODO Use ThreadFactory
45  * 
46  * @see SynchronizedWorker Use queue to ensure processing order of work.
47  * @see Worker 
48  * @author Toni Kalajainen (toni.kalajainen@vtt.fi)
49  */
50 public class ThreadPool extends AbstractState<ExecutorState, RuntimeException> implements Worker, StatefulExecutor {
51
52         
53         private final static ThreadGroup THREADGROUP = new ThreadGroup("ThreadPool");
54         /** Constructs daemon threads of "ThreadPool" group */
55         private final static ThreadFactory FACTORY = new ThreadFactory()
56         {
57                 @Override
58                 public Thread newThread(Runnable r) {
59                         Thread t = new Thread(THREADGROUP, r, "WorkerThread");
60                         t.setDaemon(true);
61                         return t;                       
62                 }
63         };
64         
65         LinkedList<WorkImpl> queue = new LinkedList<WorkImpl>();
66         Semaphore s = new Semaphore(0);
67         
68         Map<Thread, ThreadMode> threads = new HashMap<Thread, ThreadMode>();
69         // How many threads may be spawned on demand when new work is queued
70         int spawnOnDemandLimit = 0;
71         static ThreadPool INSTANCE;
72         ThreadFactory threadFactory = FACTORY;
73         
74         /**
75          * Get singleton instance. This pool has 0 resident threads and spawns
76          * temporary threads up to the number of CPUs in the system.
77          * NOTE! Do not use this shared resource for blocking actions.
78          *  
79          * @return singleton instance which has one worker for each core
80          */
81         public synchronized static ThreadPool getInstance() {
82                 if (INSTANCE == null) 
83                         INSTANCE = new ThreadPool(Runtime.getRuntime().availableProcessors());          
84                 return INSTANCE;
85         }
86         
87         public enum ThreadMode {
88                 Resident,    // Works and stays resident (sleeps if no work available)
89                 Temporary,   // Works until queue is empty
90                 Interrupted  // Stops working right away or after current task
91         }
92         
93         public ThreadPool() {
94                 super(ExecutorState.Active);
95         }
96         
97         /**
98          * Create thread pool with 0 initial worker threads.
99          * 
100          * @param maxNumberOfTemporaryThreads The number of ad-hoc temporary worker threads.
101          */
102         public ThreadPool(int maxNumberOfTemporaryThreads) {
103                 super(ExecutorState.Active);
104                 setSpawnOnDemandLimit(maxNumberOfTemporaryThreads);
105         }
106
107         public synchronized int getWorkingThreadCount()
108         {
109                 int result = 0;
110                 for (ThreadMode m : threads.values())
111                         if (m != ThreadMode.Interrupted) {
112                                 result++;
113                         }
114                 return result;
115         }
116         
117         /**
118          * Sets the limit for how many threads may be spawned on demand when new
119          * work is queued.
120          * <p>
121          * WorkPool spawns more threads when new work is queued and 
122          * the number of working threads goes below this value.
123          * <p>
124          * New thread is spawned if number of threads is less than queued word and
125          * the number of threads is less than <code>limit</code> value.   
126          * 
127          * @param limit on demand thread limit
128          */
129         public void setSpawnOnDemandLimit(int limit)
130         {
131                 this.spawnOnDemandLimit = limit;
132         }
133         
134         /**
135          * Makes current thread yield to the service of the thread pool. 
136          * Service can be ended by invoking setMode(ThreadMode.Interrupted).
137          * 
138          * @param stayResident if true works until interrupted, if false works until queue is empty
139          */
140         public void work(boolean stayResident)
141         {                       
142                 Thread t = Thread.currentThread();
143                 synchronized(this) {
144                         threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
145                 }               
146                 for (;;) {
147                         synchronized(this) {
148                                 ThreadMode mode = getThreadMode(t);
149                                 ExecutorState state = getState();
150                                 
151                                 // Temporary threads can return if not working right now
152                                 if (mode == ThreadMode.Temporary && (isEmpty() || !ExecutorState.WORKING_STATES.contains(state))) 
153                                         mode = ThreadMode.Interrupted;
154                                 
155                                 if (mode == null || mode == ThreadMode.Interrupted) break;
156                                 
157                                 // Do not do anything if the worker is shut down
158                                 if (state==ExecutorState.Terminated) break;
159
160                                 
161                                 if (state==ExecutorState.Paused) {
162                                         // Remove temporary threads if paused
163                                         if (mode==ThreadMode.Temporary) break;
164                                         if (mode==ThreadMode.Resident) { 
165                                                 try {
166                                                         waitForState(ExecutorState.NON_PAUSED_STATES);
167                                                 } catch (InterruptedException e) {
168                                                 }
169                                                 continue;
170                                         }
171                                 }
172                                 
173                         }
174                         try {                                                           
175                                 //s.tryAcquire(1000, TimeUnit.MILLISECONDS)) continue;
176                                 s.acquire();
177                                 WorkImpl work;
178                                 synchronized(this) {
179                                         work = queue.removeFirst();
180                                 }
181                                 try {
182                                         work.setState(WorkState.Working);
183                                         work.getRunnable().run();
184                                         work.setState(WorkState.Complete);
185                                 } catch (RuntimeException e) {
186                                         work.setError(e);
187                                         work.setState(WorkState.Error);
188                                 }
189                         } catch (InterruptedException e) {
190                         }
191                 }
192                 // Go to shutdown state?
193                 boolean goToShutdown = false;
194                 synchronized(this) {
195                         threads.remove(t);
196                         // If this is last thread and queue is empty, then go to shutdown state
197                         goToShutdown = getState() == ExecutorState.Shutdown && threads.isEmpty() && queue.isEmpty();
198                 }
199                 // Go to shutdown state.
200                 if (goToShutdown)
201                         super.setState(ExecutorState.Terminated);
202         }
203         
204         Runnable residentRun = new Runnable() 
205         {
206                 @Override
207                 public void run() {
208                         work(true);
209                 }
210         };
211
212         Runnable tempRun = new Runnable() 
213         {
214                 @Override
215                 public void run() {
216                         work(false);
217                 }
218         };
219         
220         /**
221          * Spawn new thread to the service of the thread pool
222          * @param stayResident 
223          * @return new running thread
224          */
225         public Thread newThread(boolean stayResident)
226         {
227                 Thread t = new Thread(THREADGROUP, stayResident ? residentRun : tempRun, "WorkerThread");
228                 
229                 synchronized(this) {
230                         threads.put(t, stayResident ? ThreadMode.Resident : ThreadMode.Temporary);
231                 }
232                 t.start();
233                 return t;
234         }
235
236         public synchronized ThreadMode getThreadMode(Thread t)
237         {
238                 return threads.get(t);
239         }
240         
241         public synchronized void setMode(Thread t, ThreadMode tm)
242         {
243                 if (t==null || tm==null) throw new IllegalArgumentException();
244                 ThreadMode oldMode = threads.get(t);
245                 if (oldMode == tm) return;              
246                 threads.put(t, tm);
247                 if (tm == ThreadMode.Interrupted /*&& t.getState() == Thread.State.WAITING*/)
248                 {                               
249                         t.interrupt();
250                 }
251                 if (tm == ThreadMode.Temporary && isEmpty() /*&& t.getState() == Thread.State.WAITING*/) 
252                 {
253                         t.interrupt();
254                 }
255         }
256                 
257         /**
258          * Is work queue empty
259          * @return true if the work queue is empty
260          */
261         public synchronized boolean isEmpty()
262         {
263                 return queue.isEmpty();
264         }
265         
266         void interrupt(Thread t)
267         {
268                 t.interrupt();
269         }
270         
271         public synchronized boolean remove(WorkImpl r)
272         {
273                 return queue.remove(r);
274         }
275         
276         @Override
277         public WorkMonitor asyncExec(Runnable runnable, StateListener<WorkState> listener) 
278         throws WorkerClosedException 
279         {
280                 WorkImpl w = null;
281                 synchronized(this) {
282                         if (!ExecutorState.ACCEPTS_WORK_STATES.contains(getState()))
283                                 throw new WorkerClosedException();
284                         w = new WorkImpl(runnable);
285                         if (listener!=null)
286                                 w.addStateListener(listener);
287                         queue.add(w);
288                         spawnTempThreads();
289                 }
290                 s.release();
291                 return w;
292         }
293         
294
295         // Raise visibility
296         @Override
297         public boolean setState(ExecutorState state) {
298                 // If worker is already shutdown then it must not be reactivated
299                 if (getState() == ExecutorState.Terminated) 
300                         return false;
301                 synchronized(this) {
302                         // Go directly to shutdown if queue is empty and there are no threads
303                         if (state==ExecutorState.Terminated || state==ExecutorState.Shutdown)
304                         {
305                                 if (threads.isEmpty() && queue.isEmpty()) {
306                                         super.setState( ExecutorState.Shutdown );
307                                         super.setState( ExecutorState.Terminated );
308                                 } else {
309                                         super.setState( ExecutorState.Shutdown );
310                                 }
311                                 return true;
312                         }
313                 }
314                 return super.setState(state);
315         }
316         
317         /**
318          * Cancel all work.
319          * @param mayInterrupt 
320          */
321         public synchronized void cancelAll(boolean mayInterrupt) 
322         {
323                 for (WorkMonitor w : queue)
324                         w.cancel(mayInterrupt);
325                 queue.clear();
326         }
327         
328         @Override
329         protected void onStateTransition(ExecutorState oldState,
330                         ExecutorState newState) {
331                 if (!ExecutorState.WORKING_STATES.contains(oldState) && ExecutorState.WORKING_STATES.contains(newState))
332                         spawnTempThreads();
333         }
334         
335         private synchronized void spawnTempThreads()
336         {
337                 if (!ExecutorState.WORKING_STATES.contains(getState())) return; 
338                 if (spawnOnDemandLimit>0) {
339                         int workers = getWorkingThreadCount();
340                         int n = spawnOnDemandLimit - workers;
341                         for (int i=0; i<n; i++)
342                                 newThread(false);
343                 }               
344         }
345
346         public class WorkImpl extends AbstractWorkMonitor 
347         {
348                 public WorkImpl(Runnable r) {
349                         super(r);
350                 }       
351
352                 @Override
353                 public boolean cancel(boolean mayInterrupt) {
354                         if (mayInterrupt) 
355                                 throw new RuntimeException("NOT IMPLEMENTED");
356                         synchronized(ThreadPool.this) 
357                         {
358                                 WorkState s = getState();
359                                 if (s==WorkState.Ready) 
360                                         return remove(this);                             
361                         }
362                         return false;
363                 }       
364
365                 // Raise visibility
366                 @Override
367                 public boolean setState(WorkState state) {
368                         return super.setState(state);
369                 }
370         }
371         
372         @Override
373         public synchronized void getQueuedWork(Collection<WorkMonitor> result) {
374                 result.addAll(queue);
375         }
376         
377         
378         public static void main(String[] args) throws InterruptedException {
379                 ThreadPool pool = new ThreadPool(2);
380                 
381                 pool.addStateListener(new StateListener<ExecutorState>() {
382                         @Override
383                         public void onStateTransition(IStatefulObject<ExecutorState, ?> monitor, ExecutorState oldState, ExecutorState newState) {
384                                 System.out.println("Worker State: "+newState);
385                         }});
386                 
387                 StateListener<WorkState> l = new StateListener<WorkState>() {
388                         @Override
389                         public void onStateTransition(IStatefulObject<WorkState, ?> monitor, WorkState oldState, WorkState newState) {
390                                 System.out.println("Work ("+monitor+"): "+newState);
391                         }
392                 };
393                 
394                 Runnable[] r = new Runnable[10];
395                 WorkMonitor[] m = new WorkMonitor[10];
396                 for (int i=0; i<r.length; i++)
397                 {
398                         final int j = i;
399                         r[i] = new Runnable() {
400                                 @Override
401                                 public String toString() {
402                                         return ""+j;
403                                 }
404                                 @Override
405                                 public void run() {
406                                         try {
407                                                 Thread.sleep(1000);
408                                         } catch (InterruptedException e) {
409                                                 //e.printStackTrace();
410                                         }
411                                         System.out.println(j+" completed by "+Thread.currentThread()); 
412                                 }};
413                 }
414                 
415                 int i=0;
416                 for (Runnable rr : r)
417                         m[i++] = pool.asyncExec(rr, l);
418                 
419                 for (int j=2; j<8; j++)
420                         m[j].cancel(false);
421                 
422                 pool.setState(ExecutorState.Terminated);
423                 m[9].waitForState(EnumSet.of(WorkState.Complete));
424
425                 
426                 System.out.println("fin");
427         }
428
429         public void setThreadFactory(ThreadFactory factory)
430         {
431                 this.threadFactory = factory;
432         }
433
434         @Override
435         public void execute(Runnable command) {
436                 asyncExec(command, null);
437         }
438         
439     /**
440      * Returns the task queue used by this executor. Access to the
441      * task queue is intended primarily for debugging and monitoring.
442      * This queue may be in active use.  Retrieving the task queue
443      * does not prevent queued tasks from executing.
444      *
445      * @return the task queue
446      */
447 //    public BlockingQueue<Runnable> getQueue() {
448 //        return queue;
449 //    } 
450         
451         
452 }