1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
14 * @author Toni Kalajainen
\r
16 package org.simantics.utils.threads;
\r
18 import static java.util.concurrent.TimeUnit.NANOSECONDS;
\r
20 import java.awt.EventQueue;
\r
21 import java.lang.reflect.InvocationTargetException;
\r
22 import java.util.Collection;
\r
23 import java.util.Collections;
\r
24 import java.util.HashMap;
\r
25 import java.util.HashSet;
\r
26 import java.util.LinkedList;
\r
27 import java.util.List;
\r
28 import java.util.Map;
\r
29 import java.util.Set;
\r
30 import java.util.concurrent.AbstractExecutorService;
\r
31 import java.util.concurrent.Executor;
\r
32 import java.util.concurrent.ExecutorService;
\r
33 import java.util.concurrent.ScheduledExecutorService;
\r
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
\r
35 import java.util.concurrent.Semaphore;
\r
36 import java.util.concurrent.ThreadFactory;
\r
37 import java.util.concurrent.TimeUnit;
\r
38 import java.util.concurrent.atomic.AtomicInteger;
\r
39 import java.util.concurrent.locks.Lock;
\r
42 * Utility for switching threads
\r
45 * @see IThreadWorkQueue
\r
47 public class ThreadUtils {
\r
49 public static final int CORES = Runtime.getRuntime().availableProcessors();
\r
52 * Defines the maximum thread pool size of
\r
53 * {@link #getBlockingWorkExecutor()}. The system will enforce this value to
\r
54 * be at least Math.max(CORES, 8). It can be configured through the system
\r
55 * property <em>simantics.executor.blockingMaxThreads</em>.
\r
57 public static final int MAX_BLOCKING_EXECUTOR_THREADS;
\r
static {
    // Read the configured pool size; an absent property defaults to the core
    // count, and so does a value that is not a valid integer.
    int configured;
    try {
        configured = Integer.parseInt(
                System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES));
    } catch (NumberFormatException ignored) {
        configured = CORES;
    }
    // Never go below 8 threads or below the number of cores.
    MAX_BLOCKING_EXECUTOR_THREADS = Math.max(CORES, Math.max(configured, 8));
}
\r
69 /** Never acquire non-blocking executor from this field, private use */
\r
70 public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
\r
/** Never acquire blocking executor from this field, private use */
\r
72 public static ExecutorService BLOCKING_EXECUTOR;
\r
74 static ScheduledExecutorService TIMER;
\r
77 * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
\r
79 * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
\r
81 public static CurrentThread getCurrentThread() {
\r
82 return CurrentThread.INSTANCE;
\r
86 * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
\r
88 * @return Executor that executes a non-blocking work
\r
90 public static synchronized ScheduledExecutorService getTimer() {
\r
91 if (TIMER == null) {
\r
92 final ThreadGroup tg = new ThreadGroup("Timer");
\r
93 final AtomicInteger counter = new AtomicInteger(0);
\r
94 ThreadFactory tf = new ThreadFactory() {
\r
96 public Thread newThread(Runnable r) {
\r
97 Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
\r
100 if (t.getPriority() != Thread.NORM_PRIORITY)
\r
101 t.setPriority(Thread.NORM_PRIORITY);
\r
105 TIMER = new ScheduledThreadPoolExecutor( 1, tf );
\r
111 * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
\r
112 * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
\r
114 * @return Executor that executes a non-blocking work
\r
116 public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
\r
117 if (NON_BLOCKING_EXECUTOR == null) {
\r
118 final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
\r
119 final AtomicInteger counter = new AtomicInteger(0);
\r
120 ThreadFactory tf = new ThreadFactory() {
\r
122 public Thread newThread(Runnable r) {
\r
123 Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
\r
126 if (t.getPriority() != Thread.NORM_PRIORITY)
\r
127 t.setPriority(Thread.NORM_PRIORITY);
\r
131 NON_BLOCKING_EXECUTOR =
\r
132 new ScheduledThreadPoolExecutor( CORES, tf );
\r
134 return NON_BLOCKING_EXECUTOR;
\r
 * Get an Executor that executes work that may lock or block.
\r
140 * @return executor for blocking operations
\r
142 public static synchronized ExecutorService getBlockingWorkExecutor() {
\r
143 if (BLOCKING_EXECUTOR == null) {
\r
144 final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
\r
145 final AtomicInteger counter = new AtomicInteger(0);
\r
146 ThreadFactory tf = new ThreadFactory() {
\r
148 public Thread newThread(Runnable r) {
\r
149 Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
\r
152 if (t.getPriority() != Thread.NORM_PRIORITY)
\r
153 t.setPriority(Thread.NORM_PRIORITY);
\r
157 BLOCKING_EXECUTOR =
\r
158 // new ThreadPoolExecutor(
\r
160 // MAX_BLOCKING_EXECUTOR_THREADS,
\r
161 // 3L, TimeUnit.SECONDS,
\r
162 // new SynchronousQueue<Runnable>(),
\r
164 new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
\r
166 return BLOCKING_EXECUTOR;
\r
170 * Better thread access allows thread context switching back to waiting
\r
174 * @return an enhanced version of the specified queue
\r
176 public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
\r
178 if (access instanceof BetterThreadAccess)
\r
180 return new BetterThreadAccess(access);
\r
183 static Map<Thread, WaitingThread> map =
\r
184 new HashMap<Thread, WaitingThread>();
\r
187 * Executes a runnable in thread synchronously.
\r
188 * If the thread locked and waits for this thread, then
\r
189 * the runnable is ran in that thread.
\r
191 * This works only if all thread switching is done
\r
192 * using this method.
\r
194 * @param threadAccess
\r
196 * @return true if thread accepted the runnable
\r
198 public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
\r
200 if (threadAccess instanceof BetterThreadAccess)
\r
201 threadAccess = ((BetterThreadAccess) threadAccess).ta;
\r
203 // Using current thread
\r
204 if (threadAccess.currentThreadAccess())
\r
208 } catch (RuntimeException e) {
\r
209 handleRunnableError(e);
\r
214 final Thread senderThread = Thread.currentThread();
\r
215 final WaitingThread wt = new WaitingThread(senderThread);
\r
216 WaitingThread prevWt;
\r
217 Event e = new Event(runnable, new EventListener() {
\r
219 public void eventDone(Event e) {
\r
223 synchronized (ThreadUtils.class) {
\r
224 // Check if target has WaitingThread. Use it if it does
\r
225 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
\r
226 Thread waitingForThread = null;
\r
227 if (targetWt != null)
\r
229 // Check if it is allowed to use the target WT
\r
230 if (isEventQueuingAllowed(senderThread, targetWt))
\r
232 if (targetWt.addEvent(e)) {
\r
234 waitingForThread = targetWt.thread;
\r
235 e.setThread(waitingForThread);
\r
242 if (waitingForThread == null) {
\r
244 waitingForThread = threadAccess.asyncExec(e);
\r
245 if (waitingForThread==null) return false;
\r
246 e.setThread(waitingForThread);
\r
251 prevWt = setWaitingThread(senderThread, wt);
\r
253 // run errands in the mean time
\r
254 wt.waitAndProcessEvents();
\r
255 wt.stopAcceptingEvents();
\r
256 // Stop accepting errands
\r
257 removeWaitingThread(senderThread, prevWt);
\r
259 wt.waitAndProcessEvents();
\r
263 public static boolean multiSyncExec(Collection<Executable> executions)
\r
265 if (executions.isEmpty()) return true;
\r
266 return multiSyncExec(executions.toArray(new Executable[executions.size()]));
\r
270 * Executes a list of executables in multiple threads and waits for all to complete.
\r
272 * @param executions
\r
273 * @return <code>true</code> once execution has completed (is this necessary?)
\r
275 public static boolean multiSyncExec(Executable ... executions)
\r
277 if (executions.length==0) return true;
\r
278 if (executions.length==1) {
\r
279 return syncExec(executions[0].threadAccess, executions[0].runnable);
\r
281 final Thread senderThread = Thread.currentThread();
\r
282 final WaitingThread wt = new WaitingThread(senderThread);
\r
283 WaitingThread prevWt = null;
\r
284 synchronized (ThreadUtils.class) {
\r
285 for (Executable pair : executions)
\r
287 IThreadWorkQueue threadAccess = pair.threadAccess;
\r
288 if (threadAccess.currentThreadAccess())
\r
290 if (threadAccess instanceof BetterThreadAccess)
\r
291 threadAccess = ((BetterThreadAccess) threadAccess).ta;
\r
292 Runnable runnable = pair.runnable;
\r
294 Event e = new Event(runnable, new EventListener() {
\r
296 public void eventDone(Event e) {
\r
300 // Check if target has WaitingThread. Use it if it does
\r
301 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
\r
302 Thread waitingForThread = null;
\r
303 if (targetWt != null)
\r
305 // Check if it is allowed to use the target WT
\r
306 if (isEventQueuingAllowed(senderThread, targetWt))
\r
308 if (targetWt.addEvent(e)) {
\r
310 waitingForThread = targetWt.thread;
\r
311 e.setThread(waitingForThread);
\r
318 if (waitingForThread == null) {
\r
320 waitingForThread = threadAccess.asyncExec(e);
\r
321 if (waitingForThread==null)
\r
323 e.setThread(waitingForThread);
\r
328 prevWt = setWaitingThread(senderThread, wt);
\r
331 // Run local runnables
\r
332 for (Executable pair : executions)
\r
334 IThreadWorkQueue threadAccess = pair.threadAccess;
\r
335 Runnable runnable = pair.runnable;
\r
336 if (threadAccess.currentThreadAccess())
\r
339 } catch (RuntimeException e) {
\r
340 handleRunnableError(e);
\r
344 // run errands in the mean time
\r
345 wt.waitAndProcessEvents();
\r
346 // Stop accepting errands
\r
347 wt.stopAcceptingEvents();
\r
348 removeWaitingThread(senderThread, prevWt);
\r
350 wt.waitAndProcessEvents();
\r
354 public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
\r
356 if (threadAccess instanceof BetterThreadAccess)
\r
357 threadAccess = ((BetterThreadAccess) threadAccess).ta;
\r
359 final Thread senderThread = Thread.currentThread();
\r
360 synchronized (ThreadUtils.class) {
\r
362 Event e = new Event(runnable, null, null);
\r
364 // Check if target has WaitingThread. Use it if it does
\r
365 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
\r
366 if (targetWt != null)
\r
368 // Check if it is allowed to use the target WT
\r
369 if (isEventQueuingAllowed(senderThread, targetWt))
\r
371 if (targetWt.addEvent(e))
\r
372 return targetWt.thread;
\r
375 return threadAccess.asyncExec(runnable);
\r
379 private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)
\r
381 assert(targetThread!=null);
\r
382 if (visitedTargetThreads.contains(targetThread))
\r
384 visitedTargetThreads.add(targetThread);
\r
385 if (sourceThread == targetThread) return false;
\r
386 Set<Thread> waitsFor = getWaitsForThreads(targetThread);
\r
387 if (waitsFor==null||waitsFor.isEmpty()) return false;
\r
388 for (Thread aThreadTargetThreadWaitsFor : waitsFor)
\r
390 if (aThreadTargetThreadWaitsFor==sourceThread) return true;
\r
391 if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
\r
392 if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
\r
398 static boolean waitsFor(Thread sourceThread, Thread targetThread)
\r
400 return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));
\r
403 static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
\r
405 if (!eventQueue.acceptEvents) return false;
\r
406 // queuing is allowed if target thread does wait for source thread
\r
407 return waitsFor(sourceThread, eventQueue.thread);
\r
410 private static void handleRunnableError(Throwable t)
\r
412 t.printStackTrace();
\r
415 interface EventListener {
\r
416 void eventDone(Event e);
\r
419 public static class Event implements Runnable {
\r
424 public Event(Runnable r, EventListener l, Semaphore s) {
\r
429 public void run() {
\r
430 setThread(Thread.currentThread());
\r
433 } catch (RuntimeException e) {
\r
434 handleRunnableError(e);
\r
442 public synchronized Thread getThread()
\r
447 } catch (InterruptedException e) {
\r
452 public synchronized void setThread(Thread t)
\r
463 * WaitingThread is a thread that waits for something and in the meantime
\r
466 static class WaitingThread
\r
468 final Thread thread;
\r
469 LinkedList<Event> queue = new LinkedList<Event>();
\r
470 boolean acceptEvents = true;
\r
471 Set<Event> waitingFor = new HashSet<Event>();
\r
472 Set<Event> completed = new HashSet<Event>();
\r
473 public WaitingThread(Thread thread) {
\r
474 this.thread = thread;
\r
476 public synchronized void waitFor(Event event)
\r
478 assert(thread!=null);
\r
479 waitingFor.add(event);
\r
481 public synchronized void completed(Event event)
\r
483 //assert(waitingFor.contains(event));
\r
484 completed.add(event);
\r
485 if (completed.size()==waitingFor.size())
\r
488 synchronized boolean isEmpty() {
\r
489 return queue.isEmpty();
\r
491 synchronized boolean keepWaiting() {
\r
492 if(waitingFor.size()!=completed.size())
\r
494 assert(waitingFor.equals(completed));
\r
497 public synchronized boolean addEvent(Event r)
\r
499 if (!acceptEvents) return false;
\r
505 static private int WAIT_MS = 10000;
\r
506 static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
\r
508 public void waitAndProcessEvents()
\r
510 while (keepWaiting() || !isEmpty()) {
\r
514 if (!queue.isEmpty())
\r
516 if (e==null && keepWaiting())
\r
518 long now = System.nanoTime();
\r
520 long duration = System.nanoTime()-now;
\r
521 if(duration > (WAIT_THRESHOLD_NS)) {
\r
522 for(Thread t : getWaitingForThreads())
\r
524 throw new IllegalStateException("Thread '" + thread + "' has died.");
\r
526 } catch (InterruptedException e1) {
\r
532 } catch (RuntimeException e1) {
\r
533 e1.printStackTrace();
\r
542 if (!queue.isEmpty())
\r
548 } catch (RuntimeException e1) {
\r
549 e1.printStackTrace();
\r
554 public synchronized void stopAcceptingEvents()
\r
556 acceptEvents = false;
\r
558 public synchronized Set<Thread> getWaitingForThreads()
\r
560 Set<Thread> result = new HashSet<Thread>(waitingFor.size());
\r
561 for (Event e : waitingFor)
\r
563 if (completed.contains(e)) continue;
\r
564 result.add(e.getThread());
\r
571 * Blocks until waiting thread has been set for thread t
\r
573 * @return waiting thread
\r
575 synchronized static WaitingThread getWaitingThreadSync(Thread t)
\r
577 WaitingThread result;
\r
579 result = map.get(t);
\r
580 if (result!=null) return result;
\r
582 ThreadUtils.class.wait();
\r
583 } catch (InterruptedException e) {
\r
589 * Return an array which may have nulls
\r
593 static Set<Thread> getWaitsForThreads(Thread t)
\r
595 WaitingThread wt = getWaitingThread(t);
\r
596 if (wt==null) return null;
\r
597 return wt.getWaitingForThreads();
\r
600 static synchronized WaitingThread getWaitingThread(Thread t)
\r
605 static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
\r
607 WaitingThread prev = map.put(t, wt);
\r
608 ThreadUtils.class.notifyAll();
\r
612 static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
\r
614 assert(t == Thread.currentThread());
\r
616 if (replaceWith!=null)
\r
617 map.put(t, replaceWith);
\r
620 static class BetterThreadAccess implements IThreadWorkQueue {
\r
621 IThreadWorkQueue ta;
\r
622 public BetterThreadAccess(IThreadWorkQueue ta)
\r
624 if (ta instanceof BetterThreadAccess)
\r
625 ta = ((BetterThreadAccess) ta).ta;
\r
629 public Thread asyncExec(Runnable runnable) {
\r
630 return ThreadUtils.asyncExec(ta, runnable);
\r
633 public boolean currentThreadAccess() {
\r
634 return ta.currentThreadAccess();
\r
637 public Thread getThread() {
\r
638 return ta.getThread();
\r
641 public boolean syncExec(Runnable runnable) {
\r
642 return ThreadUtils.syncExec(ta, runnable);
\r
649 * Executes command in the executor while maintaining possibility to
\r
650 * return back to caller.
\r
655 public static void exec(Executor executor, final Runnable command)
\r
657 final Thread[] calleeThread = new Thread[1];
\r
658 final Thread callerThread = Thread.currentThread();
\r
659 final Runnable wrappedCommand = new Runnable() {
\r
661 public void run() {
\r
662 calleeThread[0] = Thread.currentThread();
\r
671 DEPENDENCIES.put(callerThread, calleeThread);
\r
672 executor.execute(wrappedCommand);
\r
673 DEPENDENCIES.remove(callerThread);
\r
675 private static Map<Thread, Thread[]> DEPENDENCIES =
\r
676 Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );
\r
677 @SuppressWarnings("unused")
\r
678 private static boolean hasDependency(Thread waiter, Thread worker)
\r
680 // Follow dependency chain until worker is reached
\r
681 for (Thread t = waiter; t!=null;)
\r
683 Thread[] potentialResult = DEPENDENCIES.get(t);
\r
684 if (potentialResult==null) break;
\r
685 t = potentialResult[0];
\r
686 if (t==worker) return true;
\r
692 * Lock multiple locks simultaneously. If all locks cannot be locked the
\r
693 * thread sleeps a moment and tries again.
\r
695 * If all locks cannot be locked at once, this method doesn't prevent
\r
696 * other from locking them in the mean time.
\r
698 * @param locks an array of locks. null values are ignored
\r
700 public static void lock(Lock...locks) {
\r
701 if (locks.length==0) return;
\r
702 if (locks.length==1) {
\r
709 for (;i<locks.length; i++) {
\r
711 if (l==null) continue;
\r
712 if ( !locks[i].tryLock() ) break;
\r
714 if (i==locks.length) return;
\r
715 for (int j=0; j<i; j++) {
\r
717 if (l==null) continue;
\r
720 try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}
\r
725 * Lock multiple locks simultaneously. If all locks cannot be locked the
\r
726 * thread sleeps a moment and tries again.
\r
728 * If all locks cannot be locked at once, this method doesn't prevent
\r
729 * other from locking them in the mean time.
\r
733 * @param locks1 an array of locks. null values are ignored
\r
734 * @param locks2 an array of locks. null values are ignored
\r
736 public static void lock2(Lock[] locks1, Lock[] locks2) {
\r
737 int l1 = locks1.length;
\r
738 int l2 = locks2.length;
\r
740 if (l1==0 && l2==0) return;
\r
744 // Attempt to lock everything in locks 1
\r
746 Lock l = locks1[i];
\r
747 if (l==null) continue;
\r
748 if ( !locks1[i].tryLock() ) break;
\r
750 // Attempt to lock everything in locks 2
\r
753 Lock l = locks2[i];
\r
754 if (l==null) continue;
\r
755 if ( !locks2[i-l1].tryLock() ) break;
\r
762 for (int j=l1; j<i; j++) {
\r
763 Lock l = locks2[j-l1];
\r
764 if (l==null) continue;
\r
770 for (int j=0; j<i; j++) {
\r
771 Lock l = locks1[j];
\r
772 if (l==null) continue;
\r
776 try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}
\r
781 * Try to lock multiple locks simultaneously. If all locks cannot be locked the
\r
782 * thread sleeps a moment and tries again.
\r
784 * If all locks cannot be locked at once, this method doesn't prevent
\r
785 * other from locking them in the mean time.
\r
787 * @param locks an array of locks. null values are ignored
\r
790 public static boolean tryLock(Lock...locks) {
\r
791 if (locks.length==0) return true;
\r
792 if (locks.length==1) {
\r
793 return locks[0].tryLock();
\r
797 for (;i<locks.length; i++) {
\r
799 if (l==null) continue;
\r
800 if ( !locks[i].tryLock() ) break;
\r
802 if (i==locks.length) return true;
\r
805 for (int j=0; j<i; j++) {
\r
807 if (l==null) continue;
\r
815 * Unlock multiple locks. Use this after locking multiple locks.
\r
817 * @param locks an array of locks. Null values are ignored
\r
819 public static void unlock(Lock...locks) {
\r
820 for (Lock lock : locks) {
\r
821 if (lock != null) lock.unlock();
\r
826 * Unlock multiple locks. Use this after locking multiple locks.
\r
828 * @param locks1 an array of locks. Null values are ignored
\r
829 * @param locsk2 an array of locks. Null values are ignored
\r
831 public static void unlock2(Lock[] locks1, Lock[] locks2) {
\r
832 for (Lock lock : locks1) {
\r
833 if (lock != null) lock.unlock();
\r
835 for (Lock lock : locks2) {
\r
836 if (lock != null) lock.unlock();
\r
841 * Join multiple arrays into a single array
\r
843 * @param lockArrays
\r
846 public static Lock[] appendLockArrays(Lock[]...lockArrays) {
\r
848 for (Lock[] array : lockArrays) len +=array.length;
\r
849 Lock[] result = new Lock[ len ];
\r
851 for (Lock[] array : lockArrays) {
\r
852 System.arraycopy(array, 0, result, i, array.length);
\r
858 public static synchronized void shutdown() {
\r
859 if (TIMER != null) {
\r
860 //System.out.println("TIMERS");
\r
861 shutdownAndAwaitTermination(TIMER, 1000);
\r
864 if (NON_BLOCKING_EXECUTOR != null) {
\r
865 //System.out.println("NON_BLOCKING");
\r
866 shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);
\r
867 NON_BLOCKING_EXECUTOR = null;
\r
869 if (BLOCKING_EXECUTOR != null) {
\r
870 //System.out.println("BLOCKING");
\r
871 shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);
\r
872 BLOCKING_EXECUTOR = null;
\r
877 * Grabbed from {@link ExecutorService} javadoc.
\r
879 * @param pool {@link ExecutorService} to shut down
\r
881 private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {
\r
882 //long t = System.currentTimeMillis();
\r
883 pool.shutdown(); // Disable new tasks from being submitted
\r
885 // Wait a while for existing tasks to terminate
\r
886 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
\r
887 List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks
\r
888 if (!leftovers.isEmpty())
\r
889 System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
\r
890 // Wait a while for tasks to respond to being cancelled
\r
891 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
\r
892 System.err.println("Thread pool '" + pool.toString() + "' did not terminate");
\r
894 } catch (InterruptedException ie) {
\r
895 // (Re-)Cancel if current thread also interrupted
\r
896 pool.shutdownNow();
\r
897 // Preserve interrupt status
\r
898 Thread.currentThread().interrupt();
\r
900 //long e = System.currentTimeMillis();
\r
901 //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
\r
904 @SuppressWarnings("unused")
\r
905 private static void uncheckedAwaitTermination(ExecutorService service, long time) {
\r
907 NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
\r
908 } catch (InterruptedException e) {
\r
912 // Executor support
\r
914 // Executor that runs in current thread
\r
915 public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
\r
917 // Async executor queues the command into AWT event queue
\r
918 public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
\r
920 // Sync executor blocks the call until the command is finished
\r
921 public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
\r
925 class AWTExecutorSync extends AbstractExecutorService {
\r
928 public void execute(Runnable command) {
\r
929 if (EventQueue.isDispatchThread())
\r
934 EventQueue.invokeAndWait(command);
\r
935 } catch (InterruptedException e) {
\r
936 throw new RuntimeException(e);
\r
937 } catch (InvocationTargetException e) {
\r
938 throw new RuntimeException(e.getCause());
\r
944 public void shutdown() {
\r
948 public List<Runnable> shutdownNow() {
\r
953 public boolean isShutdown() {
\r
958 public boolean isTerminated() {
\r
963 public boolean awaitTermination(long timeout, TimeUnit unit)
\r
964 throws InterruptedException {
\r
968 public static String getStackTrace(int levels){
\r
969 StringBuilder sb = new StringBuilder();
\r
970 sb.append( Thread.currentThread() );
\r
972 Exception e3 = new Exception();
\r
973 try { throw e3; } catch(Exception e2) {
\r
975 for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {
\r
976 StackTraceElement ste = e2.getStackTrace()[i];
\r
982 return sb.toString();
\r
986 class CurrentThreadExecutor implements Executor {
\r
988 public void execute(Runnable command) {
\r