1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
14 * @author Toni Kalajainen
16 package org.simantics.utils.threads;
18 import static java.util.concurrent.TimeUnit.NANOSECONDS;
20 import java.awt.EventQueue;
21 import java.lang.reflect.InvocationTargetException;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.LinkedList;
27 import java.util.List;
30 import java.util.concurrent.AbstractExecutorService;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.Lock;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Utility for switching threads
48 * @see IThreadWorkQueue
50 public class ThreadUtils {
52 private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class);
53 public static final int CORES = Runtime.getRuntime().availableProcessors();
56 * Defines the maximum thread pool size of
57 * {@link #getBlockingWorkExecutor()}. The system will enforce this value to
58 * be at least Math.max(CORES, 8). It can be configured through the system
59 * property <em>simantics.executor.blockingMaxThreads</em>.
61 public static final int MAX_BLOCKING_EXECUTOR_THREADS;
64 String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);
65 int blockingMaxThreads = CORES;
67 blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);
68 } catch (NumberFormatException e) {
70 MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);
73 /** Never acquire non-blocking executor from this field, private use */
74 public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
75 /** Never acquire non-blocking executor from this field, private use */
76 public static ExecutorService BLOCKING_EXECUTOR;
78 static ScheduledExecutorService TIMER;
81 * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
83 * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
85 public static CurrentThread getCurrentThread() {
86 return CurrentThread.INSTANCE;
90 * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
92 * @return Executor that executes a non-blocking work
94 public static synchronized ScheduledExecutorService getTimer() {
96 final ThreadGroup tg = new ThreadGroup("Timer");
97 final AtomicInteger counter = new AtomicInteger(0);
98 ThreadFactory tf = new ThreadFactory() {
100 public Thread newThread(Runnable r) {
101 Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
104 if (t.getPriority() != Thread.NORM_PRIORITY)
105 t.setPriority(Thread.NORM_PRIORITY);
109 TIMER = new ScheduledThreadPoolExecutor( 1, tf );
115 * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
116 * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
118 * @return Executor that executes a non-blocking work
120 public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
121 if (NON_BLOCKING_EXECUTOR == null) {
122 final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
123 final AtomicInteger counter = new AtomicInteger(0);
124 ThreadFactory tf = new ThreadFactory() {
126 public Thread newThread(Runnable r) {
127 Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
130 if (t.getPriority() != Thread.NORM_PRIORITY)
131 t.setPriority(Thread.NORM_PRIORITY);
135 NON_BLOCKING_EXECUTOR =
136 new ScheduledThreadPoolExecutor( CORES, tf );
138 return NON_BLOCKING_EXECUTOR;
142 * Get Executor executes work that may lock or block.
144 * @return executor for blocking operations
146 public static synchronized ExecutorService getBlockingWorkExecutor() {
147 if (BLOCKING_EXECUTOR == null) {
148 final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
149 final AtomicInteger counter = new AtomicInteger(0);
150 ThreadFactory tf = new ThreadFactory() {
152 public Thread newThread(Runnable r) {
153 Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
156 if (t.getPriority() != Thread.NORM_PRIORITY)
157 t.setPriority(Thread.NORM_PRIORITY);
162 // new ThreadPoolExecutor(
164 // MAX_BLOCKING_EXECUTOR_THREADS,
165 // 3L, TimeUnit.SECONDS,
166 // new SynchronousQueue<Runnable>(),
168 new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
170 return BLOCKING_EXECUTOR;
174 * Better thread access allows thread context switching back to waiting
178 * @return an enhanced version of the specified queue
180 public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
182 if (access instanceof BetterThreadAccess)
184 return new BetterThreadAccess(access);
187 static Map<Thread, WaitingThread> map =
188 new HashMap<Thread, WaitingThread>();
191 * Executes a runnable in thread synchronously.
192 * If the thread locked and waits for this thread, then
193 * the runnable is ran in that thread.
195 * This works only if all thread switching is done
198 * @param threadAccess
200 * @return true if thread accepted the runnable
202 public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
204 if (threadAccess instanceof BetterThreadAccess)
205 threadAccess = ((BetterThreadAccess) threadAccess).ta;
207 // Using current thread
208 if (threadAccess.currentThreadAccess())
212 } catch (RuntimeException e) {
213 handleRunnableError(e);
218 final Thread senderThread = Thread.currentThread();
219 final WaitingThread wt = new WaitingThread(senderThread);
220 WaitingThread prevWt;
221 Event e = new Event(runnable, new EventListener() {
223 public void eventDone(Event e) {
227 synchronized (ThreadUtils.class) {
228 // Check if target has WaitingThread. Use it if it does
229 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
230 Thread waitingForThread = null;
231 if (targetWt != null)
233 // Check if it is allowed to use the target WT
234 if (isEventQueuingAllowed(senderThread, targetWt))
236 if (targetWt.addEvent(e)) {
238 waitingForThread = targetWt.thread;
239 e.setThread(waitingForThread);
246 if (waitingForThread == null) {
248 waitingForThread = threadAccess.asyncExec(e);
249 if (waitingForThread==null) return false;
250 e.setThread(waitingForThread);
255 prevWt = setWaitingThread(senderThread, wt);
257 // run errands in the mean time
258 wt.waitAndProcessEvents();
259 wt.stopAcceptingEvents();
260 // Stop accepting errands
261 removeWaitingThread(senderThread, prevWt);
263 wt.waitAndProcessEvents();
267 public static boolean multiSyncExec(Collection<Executable> executions)
269 if (executions.isEmpty()) return true;
270 return multiSyncExec(executions.toArray(new Executable[executions.size()]));
274 * Executes a list of executables in multiple threads and waits for all to complete.
277 * @return <code>true</code> once execution has completed (is this necessary?)
279 public static boolean multiSyncExec(Executable ... executions)
281 if (executions.length==0) return true;
282 if (executions.length==1) {
283 return syncExec(executions[0].threadAccess, executions[0].runnable);
285 final Thread senderThread = Thread.currentThread();
286 final WaitingThread wt = new WaitingThread(senderThread);
287 WaitingThread prevWt = null;
288 synchronized (ThreadUtils.class) {
289 for (Executable pair : executions)
291 IThreadWorkQueue threadAccess = pair.threadAccess;
292 if (threadAccess.currentThreadAccess())
294 if (threadAccess instanceof BetterThreadAccess)
295 threadAccess = ((BetterThreadAccess) threadAccess).ta;
296 Runnable runnable = pair.runnable;
298 Event e = new Event(runnable, new EventListener() {
300 public void eventDone(Event e) {
304 // Check if target has WaitingThread. Use it if it does
305 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
306 Thread waitingForThread = null;
307 if (targetWt != null)
309 // Check if it is allowed to use the target WT
310 if (isEventQueuingAllowed(senderThread, targetWt))
312 if (targetWt.addEvent(e)) {
314 waitingForThread = targetWt.thread;
315 e.setThread(waitingForThread);
322 if (waitingForThread == null) {
324 waitingForThread = threadAccess.asyncExec(e);
325 if (waitingForThread==null)
327 e.setThread(waitingForThread);
332 prevWt = setWaitingThread(senderThread, wt);
335 // Run local runnables
336 for (Executable pair : executions)
338 IThreadWorkQueue threadAccess = pair.threadAccess;
339 Runnable runnable = pair.runnable;
340 if (threadAccess.currentThreadAccess())
343 } catch (RuntimeException e) {
344 handleRunnableError(e);
348 // run errands in the mean time
349 wt.waitAndProcessEvents();
350 // Stop accepting errands
351 wt.stopAcceptingEvents();
352 removeWaitingThread(senderThread, prevWt);
354 wt.waitAndProcessEvents();
358 public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
360 if (threadAccess instanceof BetterThreadAccess)
361 threadAccess = ((BetterThreadAccess) threadAccess).ta;
363 final Thread senderThread = Thread.currentThread();
364 synchronized (ThreadUtils.class) {
366 Event e = new Event(runnable, null, null);
368 // Check if target has WaitingThread. Use it if it does
369 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
370 if (targetWt != null)
372 // Check if it is allowed to use the target WT
373 if (isEventQueuingAllowed(senderThread, targetWt))
375 if (targetWt.addEvent(e))
376 return targetWt.thread;
379 return threadAccess.asyncExec(runnable);
383 private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)
385 assert(targetThread!=null);
386 if (visitedTargetThreads.contains(targetThread))
388 visitedTargetThreads.add(targetThread);
389 if (sourceThread == targetThread) return false;
390 Set<Thread> waitsFor = getWaitsForThreads(targetThread);
391 if (waitsFor==null||waitsFor.isEmpty()) return false;
392 for (Thread aThreadTargetThreadWaitsFor : waitsFor)
394 if (aThreadTargetThreadWaitsFor==sourceThread) return true;
395 if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
396 if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
402 static boolean waitsFor(Thread sourceThread, Thread targetThread)
404 return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));
407 static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
409 if (!eventQueue.acceptEvents) return false;
410 // queuing is allowed if target thread does wait for source thread
411 return waitsFor(sourceThread, eventQueue.thread);
414 private static void handleRunnableError(Throwable t)
419 interface EventListener {
420 void eventDone(Event e);
423 public static class Event implements Runnable {
428 public Event(Runnable r, EventListener l, Semaphore s) {
434 setThread(Thread.currentThread());
437 } catch (RuntimeException e) {
438 handleRunnableError(e);
446 public synchronized Thread getThread()
451 } catch (InterruptedException e) {
456 public synchronized void setThread(Thread t)
467 * WaitingThread is a thread that waits for something and in the meantime
470 static class WaitingThread
473 LinkedList<Event> queue = new LinkedList<Event>();
474 boolean acceptEvents = true;
475 Set<Event> waitingFor = new HashSet<Event>();
476 Set<Event> completed = new HashSet<Event>();
477 public WaitingThread(Thread thread) {
478 this.thread = thread;
480 public synchronized void waitFor(Event event)
482 assert(thread!=null);
483 waitingFor.add(event);
485 public synchronized void completed(Event event)
487 //assert(waitingFor.contains(event));
488 completed.add(event);
489 if (completed.size()==waitingFor.size())
492 synchronized boolean isEmpty() {
493 return queue.isEmpty();
495 synchronized boolean keepWaiting() {
496 if(waitingFor.size()!=completed.size())
498 assert(waitingFor.equals(completed));
501 public synchronized boolean addEvent(Event r)
503 if (!acceptEvents) return false;
509 static private int WAIT_MS = 10000;
510 static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
512 public void waitAndProcessEvents()
514 while (keepWaiting() || !isEmpty()) {
518 if (!queue.isEmpty())
520 if (e==null && keepWaiting())
522 long now = System.nanoTime();
524 long duration = System.nanoTime()-now;
525 if(duration > (WAIT_THRESHOLD_NS)) {
526 for(Thread t : getWaitingForThreads())
528 throw new IllegalStateException("Thread '" + thread + "' has died.");
530 } catch (InterruptedException e1) {
536 } catch (RuntimeException e1) {
537 e1.printStackTrace();
546 if (!queue.isEmpty())
552 } catch (RuntimeException e1) {
553 e1.printStackTrace();
558 public synchronized void stopAcceptingEvents()
560 acceptEvents = false;
562 public synchronized Set<Thread> getWaitingForThreads()
564 Set<Thread> result = new HashSet<Thread>(waitingFor.size());
565 for (Event e : waitingFor)
567 if (completed.contains(e)) continue;
568 result.add(e.getThread());
575 * Blocks until waiting thread has been set for thread t
577 * @return waiting thread
579 synchronized static WaitingThread getWaitingThreadSync(Thread t)
581 WaitingThread result;
584 if (result!=null) return result;
586 ThreadUtils.class.wait();
587 } catch (InterruptedException e) {
593 * Return an array which may have nulls
597 static Set<Thread> getWaitsForThreads(Thread t)
599 WaitingThread wt = getWaitingThread(t);
600 if (wt==null) return null;
601 return wt.getWaitingForThreads();
604 static synchronized WaitingThread getWaitingThread(Thread t)
609 static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
611 WaitingThread prev = map.put(t, wt);
612 ThreadUtils.class.notifyAll();
616 static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
618 assert(t == Thread.currentThread());
620 if (replaceWith!=null)
621 map.put(t, replaceWith);
624 static class BetterThreadAccess implements IThreadWorkQueue {
626 public BetterThreadAccess(IThreadWorkQueue ta)
628 if (ta instanceof BetterThreadAccess)
629 ta = ((BetterThreadAccess) ta).ta;
633 public Thread asyncExec(Runnable runnable) {
634 return ThreadUtils.asyncExec(ta, runnable);
637 public boolean currentThreadAccess() {
638 return ta.currentThreadAccess();
641 public Thread getThread() {
642 return ta.getThread();
645 public boolean syncExec(Runnable runnable) {
646 return ThreadUtils.syncExec(ta, runnable);
653 * Executes command in the executor while maintaining possibility to
654 * return back to caller.
659 public static void exec(Executor executor, final Runnable command)
661 final Thread[] calleeThread = new Thread[1];
662 final Thread callerThread = Thread.currentThread();
663 final Runnable wrappedCommand = new Runnable() {
666 calleeThread[0] = Thread.currentThread();
675 DEPENDENCIES.put(callerThread, calleeThread);
676 executor.execute(wrappedCommand);
677 DEPENDENCIES.remove(callerThread);
679 private static Map<Thread, Thread[]> DEPENDENCIES =
680 Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );
681 @SuppressWarnings("unused")
682 private static boolean hasDependency(Thread waiter, Thread worker)
684 // Follow dependency chain until worker is reached
685 for (Thread t = waiter; t!=null;)
687 Thread[] potentialResult = DEPENDENCIES.get(t);
688 if (potentialResult==null) break;
689 t = potentialResult[0];
690 if (t==worker) return true;
696 * Lock multiple locks simultaneously. If all locks cannot be locked the
697 * thread sleeps a moment and tries again.
699 * If all locks cannot be locked at once, this method doesn't prevent
700 * other from locking them in the mean time.
702 * @param locks an array of locks. null values are ignored
704 public static void lock(Lock...locks) {
705 if (locks.length==0) return;
706 if (locks.length==1) {
713 for (;i<locks.length; i++) {
715 if (l==null) continue;
716 if ( !locks[i].tryLock() ) break;
718 if (i==locks.length) return;
719 for (int j=0; j<i; j++) {
721 if (l==null) continue;
724 try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}
729 * Lock multiple locks simultaneously. If all locks cannot be locked the
730 * thread sleeps a moment and tries again.
732 * If all locks cannot be locked at once, this method doesn't prevent
733 * other from locking them in the mean time.
737 * @param locks1 an array of locks. null values are ignored
738 * @param locks2 an array of locks. null values are ignored
740 public static void lock2(Lock[] locks1, Lock[] locks2) {
741 int l1 = locks1.length;
742 int l2 = locks2.length;
744 if (l1==0 && l2==0) return;
748 // Attempt to lock everything in locks 1
751 if (l==null) continue;
752 if ( !locks1[i].tryLock() ) break;
754 // Attempt to lock everything in locks 2
758 if (l==null) continue;
759 if ( !locks2[i-l1].tryLock() ) break;
766 for (int j=l1; j<i; j++) {
767 Lock l = locks2[j-l1];
768 if (l==null) continue;
774 for (int j=0; j<i; j++) {
776 if (l==null) continue;
780 try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}
785 * Try to lock multiple locks simultaneously. If all locks cannot be locked the
786 * thread sleeps a moment and tries again.
788 * If all locks cannot be locked at once, this method doesn't prevent
789 * other from locking them in the mean time.
791 * @param locks an array of locks. null values are ignored
794 public static boolean tryLock(Lock...locks) {
795 if (locks.length==0) return true;
796 if (locks.length==1) {
797 return locks[0].tryLock();
801 for (;i<locks.length; i++) {
803 if (l==null) continue;
804 if ( !locks[i].tryLock() ) break;
806 if (i==locks.length) return true;
809 for (int j=0; j<i; j++) {
811 if (l==null) continue;
819 * Unlock multiple locks. Use this after locking multiple locks.
821 * @param locks an array of locks. Null values are ignored
823 public static void unlock(Lock...locks) {
824 for (Lock lock : locks) {
825 if (lock != null) lock.unlock();
830 * Unlock multiple locks. Use this after locking multiple locks.
832 * @param locks1 an array of locks. Null values are ignored
833 * @param locsk2 an array of locks. Null values are ignored
835 public static void unlock2(Lock[] locks1, Lock[] locks2) {
836 for (Lock lock : locks1) {
837 if (lock != null) lock.unlock();
839 for (Lock lock : locks2) {
840 if (lock != null) lock.unlock();
845 * Join multiple arrays into a single array
850 public static Lock[] appendLockArrays(Lock[]...lockArrays) {
852 for (Lock[] array : lockArrays) len +=array.length;
853 Lock[] result = new Lock[ len ];
855 for (Lock[] array : lockArrays) {
856 System.arraycopy(array, 0, result, i, array.length);
862 public static synchronized void shutdown() {
864 //System.out.println("TIMERS");
865 shutdownAndAwaitTermination(TIMER, 1000);
868 if (NON_BLOCKING_EXECUTOR != null) {
869 //System.out.println("NON_BLOCKING");
870 shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);
871 NON_BLOCKING_EXECUTOR = null;
873 if (BLOCKING_EXECUTOR != null) {
874 //System.out.println("BLOCKING");
875 shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);
876 BLOCKING_EXECUTOR = null;
881 * Grabbed from {@link ExecutorService} javadoc.
883 * @param pool {@link ExecutorService} to shut down
885 private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {
886 //long t = System.currentTimeMillis();
887 pool.shutdown(); // Disable new tasks from being submitted
889 // Wait a while for existing tasks to terminate
890 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
891 List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks
892 if (!leftovers.isEmpty())
893 LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
894 // Wait a while for tasks to respond to being cancelled
895 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
896 LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate");
898 } catch (InterruptedException ie) {
899 // (Re-)Cancel if current thread also interrupted
901 // Preserve interrupt status
902 Thread.currentThread().interrupt();
904 //long e = System.currentTimeMillis();
905 //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
908 @SuppressWarnings("unused")
909 private static void uncheckedAwaitTermination(ExecutorService service, long time) {
911 NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
912 } catch (InterruptedException e) {
918 // Executor that runs in current thread
919 public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
921 // Async executor queues the command into AWT event queue
922 public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
924 // Sync executor blocks the call until the command is finished
925 public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
929 class AWTExecutorSync extends AbstractExecutorService {
932 public void execute(Runnable command) {
933 if (EventQueue.isDispatchThread())
938 EventQueue.invokeAndWait(command);
939 } catch (InterruptedException e) {
940 throw new RuntimeException(e);
941 } catch (InvocationTargetException e) {
942 throw new RuntimeException(e.getCause());
948 public void shutdown() {
952 public List<Runnable> shutdownNow() {
957 public boolean isShutdown() {
962 public boolean isTerminated() {
967 public boolean awaitTermination(long timeout, TimeUnit unit)
968 throws InterruptedException {
972 public static String getStackTrace(int levels){
973 StringBuilder sb = new StringBuilder();
974 sb.append( Thread.currentThread() );
976 Exception e3 = new Exception();
977 try { throw e3; } catch(Exception e2) {
979 for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {
980 StackTraceElement ste = e2.getStackTrace()[i];
986 return sb.toString();
990 class CurrentThreadExecutor implements Executor {
992 public void execute(Runnable command) {