1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
14 * @author Toni Kalajainen
16 package org.simantics.utils.threads;
18 import static java.util.concurrent.TimeUnit.NANOSECONDS;
20 import java.awt.EventQueue;
21 import java.lang.reflect.InvocationTargetException;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.LinkedList;
27 import java.util.List;
30 import java.util.concurrent.AbstractExecutorService;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.Lock;
42 * Utility for switching threads
45 * @see IThreadWorkQueue
47 public class ThreadUtils {
// Processor count reported by the JVM when this class is initialized. It is used as
// the pool size of the non-blocking executor and as a lower bound for
// MAX_BLOCKING_EXECUTOR_THREADS.
49 public static final int CORES = Runtime.getRuntime().availableProcessors();
/**
 * Thread pool size used for {@link #getBlockingWorkExecutor()}.
 * <p>
 * The value is read once from the system property
 * <em>simantics.executor.blockingMaxThreads</em>. A missing or unparsable
 * property counts as {@link #CORES}. Whatever was configured, the result is
 * never smaller than {@code Math.max(CORES, 8)}.
 */
public static final int MAX_BLOCKING_EXECUTOR_THREADS;

static {
    int configured;
    try {
        configured = Integer.parseInt(
                System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES));
    } catch (NumberFormatException e) {
        // not a number: behave as if the property had not been set
        configured = CORES;
    }
    final int lowerBound = Math.max(CORES, 8);
    MAX_BLOCKING_EXECUTOR_THREADS = Math.max(configured, lowerBound);
}
69 /** Never acquire non-blocking executor from this field, private use */
70 public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
71 /** Never acquire blocking executor from this field, private use; call getBlockingWorkExecutor() instead */
72 public static ExecutorService BLOCKING_EXECUTOR;
// Executor assigned in getTimer() and shut down in shutdown().
74 static ScheduledExecutorService TIMER;
77 * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
79 * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
81 public static CurrentThread getCurrentThread() {
82 return CurrentThread.INSTANCE;
86 * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
88 * @return Executor that executes a non-blocking work
// Returns the timer executor kept in TIMER; the method holds the ThreadUtils.class
// monitor. The visible code creates a ScheduledThreadPoolExecutor with one core
// thread whose threads are named "Timer-<n>" and belong to the "Timer" thread group.
// NOTE(review): the guard around the creation, parts of the thread factory and the
// return statement are not visible in this excerpt - presumably TIMER is only
// created while it is still null; confirm against the full source.
90 public static synchronized ScheduledExecutorService getTimer() {
92 final ThreadGroup tg = new ThreadGroup("Timer");
93 final AtomicInteger counter = new AtomicInteger(0);
94 ThreadFactory tf = new ThreadFactory() {
96 public Thread newThread(Runnable r) {
// the counter yields 1, 2, 3, ... so every thread of this factory gets its own name
97 Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
// a new thread starts with its creator's priority; reset it to normal if it differs
100 if (t.getPriority() != Thread.NORM_PRIORITY)
101 t.setPriority(Thread.NORM_PRIORITY);
// core pool size 1
105 TIMER = new ScheduledThreadPoolExecutor( 1, tf );
111 * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
112 * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
114 * @return Executor that executes a non-blocking work
// Returns the shared executor for non-blocking work, creating it while
// NON_BLOCKING_EXECUTOR is still null. The method holds the ThreadUtils.class monitor.
// The pool is a ScheduledThreadPoolExecutor with CORES core threads named
// "Non-Blocking-Worker-<n>" in the "Non-Blocking-Worker-Group" thread group.
// NOTE(review): parts of the thread factory are not visible in this excerpt.
116 public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
117 if (NON_BLOCKING_EXECUTOR == null) {
118 final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
119 final AtomicInteger counter = new AtomicInteger(0);
120 ThreadFactory tf = new ThreadFactory() {
122 public Thread newThread(Runnable r) {
// the counter yields 1, 2, 3, ... so every worker gets its own name
123 Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
// a new thread starts with its creator's priority; reset it to normal if it differs
126 if (t.getPriority() != Thread.NORM_PRIORITY)
127 t.setPriority(Thread.NORM_PRIORITY);
// one core thread per processor reported at class initialization
131 NON_BLOCKING_EXECUTOR =
132 new ScheduledThreadPoolExecutor( CORES, tf );
134 return NON_BLOCKING_EXECUTOR;
138 * Get an Executor that executes work that may lock or block.
140 * @return executor for blocking operations
// Returns the shared executor for work that may lock or block, creating it while
// BLOCKING_EXECUTOR is still null. The method holds the ThreadUtils.class monitor.
// Worker threads are named "Blocking-Worker-<n>" and belong to the
// "Blocking-Worker-Group" thread group.
// NOTE(review): parts of the thread factory and the assignment target of the
// constructor call are not visible in this excerpt.
142 public static synchronized ExecutorService getBlockingWorkExecutor() {
143 if (BLOCKING_EXECUTOR == null) {
144 final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
145 final AtomicInteger counter = new AtomicInteger(0);
146 ThreadFactory tf = new ThreadFactory() {
148 public Thread newThread(Runnable r) {
// the counter yields 1, 2, 3, ... so every worker gets its own name
149 Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
// a new thread starts with its creator's priority; reset it to normal if it differs
152 if (t.getPriority() != Thread.NORM_PRIORITY)
153 t.setPriority(Thread.NORM_PRIORITY);
// A ThreadPoolExecutor-based configuration is kept below as commented-out code.
158 // new ThreadPoolExecutor(
160 // MAX_BLOCKING_EXECUTOR_THREADS,
161 // 3L, TimeUnit.SECONDS,
162 // new SynchronousQueue<Runnable>(),
// MAX_BLOCKING_EXECUTOR_THREADS is passed as the core pool size of the scheduled pool
164 new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
166 return BLOCKING_EXECUTOR;
170 * Better thread access allows thread context switching back to waiting
174 * @return an enhanced version of the specified queue
// Returns a BetterThreadAccess for the given work queue; a queue that is not yet a
// BetterThreadAccess is wrapped in a new one.
// NOTE(review): the statement guarded by the instanceof check is not visible in this
// excerpt - presumably 'access' is returned unchanged in that case; confirm.
176 public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
178 if (access instanceof BetterThreadAccess)
180 return new BetterThreadAccess(access);
// Registry of the WaitingThread currently published for each thread. It is written by
// setWaitingThread() and removeWaitingThread(), both synchronized on ThreadUtils.class.
183 static Map<Thread, WaitingThread> map =
184 new HashMap<Thread, WaitingThread>();
187 * Executes a runnable in thread synchronously.
188 * If the thread is locked and waits for this thread, then
189 * the runnable is run in that thread.
191 * This works only if all thread switching is done
194 * @param threadAccess
196 * @return true if thread accepted the runnable
// Runs 'runnable' through 'threadAccess' and keeps the calling thread processing
// queued events while it waits. Returns false when threadAccess.asyncExec() yields no
// thread for the event.
// NOTE(review): several lines of this method (braces, the direct-run branch, the
// listener body and the remaining return statements) are not visible in this excerpt;
// the notes below describe only the statements that are.
198 public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
// address the wrapped queue directly
200 if (threadAccess instanceof BetterThreadAccess)
201 threadAccess = ((BetterThreadAccess) threadAccess).ta;
203 // Using current thread
204 if (threadAccess.currentThreadAccess())
// RuntimeExceptions of the direct run are passed to handleRunnableError
208 } catch (RuntimeException e) {
209 handleRunnableError(e);
// 'wt' is the event queue this thread serves while it waits for the other thread
214 final Thread senderThread = Thread.currentThread();
215 final WaitingThread wt = new WaitingThread(senderThread);
216 WaitingThread prevWt;
217 Event e = new Event(runnable, new EventListener() {
219 public void eventDone(Event e) {
// the class monitor covers the lookup of the target's WaitingThread and the hand-over
223 synchronized (ThreadUtils.class) {
224 // Check if target has WaitingThread. Use it if it does
225 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
226 Thread waitingForThread = null;
227 if (targetWt != null)
229 // Check if it is allowed to use the target WT
230 if (isEventQueuingAllowed(senderThread, targetWt))
232 if (targetWt.addEvent(e)) {
// the event was queued on the target's WaitingThread; record the thread that owns it
234 waitingForThread = targetWt.thread;
235 e.setThread(waitingForThread);
// not queued above: hand the event to the work queue itself
242 if (waitingForThread == null) {
244 waitingForThread = threadAccess.asyncExec(e);
// the queue supplied no thread for the event: report failure
245 if (waitingForThread==null) return false;
246 e.setThread(waitingForThread);
// publish wt for this thread and remember what was registered before
251 prevWt = setWaitingThread(senderThread, wt);
253 // run errands in the mean time
254 wt.waitAndProcessEvents();
255 wt.stopAcceptingEvents();
256 // Stop accepting errands
// put the previously registered WaitingThread (if any) back for this thread
257 removeWaitingThread(senderThread, prevWt);
// second pass, made after stopAcceptingEvents()
259 wt.waitAndProcessEvents();
263 public static boolean multiSyncExec(Collection<Executable> executions)
265 if (executions.isEmpty()) return true;
266 return multiSyncExec(executions.toArray(new Executable[executions.size()]));
270 * Executes a list of executables in multiple threads and waits for all to complete.
273 * @return <code>true</code> once execution has completed (is this necessary?)
// Dispatches every executable to its work queue and waits until all have finished,
// processing events queued for the calling thread in the meantime. Zero executables
// return true immediately; a single one is delegated to syncExec().
// NOTE(review): several lines (braces, listener body, the statements guarded by some
// of the ifs, the final return) are not visible in this excerpt; the notes below
// describe only the statements that are.
275 public static boolean multiSyncExec(Executable ... executions)
277 if (executions.length==0) return true;
278 if (executions.length==1) {
279 return syncExec(executions[0].threadAccess, executions[0].runnable);
// 'wt' is the event queue this thread serves while it waits for the other threads
281 final Thread senderThread = Thread.currentThread();
282 final WaitingThread wt = new WaitingThread(senderThread);
283 WaitingThread prevWt = null;
// first pass, under the class monitor: hand over the work aimed at other threads
284 synchronized (ThreadUtils.class) {
285 for (Executable pair : executions)
287 IThreadWorkQueue threadAccess = pair.threadAccess;
// NOTE(review): the guarded statement is not visible - presumably work for the current
// thread is skipped here and run in the second loop below; confirm
288 if (threadAccess.currentThreadAccess())
// address the wrapped queue directly
290 if (threadAccess instanceof BetterThreadAccess)
291 threadAccess = ((BetterThreadAccess) threadAccess).ta;
292 Runnable runnable = pair.runnable;
294 Event e = new Event(runnable, new EventListener() {
296 public void eventDone(Event e) {
300 // Check if target has WaitingThread. Use it if it does
301 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
302 Thread waitingForThread = null;
303 if (targetWt != null)
305 // Check if it is allowed to use the target WT
306 if (isEventQueuingAllowed(senderThread, targetWt))
308 if (targetWt.addEvent(e)) {
// the event was queued on the target's WaitingThread; record the thread that owns it
310 waitingForThread = targetWt.thread;
311 e.setThread(waitingForThread);
// not queued above: hand the event to the work queue itself
318 if (waitingForThread == null) {
320 waitingForThread = threadAccess.asyncExec(e);
// NOTE(review): what happens when the queue supplies no thread is not visible here
321 if (waitingForThread==null)
323 e.setThread(waitingForThread);
// publish wt for this thread and remember what was registered before
328 prevWt = setWaitingThread(senderThread, wt);
331 // Run local runnables
332 for (Executable pair : executions)
334 IThreadWorkQueue threadAccess = pair.threadAccess;
335 Runnable runnable = pair.runnable;
336 if (threadAccess.currentThreadAccess())
// RuntimeExceptions of a local run are passed to handleRunnableError
339 } catch (RuntimeException e) {
340 handleRunnableError(e);
344 // run errands in the mean time
345 wt.waitAndProcessEvents();
346 // Stop accepting errands
347 wt.stopAcceptingEvents();
// put the previously registered WaitingThread (if any) back for this thread
348 removeWaitingThread(senderThread, prevWt);
// second pass, made after stopAcceptingEvents()
350 wt.waitAndProcessEvents();
// Schedules 'runnable' through 'threadAccess'. When the target thread has a
// WaitingThread that accepts the event, the event is queued there and that
// WaitingThread's thread is returned; otherwise the result of
// threadAccess.asyncExec(runnable) is returned.
// NOTE(review): braces of this method are not visible in this excerpt.
354 public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
// address the wrapped queue directly
356 if (threadAccess instanceof BetterThreadAccess)
357 threadAccess = ((BetterThreadAccess) threadAccess).ta;
359 final Thread senderThread = Thread.currentThread();
// the class monitor covers the lookup of the target's WaitingThread and the hand-over
360 synchronized (ThreadUtils.class) {
// event created with a null listener and a null semaphore
362 Event e = new Event(runnable, null, null);
364 // Check if target has WaitingThread. Use it if it does
365 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
366 if (targetWt != null)
368 // Check if it is allowed to use the target WT
369 if (isEventQueuingAllowed(senderThread, targetWt))
371 if (targetWt.addEvent(e))
372 return targetWt.thread;
// fallback: the plain runnable (not the Event wrapper) goes to the queue
375 return threadAccess.asyncExec(runnable);
// Recursive helper of waitsFor(): looks for 'sourceThread' among the threads that
// 'targetThread' waits for (getWaitsForThreads), following the chain transitively.
// 'visitedTargetThreads' collects the threads already examined so that none is
// examined twice.
// NOTE(review): the statements guarded by the first and the last 'if', and the final
// return, are not visible in this excerpt.
379 private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)
381 assert(targetThread!=null);
382 if (visitedTargetThreads.contains(targetThread))
384 visitedTargetThreads.add(targetThread);
// a thread is not considered to wait for itself
385 if (sourceThread == targetThread) return false;
386 Set<Thread> waitsFor = getWaitsForThreads(targetThread);
// no registered WaitingThread, or nothing pending: the chain ends here
387 if (waitsFor==null||waitsFor.isEmpty()) return false;
388 for (Thread aThreadTargetThreadWaitsFor : waitsFor)
// direct hit: the target waits for the source
390 if (aThreadTargetThreadWaitsFor==sourceThread) return true;
391 if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
// otherwise continue the search one level deeper
392 if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
398 static boolean waitsFor(Thread sourceThread, Thread targetThread)
400 return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));
403 static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
405 if (!eventQueue.acceptEvents) return false;
406 // queuing is allowed if target thread does wait for source thread
407 return waitsFor(sourceThread, eventQueue.thread);
// Receives the RuntimeExceptions caught around runnable execution in this class.
// NOTE(review): the body of this method is not visible in this excerpt.
410 private static void handleRunnableError(Throwable t)
// Callback type passed to the Event constructor.
// NOTE(review): presumably eventDone() is called once the event's runnable has been
// run; the call site is not visible in this excerpt - confirm.
415 interface EventListener {
416 void eventDone(Event e);
// Runnable wrapper used by syncExec/multiSyncExec/asyncExec to hand work to another
// thread. It is created with the runnable, an optional listener and an optional
// semaphore, and records a thread through setThread().
// NOTE(review): the fields, run() and most method bodies are not visible in this
// excerpt; the notes below are assumptions to be confirmed against the full source.
419 public static class Event implements Runnable {
424 public Event(Runnable r, EventListener l, Semaphore s) {
// presumably part of run(): the executing thread records itself first - confirm
430 setThread(Thread.currentThread());
// RuntimeExceptions are passed to handleRunnableError
433 } catch (RuntimeException e) {
434 handleRunnableError(e);
// the InterruptedException handler suggests that getThread() waits until a thread
// has been set - confirm
442 public synchronized Thread getThread()
447 } catch (InterruptedException e) {
452 public synchronized void setThread(Thread t)
463 * WaitingThread is a thread that waits for something and in the meantime
// State of a thread that waits for events run elsewhere and accepts events for itself
// in the meantime.
// NOTE(review): the 'thread' field declaration, braces and several statements are not
// visible in this excerpt.
466 static class WaitingThread
// events queued for the owning thread
469 LinkedList<Event> queue = new LinkedList<Event>();
// cleared by stopAcceptingEvents(); addEvent() refuses events afterwards
470 boolean acceptEvents = true;
// events this thread waits for, and the subset reported as completed
471 Set<Event> waitingFor = new HashSet<Event>();
472 Set<Event> completed = new HashSet<Event>();
473 public WaitingThread(Thread thread) {
474 this.thread = thread;
// registers an event this thread has to wait for
476 public synchronized void waitFor(Event event)
478 assert(thread!=null);
479 waitingFor.add(event);
// marks an event as finished
// NOTE(review): the statement run when both sets have the same size is not visible -
// presumably it wakes the waiting thread; confirm
481 public synchronized void completed(Event event)
483 //assert(waitingFor.contains(event));
484 completed.add(event);
485 if (completed.size()==waitingFor.size())
// true when no event is queued for the owning thread
488 synchronized boolean isEmpty() {
489 return queue.isEmpty();
// compares the sizes of 'waitingFor' and 'completed'
// NOTE(review): the returned values are not visible in this excerpt
491 synchronized boolean keepWaiting() {
492 if(waitingFor.size()!=completed.size())
494 assert(waitingFor.equals(completed));
// offers an event to this queue; returns false once the queue has stopped accepting
// NOTE(review): the enqueue statements are not visible in this excerpt
497 public synchronized boolean addEvent(Event r)
499 if (!acceptEvents) return false;
505 static private int WAIT_MS = 10000;
506 static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
// Loops while keepWaiting() reports pending events or the queue is not empty, and
// prints RuntimeExceptions raised in the loop to stderr via printStackTrace().
// NOTE(review): most of the body (dequeueing, the wait call, running the events, the
// check made per awaited thread) is not visible in this excerpt.
508 public void waitAndProcessEvents()
510 while (keepWaiting() || !isEmpty()) {
514 if (!queue.isEmpty())
// nothing to run but events are still outstanding
516 if (e==null && keepWaiting())
// measure how long the (not visible) statement between the two nanoTime() calls takes
518 long now = System.nanoTime();
520 long duration = System.nanoTime()-now;
// long enough: look at the threads that still owe us an event
521 if(duration > (WAIT_THRESHOLD_NS)) {
522 for(Thread t : getWaitingForThreads())
// NOTE(review): the message prints 'thread' (this WaitingThread's own thread), not the
// loop variable 't' - presumably 't' was meant; confirm
524 throw new IllegalStateException("Thread '" + thread + "' has died.");
526 } catch (InterruptedException e1) {
532 } catch (RuntimeException e1) {
533 e1.printStackTrace();
542 if (!queue.isEmpty())
548 } catch (RuntimeException e1) {
549 e1.printStackTrace();
// Closes the queue: after this call addEvent() returns false and
// isEventQueuingAllowed() refuses this WaitingThread.
554 public synchronized void stopAcceptingEvents()
556 acceptEvents = false;
// Collects the threads of all awaited events that are not yet in 'completed'.
// NOTE(review): the return statement is not visible in this excerpt - presumably
// 'result' is returned; confirm.
558 public synchronized Set<Thread> getWaitingForThreads()
560 Set<Thread> result = new HashSet<Thread>(waitingFor.size());
561 for (Event e : waitingFor)
// finished events no longer count as something this thread waits for
563 if (completed.contains(e)) continue;
564 result.add(e.getThread());
571 * Blocks until waiting thread has been set for thread t
573 * @return waiting thread
// Blocking lookup: returns as soon as a non-null result is available and otherwise
// waits on the ThreadUtils.class monitor, which setWaitingThread() notifies.
// NOTE(review): the loop, the lookup that assigns 'result' and the interrupt handling
// are not visible in this excerpt.
575 synchronized static WaitingThread getWaitingThreadSync(Thread t)
577 WaitingThread result;
580 if (result!=null) return result;
// wait() releases the class monitor until setWaitingThread() calls notifyAll()
582 ThreadUtils.class.wait();
583 } catch (InterruptedException e) {
589 * Return an array which may have nulls
593 static Set<Thread> getWaitsForThreads(Thread t)
595 WaitingThread wt = getWaitingThread(t);
596 if (wt==null) return null;
597 return wt.getWaitingForThreads();
// Synchronized (ThreadUtils.class) lookup of the WaitingThread for thread t.
// NOTE(review): the body is not visible in this excerpt - presumably a read of 'map';
// confirm.
600 static synchronized WaitingThread getWaitingThread(Thread t)
// Registers 'wt' for thread 't' and wakes every thread waiting on the
// ThreadUtils.class monitor (see getWaitingThreadSync()).
// NOTE(review): the return statement is not visible in this excerpt - presumably the
// previous mapping 'prev' is returned; confirm.
605 static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
// Map.put hands back the entry that was registered before, or null
607 WaitingThread prev = map.put(t, wt);
608 ThreadUtils.class.notifyAll();
// Ends the registration of the calling thread's WaitingThread; when 'replaceWith' is
// non-null it is registered for the thread instead.
// NOTE(review): one statement between the assert and the 'if' is not visible in this
// excerpt - presumably the removal from 'map'; confirm.
612 static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
// must be called by the thread whose registration is removed
614 assert(t == Thread.currentThread());
616 if (replaceWith!=null)
617 map.put(t, replaceWith);
// IThreadWorkQueue decorator: asyncExec/syncExec go through the ThreadUtils
// implementations, which may queue the work on a WaitingThread, while
// currentThreadAccess/getThread are answered by the wrapped queue 'ta'.
// NOTE(review): the declaration and assignment of field 'ta' are not visible in this
// excerpt.
620 static class BetterThreadAccess implements IThreadWorkQueue {
622 public BetterThreadAccess(IThreadWorkQueue ta)
// never wrap a wrapper: use the queue inside the other BetterThreadAccess
624 if (ta instanceof BetterThreadAccess)
625 ta = ((BetterThreadAccess) ta).ta;
629 public Thread asyncExec(Runnable runnable) {
630 return ThreadUtils.asyncExec(ta, runnable);
633 public boolean currentThreadAccess() {
634 return ta.currentThreadAccess();
637 public Thread getThread() {
638 return ta.getThread();
641 public boolean syncExec(Runnable runnable) {
642 return ThreadUtils.syncExec(ta, runnable);
649 * Executes command in the executor while maintaining possibility to
650 * return back to caller.
// Runs 'command' on 'executor' through a wrapper that records the executing thread,
// and registers caller -> callee in DEPENDENCIES for the duration of
// executor.execute().
// NOTE(review): most of the wrapper's body is not visible in this excerpt. No
// try/finally is visible around execute(), so an exception thrown by it would
// presumably leave the DEPENDENCIES entry in place - confirm.
655 public static void exec(Executor executor, final Runnable command)
// one-element array so that the anonymous Runnable can publish the executing thread
657 final Thread[] calleeThread = new Thread[1];
658 final Thread callerThread = Thread.currentThread();
659 final Runnable wrappedCommand = new Runnable() {
662 calleeThread[0] = Thread.currentThread();
671 DEPENDENCIES.put(callerThread, calleeThread);
672 executor.execute(wrappedCommand);
673 DEPENDENCIES.remove(callerThread);
// Caller thread -> one-element array holding the thread that runs the caller's
// command (filled in by exec()); wrapped in a synchronized map.
675 private static Map<Thread, Thread[]> DEPENDENCIES =
676 Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );
// Walks the DEPENDENCIES chain starting at 'waiter' and reports true when 'worker' is
// reached.
// NOTE(review): the statements after the loop are not visible in this excerpt -
// presumably false is returned when the chain ends; confirm.
677 @SuppressWarnings("unused")
678 private static boolean hasDependency(Thread waiter, Thread worker)
680 // Follow dependency chain until worker is reached
681 for (Thread t = waiter; t!=null;)
683 Thread[] potentialResult = DEPENDENCIES.get(t);
// no entry for this thread: the chain ends
684 if (potentialResult==null) break;
// step to the thread that executes t's command (null ends the loop)
685 t = potentialResult[0];
686 if (t==worker) return true;
692 * Lock multiple locks simultaneously. If all locks cannot be locked the
693 * thread sleeps a moment and tries again.
695 * If all locks cannot be locked at once, this method doesn't prevent
696 * other from locking them in the mean time.
698 * @param locks an array of locks. null values are ignored
700 public static void lock(Lock...locks) {
701 if (locks.length==0) return;
702 if (locks.length==1) {
709 for (;i<locks.length; i++) {
711 if (l==null) continue;
712 if ( !locks[i].tryLock() ) break;
714 if (i==locks.length) return;
715 for (int j=0; j<i; j++) {
717 if (l==null) continue;
720 try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}
725 * Lock multiple locks simultaneously. If all locks cannot be locked the
726 * thread sleeps a moment and tries again.
728 * If all locks cannot be locked at once, this method doesn't prevent
729 * other from locking them in the mean time.
733 * @param locks1 an array of locks. null values are ignored
734 * @param locks2 an array of locks. null values are ignored
736 public static void lock2(Lock[] locks1, Lock[] locks2) {
737 int l1 = locks1.length;
738 int l2 = locks2.length;
740 if (l1==0 && l2==0) return;
744 // Attempt to lock everything in locks 1
747 if (l==null) continue;
748 if ( !locks1[i].tryLock() ) break;
750 // Attempt to lock everything in locks 2
754 if (l==null) continue;
755 if ( !locks2[i-l1].tryLock() ) break;
762 for (int j=l1; j<i; j++) {
763 Lock l = locks2[j-l1];
764 if (l==null) continue;
770 for (int j=0; j<i; j++) {
772 if (l==null) continue;
776 try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}
781 * Try to lock multiple locks simultaneously. If all locks cannot be locked the
782 * thread sleeps a moment and tries again.
784 * If all locks cannot be locked at once, this method doesn't prevent
785 * other from locking them in the mean time.
787 * @param locks an array of locks. null values are ignored
790 public static boolean tryLock(Lock...locks) {
791 if (locks.length==0) return true;
792 if (locks.length==1) {
793 return locks[0].tryLock();
797 for (;i<locks.length; i++) {
799 if (l==null) continue;
800 if ( !locks[i].tryLock() ) break;
802 if (i==locks.length) return true;
805 for (int j=0; j<i; j++) {
807 if (l==null) continue;
815 * Unlock multiple locks. Use this after locking multiple locks.
817 * @param locks an array of locks. Null values are ignored
819 public static void unlock(Lock...locks) {
820 for (Lock lock : locks) {
821 if (lock != null) lock.unlock();
826 * Unlock multiple locks. Use this after locking multiple locks.
828 * @param locks1 an array of locks. Null values are ignored
829 * @param locsk2 an array of locks. Null values are ignored
831 public static void unlock2(Lock[] locks1, Lock[] locks2) {
832 for (Lock lock : locks1) {
833 if (lock != null) lock.unlock();
835 for (Lock lock : locks2) {
836 if (lock != null) lock.unlock();
841 * Join multiple arrays into a single array
846 public static Lock[] appendLockArrays(Lock[]...lockArrays) {
848 for (Lock[] array : lockArrays) len +=array.length;
849 Lock[] result = new Lock[ len ];
851 for (Lock[] array : lockArrays) {
852 System.arraycopy(array, 0, result, i, array.length);
// Shuts down the executors owned by this class - timer, non-blocking pool, blocking
// pool, in that order - giving each a 1000 ms timeout in
// shutdownAndAwaitTermination(). The pool fields are reset to null, so the getters
// create fresh pools on their next call.
// NOTE(review): the statements around the TIMER shutdown are not visible in this
// excerpt - presumably a null check and a reset like those of the other two fields;
// confirm.
858 public static synchronized void shutdown() {
860 //System.out.println("TIMERS");
861 shutdownAndAwaitTermination(TIMER, 1000);
864 if (NON_BLOCKING_EXECUTOR != null) {
865 //System.out.println("NON_BLOCKING");
866 shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);
867 NON_BLOCKING_EXECUTOR = null;
869 if (BLOCKING_EXECUTOR != null) {
870 //System.out.println("BLOCKING");
871 shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);
872 BLOCKING_EXECUTOR = null;
877 * Grabbed from {@link ExecutorService} javadoc.
879 * @param pool {@link ExecutorService} to shut down
881 private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {
882 //long t = System.currentTimeMillis();
883 pool.shutdown(); // Disable new tasks from being submitted
885 // Wait a while for existing tasks to terminate
886 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
887 List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks
888 if (!leftovers.isEmpty())
889 System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
890 // Wait a while for tasks to respond to being cancelled
891 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
892 System.err.println("Thread pool '" + pool.toString() + "' did not terminate");
894 } catch (InterruptedException ie) {
895 // (Re-)Cancel if current thread also interrupted
897 // Preserve interrupt status
898 Thread.currentThread().interrupt();
900 //long e = System.currentTimeMillis();
901 //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
904 @SuppressWarnings("unused")
905 private static void uncheckedAwaitTermination(ExecutorService service, long time) {
907 NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
908 } catch (InterruptedException e) {
// NOTE(review): the three executor fields below are public and not final, so any
// caller can reassign them.
914 // Executor that runs in current thread
915 public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
917 // Async executor queues the command into AWT event queue
918 public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
920 // Sync executor blocks the call until the command is finished
921 public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
// ExecutorService that runs commands on the AWT event dispatch thread and returns only
// after the command has finished (EventQueue.invokeAndWait).
// NOTE(review): the branch taken on the dispatch thread and the bodies of the
// lifecycle methods are not visible in this excerpt.
925 class AWTExecutorSync extends AbstractExecutorService {
928 public void execute(Runnable command) {
// invokeAndWait must not be called from the dispatch thread itself, hence the check
929 if (EventQueue.isDispatchThread())
934 EventQueue.invokeAndWait(command);
// NOTE(review): the interrupt is wrapped in a RuntimeException without restoring the
// thread's interrupt status
935 } catch (InterruptedException e) {
936 throw new RuntimeException(e);
// a failure inside the command is rethrown with the command's own exception as cause
937 } catch (InvocationTargetException e) {
938 throw new RuntimeException(e.getCause());
944 public void shutdown() {
948 public List<Runnable> shutdownNow() {
953 public boolean isShutdown() {
958 public boolean isTerminated() {
963 public boolean awaitTermination(long timeout, TimeUnit unit)
964 throws InterruptedException {
968 public static String getStackTrace(int levels){
969 StringBuilder sb = new StringBuilder();
970 sb.append( Thread.currentThread() );
972 Exception e3 = new Exception();
973 try { throw e3; } catch(Exception e2) {
975 for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {
976 StackTraceElement ste = e2.getStackTrace()[i];
982 return sb.toString();
// Executor behind ThreadUtils.CURRENT_THREAD ("Executor that runs in current thread").
// NOTE(review): the body of execute() is not visible in this excerpt - presumably it
// calls command.run() directly; confirm.
986 class CurrentThreadExecutor implements Executor {
988 public void execute(Runnable command) {