/*******************************************************************************
* Copyright (c) 2007, 2010 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* VTT Technical Research Centre of Finland - initial API and implementation
*******************************************************************************/
/*
*
* @author Toni Kalajainen
*/
package org.simantics.utils.threads;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.awt.EventQueue;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility for switching threads
*
* @see Executors2
* @see IThreadWorkQueue
*/
public class ThreadUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class);
public static final int CORES = Runtime.getRuntime().availableProcessors();
/**
* Defines the maximum thread pool size of
* {@link #getBlockingWorkExecutor()}. The system will enforce this value to
* be at least Math.max(CORES, 8). It can be configured through the system
* property simantics.executor.blockingMaxThreads.
*/
public static final int MAX_BLOCKING_EXECUTOR_THREADS;
static {
String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);
int blockingMaxThreads = CORES;
try {
blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);
} catch (NumberFormatException e) {
}
MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);
}
/** Never acquire non-blocking executor from this field, private use */
public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
/** Never acquire non-blocking executor from this field, private use */
public static ExecutorService BLOCKING_EXECUTOR;
static ScheduledExecutorService TIMER;
/**
* Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
*
* @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
*/
public static CurrentThread getCurrentThread() {
return CurrentThread.INSTANCE;
}
/**
* Get a timer executor. Never add blocking or long-wait runnables to this scheduler
*
* @return Executor that executes a non-blocking work
*/
public static synchronized ScheduledExecutorService getTimer() {
if (TIMER == null) {
final ThreadGroup tg = new ThreadGroup("Timer");
final AtomicInteger counter = new AtomicInteger(0);
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
if (!t.isDaemon())
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
TIMER = new ScheduledThreadPoolExecutor( 1, tf );
}
return TIMER;
}
/**
* Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
* anything, no semaphores, no synchronized() {} blocks. no Object.wait().
*
* @return Executor that executes a non-blocking work
*/
public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
if (NON_BLOCKING_EXECUTOR == null) {
final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
final AtomicInteger counter = new AtomicInteger(0);
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
if (!t.isDaemon())
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
NON_BLOCKING_EXECUTOR =
new ScheduledThreadPoolExecutor( CORES, tf );
}
return NON_BLOCKING_EXECUTOR;
}
/**
* Get Executor executes work that may lock or block.
*
* @return executor for blocking operations
*/
public static synchronized ExecutorService getBlockingWorkExecutor() {
if (BLOCKING_EXECUTOR == null) {
final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
final AtomicInteger counter = new AtomicInteger(0);
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
if (!t.isDaemon())
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
BLOCKING_EXECUTOR =
// new ThreadPoolExecutor(
// 0,
// MAX_BLOCKING_EXECUTOR_THREADS,
// 3L, TimeUnit.SECONDS,
// new SynchronousQueue(),
// tf);
new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
}
return BLOCKING_EXECUTOR;
}
/**
* Better thread access allows thread context switching back to waiting
* threads.
*
* @param access
* @return an enhanced version of the specified queue
*/
public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
{
if (access instanceof BetterThreadAccess)
return access;
return new BetterThreadAccess(access);
}
static Map map =
new HashMap();
/**
* Executes a runnable in thread synchronously.
* If the thread locked and waits for this thread, then
* the runnable is ran in that thread.
*
* This works only if all thread switching is done
* using this method.
*
* @param threadAccess
* @param runnable
* @return true if thread accepted the runnable
*/
public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
{
if (threadAccess instanceof BetterThreadAccess)
threadAccess = ((BetterThreadAccess) threadAccess).ta;
// Using current thread
if (threadAccess.currentThreadAccess())
{
try {
runnable.run();
} catch (RuntimeException e) {
handleRunnableError(e);
}
return true;
}
final Thread senderThread = Thread.currentThread();
final WaitingThread wt = new WaitingThread(senderThread);
WaitingThread prevWt;
Event e = new Event(runnable, new EventListener() {
@Override
public void eventDone(Event e) {
wt.completed(e);
}}, null);
synchronized (ThreadUtils.class) {
// Check if target has WaitingThread. Use it if it does
WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
Thread waitingForThread = null;
if (targetWt != null)
{
// Check if it is allowed to use the target WT
if (isEventQueuingAllowed(senderThread, targetWt))
{
if (targetWt.addEvent(e)) {
synchronized(wt) {
waitingForThread = targetWt.thread;
e.setThread(waitingForThread);
wt.waitFor(e);
}
}
}
}
if (waitingForThread == null) {
synchronized(wt) {
waitingForThread = threadAccess.asyncExec(e);
if (waitingForThread==null) return false;
e.setThread(waitingForThread);
wt.waitFor(e);
}
}
prevWt = setWaitingThread(senderThread, wt);
}
// run errands in the mean time
wt.waitAndProcessEvents();
wt.stopAcceptingEvents();
// Stop accepting errands
removeWaitingThread(senderThread, prevWt);
// Run last events
wt.waitAndProcessEvents();
return true;
}
public static boolean multiSyncExec(Collection executions)
{
if (executions.isEmpty()) return true;
return multiSyncExec(executions.toArray(new Executable[executions.size()]));
}
/**
* Executes a list of executables in multiple threads and waits for all to complete.
*
* @param executions
* @return true
once execution has completed (is this necessary?)
*/
public static boolean multiSyncExec(Executable ... executions)
{
if (executions.length==0) return true;
if (executions.length==1) {
return syncExec(executions[0].threadAccess, executions[0].runnable);
}
final Thread senderThread = Thread.currentThread();
final WaitingThread wt = new WaitingThread(senderThread);
WaitingThread prevWt = null;
synchronized (ThreadUtils.class) {
for (Executable pair : executions)
{
IThreadWorkQueue threadAccess = pair.threadAccess;
if (threadAccess.currentThreadAccess())
continue;
if (threadAccess instanceof BetterThreadAccess)
threadAccess = ((BetterThreadAccess) threadAccess).ta;
Runnable runnable = pair.runnable;
Event e = new Event(runnable, new EventListener() {
@Override
public void eventDone(Event e) {
wt.completed(e);
}}, null);
// Check if target has WaitingThread. Use it if it does
WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
Thread waitingForThread = null;
if (targetWt != null)
{
// Check if it is allowed to use the target WT
if (isEventQueuingAllowed(senderThread, targetWt))
{
if (targetWt.addEvent(e)) {
synchronized(wt) {
waitingForThread = targetWt.thread;
e.setThread(waitingForThread);
wt.waitFor(e);
}
}
}
}
if (waitingForThread == null) {
synchronized(wt) {
waitingForThread = threadAccess.asyncExec(e);
if (waitingForThread==null)
return false;
e.setThread(waitingForThread);
wt.waitFor(e);
}
}
}
prevWt = setWaitingThread(senderThread, wt);
}
// Run local runnables
for (Executable pair : executions)
{
IThreadWorkQueue threadAccess = pair.threadAccess;
Runnable runnable = pair.runnable;
if (threadAccess.currentThreadAccess())
try {
runnable.run();
} catch (RuntimeException e) {
handleRunnableError(e);
}
}
// run errands in the mean time
wt.waitAndProcessEvents();
// Stop accepting errands
wt.stopAcceptingEvents();
removeWaitingThread(senderThread, prevWt);
// Run last events
wt.waitAndProcessEvents();
return true;
}
public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
{
if (threadAccess instanceof BetterThreadAccess)
threadAccess = ((BetterThreadAccess) threadAccess).ta;
final Thread senderThread = Thread.currentThread();
synchronized (ThreadUtils.class) {
Event e = new Event(runnable, null, null);
// Check if target has WaitingThread. Use it if it does
WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
if (targetWt != null)
{
// Check if it is allowed to use the target WT
if (isEventQueuingAllowed(senderThread, targetWt))
{
if (targetWt.addEvent(e))
return targetWt.thread;
}
}
return threadAccess.asyncExec(runnable);
}
}
private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads)
{
assert(targetThread!=null);
if (visitedTargetThreads.contains(targetThread))
return false;
visitedTargetThreads.add(targetThread);
if (sourceThread == targetThread) return false;
Set waitsFor = getWaitsForThreads(targetThread);
if (waitsFor==null||waitsFor.isEmpty()) return false;
for (Thread aThreadTargetThreadWaitsFor : waitsFor)
{
if (aThreadTargetThreadWaitsFor==sourceThread) return true;
if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
return true;
}
return false;
}
static boolean waitsFor(Thread sourceThread, Thread targetThread)
{
return _waitsFor(sourceThread, targetThread, new HashSet(3));
}
static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
{
if (!eventQueue.acceptEvents) return false;
// queuing is allowed if target thread does wait for source thread
return waitsFor(sourceThread, eventQueue.thread);
}
private static void handleRunnableError(Throwable t)
{
t.printStackTrace();
}
interface EventListener {
void eventDone(Event e);
}
public static class Event implements Runnable {
Runnable r;
EventListener l;
Semaphore s;
Thread t;
public Event(Runnable r, EventListener l, Semaphore s) {
this.r = r;
this.l = l;
this.s = s;
}
public void run() {
setThread(Thread.currentThread());
try {
r.run();
} catch (RuntimeException e) {
handleRunnableError(e);
} finally {
if (s!=null)
s.release(1);
if (l!=null)
l.eventDone(this);
}
}
public synchronized Thread getThread()
{
while (t==null) {
try {
t.wait();
} catch (InterruptedException e) {
}
}
return t;
}
public synchronized void setThread(Thread t)
{
assert(t!=null);
if (this.t!=null)
assert(this.t==t);
this.t = t;
notify();
}
}
/**
* WaitingThread is a thread that waits for something and in the meantime
* runs errands.
*/
static class WaitingThread
{
final Thread thread;
LinkedList queue = new LinkedList();
boolean acceptEvents = true;
Set waitingFor = new HashSet();
Set completed = new HashSet();
public WaitingThread(Thread thread) {
this.thread = thread;
}
public synchronized void waitFor(Event event)
{
assert(thread!=null);
waitingFor.add(event);
}
public synchronized void completed(Event event)
{
//assert(waitingFor.contains(event));
completed.add(event);
if (completed.size()==waitingFor.size())
notify();
}
synchronized boolean isEmpty() {
return queue.isEmpty();
}
synchronized boolean keepWaiting() {
if(waitingFor.size()!=completed.size())
return true;
assert(waitingFor.equals(completed));
return false;
}
public synchronized boolean addEvent(Event r)
{
if (!acceptEvents) return false;
queue.add(r);
notify();
return true;
}
static private int WAIT_MS = 10000;
static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
public void waitAndProcessEvents()
{
while (keepWaiting() || !isEmpty()) {
Event e = null;
synchronized(this)
{
if (!queue.isEmpty())
e = queue.pop();
if (e==null && keepWaiting())
try {
long now = System.nanoTime();
wait(WAIT_MS);
long duration = System.nanoTime()-now;
if(duration > (WAIT_THRESHOLD_NS)) {
for(Thread t : getWaitingForThreads())
if(!t.isAlive())
throw new IllegalStateException("Thread '" + thread + "' has died.");
}
} catch (InterruptedException e1) {
}
}
if (e!=null) {
try {
e.run();
} catch (RuntimeException e1) {
e1.printStackTrace();
}
}
}
while (!isEmpty())
{
Event e = null;
synchronized(this)
{
if (!queue.isEmpty())
e = queue.pop();
}
if (e!=null)
try {
e.run();
} catch (RuntimeException e1) {
e1.printStackTrace();
}
}
}
public synchronized void stopAcceptingEvents()
{
acceptEvents = false;
}
public synchronized Set getWaitingForThreads()
{
Set result = new HashSet(waitingFor.size());
for (Event e : waitingFor)
{
if (completed.contains(e)) continue;
result.add(e.getThread());
}
return result;
}
}
/**
* Blocks until waiting thread has been set for thread t
* @param t thread
* @return waiting thread
*/
synchronized static WaitingThread getWaitingThreadSync(Thread t)
{
WaitingThread result;
do {
result = map.get(t);
if (result!=null) return result;
try {
ThreadUtils.class.wait();
} catch (InterruptedException e) {
}
} while (true);
}
/**
* Return an array which may have nulls
* @param t
* @return
*/
static Set getWaitsForThreads(Thread t)
{
WaitingThread wt = getWaitingThread(t);
if (wt==null) return null;
return wt.getWaitingForThreads();
}
static synchronized WaitingThread getWaitingThread(Thread t)
{
return map.get(t);
}
static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
{
WaitingThread prev = map.put(t, wt);
ThreadUtils.class.notifyAll();
return prev;
}
static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
{
assert(t == Thread.currentThread());
map.remove(t);
if (replaceWith!=null)
map.put(t, replaceWith);
}
static class BetterThreadAccess implements IThreadWorkQueue {
IThreadWorkQueue ta;
public BetterThreadAccess(IThreadWorkQueue ta)
{
if (ta instanceof BetterThreadAccess)
ta = ((BetterThreadAccess) ta).ta;
this.ta = ta;
}
@Override
public Thread asyncExec(Runnable runnable) {
return ThreadUtils.asyncExec(ta, runnable);
}
@Override
public boolean currentThreadAccess() {
return ta.currentThreadAccess();
}
@Override
public Thread getThread() {
return ta.getThread();
}
@Override
public boolean syncExec(Runnable runnable) {
return ThreadUtils.syncExec(ta, runnable);
}
}
/**
* Executes command in the executor while maintaining possibility to
* return back to caller.
*
* @param executor
* @param command
*/
public static void exec(Executor executor, final Runnable command)
{
final Thread[] calleeThread = new Thread[1];
final Thread callerThread = Thread.currentThread();
final Runnable wrappedCommand = new Runnable() {
@Override
public void run() {
calleeThread[0] = Thread.currentThread();
try {
command.run();
} finally {
}
}
};
// add dependency
DEPENDENCIES.put(callerThread, calleeThread);
executor.execute(wrappedCommand);
DEPENDENCIES.remove(callerThread);
}
private static Map DEPENDENCIES =
Collections.synchronizedMap( new HashMap() );
@SuppressWarnings("unused")
private static boolean hasDependency(Thread waiter, Thread worker)
{
// Follow dependency chain until worker is reached
for (Thread t = waiter; t!=null;)
{
Thread[] potentialResult = DEPENDENCIES.get(t);
if (potentialResult==null) break;
t = potentialResult[0];
if (t==worker) return true;
}
return false;
}
/**
* Lock multiple locks simultaneously. If all locks cannot be locked the
* thread sleeps a moment and tries again.
*
* If all locks cannot be locked at once, this method doesn't prevent
* other from locking them in the mean time.
*
* @param locks an array of locks. null values are ignored
*/
public static void lock(Lock...locks) {
if (locks.length==0) return;
if (locks.length==1) {
locks[0].lock();
return;
}
while (true) {
int i = 0;
for (;il1) {
for (int j=l1; j0) {
for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks
if (!leftovers.isEmpty())
LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
//long e = System.currentTimeMillis();
//System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
}
@SuppressWarnings("unused")
private static void uncheckedAwaitTermination(ExecutorService service, long time) {
try {
NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
// Executor support
// Executor that runs in current thread
public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
// Async executor queues the command into AWT event queue
public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
// Sync executor blocks the call until the command is finished
public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
}
class AWTExecutorSync extends AbstractExecutorService {
@Override
public void execute(Runnable command) {
if (EventQueue.isDispatchThread())
{
command.run();
} else {
try {
EventQueue.invokeAndWait(command);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e.getCause());
}
}
}
@Override
public void shutdown() {
}
@Override
public List shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return false;
}
public static String getStackTrace(int levels){
StringBuilder sb = new StringBuilder();
sb.append( Thread.currentThread() );
sb.append( "\n" );
Exception e3 = new Exception();
try { throw e3; } catch(Exception e2) {
for (int i=1; i