/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ /* * * @author Toni Kalajainen */ package org.simantics.utils.threads; import static java.util.concurrent.TimeUnit.NANOSECONDS; import java.awt.EventQueue; import java.lang.reflect.InvocationTargetException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Utility for switching threads * * @see Executors2 * @see IThreadWorkQueue */ public class ThreadUtils { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class); public static final int CORES = Runtime.getRuntime().availableProcessors(); /** * Defines the maximum thread pool size of * {@link #getBlockingWorkExecutor()}. The system will enforce this value to * be at least Math.max(CORES, 8). It can be configured through the system * property simantics.executor.blockingMaxThreads. */ public static final int MAX_BLOCKING_EXECUTOR_THREADS; static { String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES); int blockingMaxThreads = CORES; try { blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp); } catch (NumberFormatException e) { } MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES); } /** Never acquire non-blocking executor from this field, private use */ public static ScheduledExecutorService NON_BLOCKING_EXECUTOR; /** Never acquire non-blocking executor from this field, private use */ public static ExecutorService BLOCKING_EXECUTOR; static ScheduledExecutorService TIMER; /** * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread * * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread */ public static CurrentThread getCurrentThread() { return CurrentThread.INSTANCE; } /** * Get a timer executor. Never add blocking or long-wait runnables to this scheduler * * @return Executor that executes a non-blocking work */ public static synchronized ScheduledExecutorService getTimer() { if (TIMER == null) { final ThreadGroup tg = new ThreadGroup("Timer"); final AtomicInteger counter = new AtomicInteger(0); ThreadFactory tf = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet())); if (!t.isDaemon()) t.setDaemon(true); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }; TIMER = new ScheduledThreadPoolExecutor( 1, tf ); } return TIMER; } /** * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). * * @return Executor that executes a non-blocking work */ public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() { if (NON_BLOCKING_EXECUTOR == null) { final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group"); final AtomicInteger counter = new AtomicInteger(0); ThreadFactory tf = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet())); if (!t.isDaemon()) t.setDaemon(true); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }; NON_BLOCKING_EXECUTOR = new ScheduledThreadPoolExecutor( CORES, tf ); } return NON_BLOCKING_EXECUTOR; } /** * Get Executor executes work that may lock or block. * * @return executor for blocking operations */ public static synchronized ExecutorService getBlockingWorkExecutor() { if (BLOCKING_EXECUTOR == null) { final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group"); final AtomicInteger counter = new AtomicInteger(0); ThreadFactory tf = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet())); if (!t.isDaemon()) t.setDaemon(true); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }; BLOCKING_EXECUTOR = // new ThreadPoolExecutor( // 0, // MAX_BLOCKING_EXECUTOR_THREADS, // 3L, TimeUnit.SECONDS, // new SynchronousQueue(), // tf); new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf); } return BLOCKING_EXECUTOR; } /** * Better thread access allows thread context switching back to waiting * threads. * * @param access * @return an enhanced version of the specified queue */ public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access) { if (access instanceof BetterThreadAccess) return access; return new BetterThreadAccess(access); } static Map map = new HashMap(); /** * Executes a runnable in thread synchronously. * If the thread locked and waits for this thread, then * the runnable is ran in that thread. * * This works only if all thread switching is done * using this method. * * @param threadAccess * @param runnable * @return true if thread accepted the runnable */ public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable) { if (threadAccess instanceof BetterThreadAccess) threadAccess = ((BetterThreadAccess) threadAccess).ta; // Using current thread if (threadAccess.currentThreadAccess()) { try { runnable.run(); } catch (RuntimeException e) { handleRunnableError(e); } return true; } final Thread senderThread = Thread.currentThread(); final WaitingThread wt = new WaitingThread(senderThread); WaitingThread prevWt; Event e = new Event(runnable, new EventListener() { @Override public void eventDone(Event e) { wt.completed(e); }}, null); synchronized (ThreadUtils.class) { // Check if target has WaitingThread. Use it if it does WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); Thread waitingForThread = null; if (targetWt != null) { // Check if it is allowed to use the target WT if (isEventQueuingAllowed(senderThread, targetWt)) { if (targetWt.addEvent(e)) { synchronized(wt) { waitingForThread = targetWt.thread; e.setThread(waitingForThread); wt.waitFor(e); } } } } if (waitingForThread == null) { synchronized(wt) { waitingForThread = threadAccess.asyncExec(e); if (waitingForThread==null) return false; e.setThread(waitingForThread); wt.waitFor(e); } } prevWt = setWaitingThread(senderThread, wt); } // run errands in the mean time wt.waitAndProcessEvents(); wt.stopAcceptingEvents(); // Stop accepting errands removeWaitingThread(senderThread, prevWt); // Run last events wt.waitAndProcessEvents(); return true; } public static boolean multiSyncExec(Collection executions) { if (executions.isEmpty()) return true; return multiSyncExec(executions.toArray(new Executable[executions.size()])); } /** * Executes a list of executables in multiple threads and waits for all to complete. * * @param executions * @return true once execution has completed (is this necessary?) */ public static boolean multiSyncExec(Executable ... executions) { if (executions.length==0) return true; if (executions.length==1) { return syncExec(executions[0].threadAccess, executions[0].runnable); } final Thread senderThread = Thread.currentThread(); final WaitingThread wt = new WaitingThread(senderThread); WaitingThread prevWt = null; synchronized (ThreadUtils.class) { for (Executable pair : executions) { IThreadWorkQueue threadAccess = pair.threadAccess; if (threadAccess.currentThreadAccess()) continue; if (threadAccess instanceof BetterThreadAccess) threadAccess = ((BetterThreadAccess) threadAccess).ta; Runnable runnable = pair.runnable; Event e = new Event(runnable, new EventListener() { @Override public void eventDone(Event e) { wt.completed(e); }}, null); // Check if target has WaitingThread. Use it if it does WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); Thread waitingForThread = null; if (targetWt != null) { // Check if it is allowed to use the target WT if (isEventQueuingAllowed(senderThread, targetWt)) { if (targetWt.addEvent(e)) { synchronized(wt) { waitingForThread = targetWt.thread; e.setThread(waitingForThread); wt.waitFor(e); } } } } if (waitingForThread == null) { synchronized(wt) { waitingForThread = threadAccess.asyncExec(e); if (waitingForThread==null) return false; e.setThread(waitingForThread); wt.waitFor(e); } } } prevWt = setWaitingThread(senderThread, wt); } // Run local runnables for (Executable pair : executions) { IThreadWorkQueue threadAccess = pair.threadAccess; Runnable runnable = pair.runnable; if (threadAccess.currentThreadAccess()) try { runnable.run(); } catch (RuntimeException e) { handleRunnableError(e); } } // run errands in the mean time wt.waitAndProcessEvents(); // Stop accepting errands wt.stopAcceptingEvents(); removeWaitingThread(senderThread, prevWt); // Run last events wt.waitAndProcessEvents(); return true; } public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable) { if (threadAccess instanceof BetterThreadAccess) threadAccess = ((BetterThreadAccess) threadAccess).ta; final Thread senderThread = Thread.currentThread(); synchronized (ThreadUtils.class) { Event e = new Event(runnable, null, null); // Check if target has WaitingThread. Use it if it does WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); if (targetWt != null) { // Check if it is allowed to use the target WT if (isEventQueuingAllowed(senderThread, targetWt)) { if (targetWt.addEvent(e)) return targetWt.thread; } } return threadAccess.asyncExec(runnable); } } private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads) { assert(targetThread!=null); if (visitedTargetThreads.contains(targetThread)) return false; visitedTargetThreads.add(targetThread); if (sourceThread == targetThread) return false; Set waitsFor = getWaitsForThreads(targetThread); if (waitsFor==null||waitsFor.isEmpty()) return false; for (Thread aThreadTargetThreadWaitsFor : waitsFor) { if (aThreadTargetThreadWaitsFor==sourceThread) return true; if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue; if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads)) return true; } return false; } static boolean waitsFor(Thread sourceThread, Thread targetThread) { return _waitsFor(sourceThread, targetThread, new HashSet(3)); } static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue) { if (!eventQueue.acceptEvents) return false; // queuing is allowed if target thread does wait for source thread return waitsFor(sourceThread, eventQueue.thread); } private static void handleRunnableError(Throwable t) { t.printStackTrace(); } interface EventListener { void eventDone(Event e); } public static class Event implements Runnable { Runnable r; EventListener l; Semaphore s; Thread t; public Event(Runnable r, EventListener l, Semaphore s) { this.r = r; this.l = l; this.s = s; } public void run() { setThread(Thread.currentThread()); try { r.run(); } catch (RuntimeException e) { handleRunnableError(e); } finally { if (s!=null) s.release(1); if (l!=null) l.eventDone(this); } } public synchronized Thread getThread() { while (t==null) { try { t.wait(); } catch (InterruptedException e) { } } return t; } public synchronized void setThread(Thread t) { assert(t!=null); if (this.t!=null) assert(this.t==t); this.t = t; notify(); } } /** * WaitingThread is a thread that waits for something and in the meantime * runs errands. */ static class WaitingThread { final Thread thread; LinkedList queue = new LinkedList(); boolean acceptEvents = true; Set waitingFor = new HashSet(); Set completed = new HashSet(); public WaitingThread(Thread thread) { this.thread = thread; } public synchronized void waitFor(Event event) { assert(thread!=null); waitingFor.add(event); } public synchronized void completed(Event event) { //assert(waitingFor.contains(event)); completed.add(event); if (completed.size()==waitingFor.size()) notify(); } synchronized boolean isEmpty() { return queue.isEmpty(); } synchronized boolean keepWaiting() { if(waitingFor.size()!=completed.size()) return true; assert(waitingFor.equals(completed)); return false; } public synchronized boolean addEvent(Event r) { if (!acceptEvents) return false; queue.add(r); notify(); return true; } static private int WAIT_MS = 10000; static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS; public void waitAndProcessEvents() { while (keepWaiting() || !isEmpty()) { Event e = null; synchronized(this) { if (!queue.isEmpty()) e = queue.pop(); if (e==null && keepWaiting()) try { long now = System.nanoTime(); wait(WAIT_MS); long duration = System.nanoTime()-now; if(duration > (WAIT_THRESHOLD_NS)) { for(Thread t : getWaitingForThreads()) if(!t.isAlive()) throw new IllegalStateException("Thread '" + thread + "' has died."); } } catch (InterruptedException e1) { } } if (e!=null) { try { e.run(); } catch (RuntimeException e1) { e1.printStackTrace(); } } } while (!isEmpty()) { Event e = null; synchronized(this) { if (!queue.isEmpty()) e = queue.pop(); } if (e!=null) try { e.run(); } catch (RuntimeException e1) { e1.printStackTrace(); } } } public synchronized void stopAcceptingEvents() { acceptEvents = false; } public synchronized Set getWaitingForThreads() { Set result = new HashSet(waitingFor.size()); for (Event e : waitingFor) { if (completed.contains(e)) continue; result.add(e.getThread()); } return result; } } /** * Blocks until waiting thread has been set for thread t * @param t thread * @return waiting thread */ synchronized static WaitingThread getWaitingThreadSync(Thread t) { WaitingThread result; do { result = map.get(t); if (result!=null) return result; try { ThreadUtils.class.wait(); } catch (InterruptedException e) { } } while (true); } /** * Return an array which may have nulls * @param t * @return */ static Set getWaitsForThreads(Thread t) { WaitingThread wt = getWaitingThread(t); if (wt==null) return null; return wt.getWaitingForThreads(); } static synchronized WaitingThread getWaitingThread(Thread t) { return map.get(t); } static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt) { WaitingThread prev = map.put(t, wt); ThreadUtils.class.notifyAll(); return prev; } static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith) { assert(t == Thread.currentThread()); map.remove(t); if (replaceWith!=null) map.put(t, replaceWith); } static class BetterThreadAccess implements IThreadWorkQueue { IThreadWorkQueue ta; public BetterThreadAccess(IThreadWorkQueue ta) { if (ta instanceof BetterThreadAccess) ta = ((BetterThreadAccess) ta).ta; this.ta = ta; } @Override public Thread asyncExec(Runnable runnable) { return ThreadUtils.asyncExec(ta, runnable); } @Override public boolean currentThreadAccess() { return ta.currentThreadAccess(); } @Override public Thread getThread() { return ta.getThread(); } @Override public boolean syncExec(Runnable runnable) { return ThreadUtils.syncExec(ta, runnable); } } /** * Executes command in the executor while maintaining possibility to * return back to caller. * * @param executor * @param command */ public static void exec(Executor executor, final Runnable command) { final Thread[] calleeThread = new Thread[1]; final Thread callerThread = Thread.currentThread(); final Runnable wrappedCommand = new Runnable() { @Override public void run() { calleeThread[0] = Thread.currentThread(); try { command.run(); } finally { } } }; // add dependency DEPENDENCIES.put(callerThread, calleeThread); executor.execute(wrappedCommand); DEPENDENCIES.remove(callerThread); } private static Map DEPENDENCIES = Collections.synchronizedMap( new HashMap() ); @SuppressWarnings("unused") private static boolean hasDependency(Thread waiter, Thread worker) { // Follow dependency chain until worker is reached for (Thread t = waiter; t!=null;) { Thread[] potentialResult = DEPENDENCIES.get(t); if (potentialResult==null) break; t = potentialResult[0]; if (t==worker) return true; } return false; } /** * Lock multiple locks simultaneously. If all locks cannot be locked the * thread sleeps a moment and tries again. * * If all locks cannot be locked at once, this method doesn't prevent * other from locking them in the mean time. * * @param locks an array of locks. null values are ignored */ public static void lock(Lock...locks) { if (locks.length==0) return; if (locks.length==1) { locks[0].lock(); return; } while (true) { int i = 0; for (;il1) { for (int j=l1; j0) { for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks if (!leftovers.isEmpty()) LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers); // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } //long e = System.currentTimeMillis(); //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms"); } @SuppressWarnings("unused") private static void uncheckedAwaitTermination(ExecutorService service, long time) { try { NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } // Executor support // Executor that runs in current thread public static Executor CURRENT_THREAD = new CurrentThreadExecutor(); // Async executor queues the command into AWT event queue public static ExecutorService AWT_EDT = AWTThread.INSTANCE; // Sync executor blocks the call until the command is finished public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync(); } class AWTExecutorSync extends AbstractExecutorService { @Override public void execute(Runnable command) { if (EventQueue.isDispatchThread()) { command.run(); } else { try { EventQueue.invokeAndWait(command); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { throw new RuntimeException(e.getCause()); } } } @Override public void shutdown() { } @Override public List shutdownNow() { return null; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } public static String getStackTrace(int levels){ StringBuilder sb = new StringBuilder(); sb.append( Thread.currentThread() ); sb.append( "\n" ); Exception e3 = new Exception(); try { throw e3; } catch(Exception e2) { for (int i=1; i