X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2FThreadUtils.java;h=15e818416bbd0d044ddcd05c115f21019409b88b;hb=refs%2Fchanges%2F65%2F1865%2F2;hp=2efc1e56287cdb3fc4a4b7044b5a11a2ca2754b3;hpb=75785e1454fbc60973848c72e2ed698d2cda5ce7;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java index 2efc1e562..15e818416 100644 --- a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java +++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java @@ -1,991 +1,995 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -/* - * - * @author Toni Kalajainen - */ -package org.simantics.utils.threads; - -import static java.util.concurrent.TimeUnit.NANOSECONDS; - -import java.awt.EventQueue; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; - -/** - * Utility for switching threads - * - * @see Executors2 - * @see IThreadWorkQueue - */ -public class ThreadUtils { - - public static final int CORES = Runtime.getRuntime().availableProcessors(); - - /** - * Defines the maximum thread pool size of - * {@link #getBlockingWorkExecutor()}. The system will enforce this value to - * be at least Math.max(CORES, 8). It can be configured through the system - * property simantics.executor.blockingMaxThreads. - */ - public static final int MAX_BLOCKING_EXECUTOR_THREADS; - - static { - String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES); - int blockingMaxThreads = CORES; - try { - blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp); - } catch (NumberFormatException e) { - } - MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES); - } - - /** Never acquire non-blocking executor from this field, private use */ - public static ScheduledExecutorService NON_BLOCKING_EXECUTOR; - /** Never acquire non-blocking executor from this field, private use */ - public static ExecutorService BLOCKING_EXECUTOR; - - static ScheduledExecutorService TIMER; - - /** - * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread - * - * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread - */ - public static CurrentThread getCurrentThread() { - return CurrentThread.INSTANCE; - } - - /** - * Get a timer executor. Never add blocking or long-wait runnables to this scheduler - * - * @return Executor that executes a non-blocking work - */ - public static synchronized ScheduledExecutorService getTimer() { - if (TIMER == null) { - final ThreadGroup tg = new ThreadGroup("Timer"); - final AtomicInteger counter = new AtomicInteger(0); - ThreadFactory tf = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet())); - if (!t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }; - TIMER = new ScheduledThreadPoolExecutor( 1, tf ); - } - return TIMER; - } - - /** - * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks - * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). - * - * @return Executor that executes a non-blocking work - */ - public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() { - if (NON_BLOCKING_EXECUTOR == null) { - final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group"); - final AtomicInteger counter = new AtomicInteger(0); - ThreadFactory tf = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet())); - if (!t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }; - NON_BLOCKING_EXECUTOR = - new ScheduledThreadPoolExecutor( CORES, tf ); - } - return NON_BLOCKING_EXECUTOR; - } - - /** - * Get Executor executes work that may lock or block. - * - * @return executor for blocking operations - */ - public static synchronized ExecutorService getBlockingWorkExecutor() { - if (BLOCKING_EXECUTOR == null) { - final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group"); - final AtomicInteger counter = new AtomicInteger(0); - ThreadFactory tf = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet())); - if (!t.isDaemon()) - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - } - }; - BLOCKING_EXECUTOR = -// new ThreadPoolExecutor( -// 0, -// MAX_BLOCKING_EXECUTOR_THREADS, -// 3L, TimeUnit.SECONDS, -// new SynchronousQueue(), -// tf); - new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf); - } - return BLOCKING_EXECUTOR; - } - - /** - * Better thread access allows thread context switching back to waiting - * threads. - * - * @param access - * @return an enhanced version of the specified queue - */ - public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access) - { - if (access instanceof BetterThreadAccess) - return access; - return new BetterThreadAccess(access); - } - - static Map map = - new HashMap(); - - /** - * Executes a runnable in thread synchronously. - * If the thread locked and waits for this thread, then - * the runnable is ran in that thread. - * - * This works only if all thread switching is done - * using this method. - * - * @param threadAccess - * @param runnable - * @return true if thread accepted the runnable - */ - public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable) - { - if (threadAccess instanceof BetterThreadAccess) - threadAccess = ((BetterThreadAccess) threadAccess).ta; - - // Using current thread - if (threadAccess.currentThreadAccess()) - { - try { - runnable.run(); - } catch (RuntimeException e) { - handleRunnableError(e); - } - return true; - } - - final Thread senderThread = Thread.currentThread(); - final WaitingThread wt = new WaitingThread(senderThread); - WaitingThread prevWt; - Event e = new Event(runnable, new EventListener() { - @Override - public void eventDone(Event e) { - wt.completed(e); - }}, null); - - synchronized (ThreadUtils.class) { - // Check if target has WaitingThread. Use it if it does - WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); - Thread waitingForThread = null; - if (targetWt != null) - { - // Check if it is allowed to use the target WT - if (isEventQueuingAllowed(senderThread, targetWt)) - { - if (targetWt.addEvent(e)) { - synchronized(wt) { - waitingForThread = targetWt.thread; - e.setThread(waitingForThread); - wt.waitFor(e); - } - } - } - } - - if (waitingForThread == null) { - synchronized(wt) { - waitingForThread = threadAccess.asyncExec(e); - if (waitingForThread==null) return false; - e.setThread(waitingForThread); - wt.waitFor(e); - } - } - - prevWt = setWaitingThread(senderThread, wt); - } - // run errands in the mean time - wt.waitAndProcessEvents(); - wt.stopAcceptingEvents(); - // Stop accepting errands - removeWaitingThread(senderThread, prevWt); - // Run last events - wt.waitAndProcessEvents(); - return true; - } - - public static boolean multiSyncExec(Collection executions) - { - if (executions.isEmpty()) return true; - return multiSyncExec(executions.toArray(new Executable[executions.size()])); - } - - /** - * Executes a list of executables in multiple threads and waits for all to complete. - * - * @param executions - * @return true once execution has completed (is this necessary?) - */ - public static boolean multiSyncExec(Executable ... executions) - { - if (executions.length==0) return true; - if (executions.length==1) { - return syncExec(executions[0].threadAccess, executions[0].runnable); - } - final Thread senderThread = Thread.currentThread(); - final WaitingThread wt = new WaitingThread(senderThread); - WaitingThread prevWt = null; - synchronized (ThreadUtils.class) { - for (Executable pair : executions) - { - IThreadWorkQueue threadAccess = pair.threadAccess; - if (threadAccess.currentThreadAccess()) - continue; - if (threadAccess instanceof BetterThreadAccess) - threadAccess = ((BetterThreadAccess) threadAccess).ta; - Runnable runnable = pair.runnable; - - Event e = new Event(runnable, new EventListener() { - @Override - public void eventDone(Event e) { - wt.completed(e); - }}, null); - - // Check if target has WaitingThread. Use it if it does - WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); - Thread waitingForThread = null; - if (targetWt != null) - { - // Check if it is allowed to use the target WT - if (isEventQueuingAllowed(senderThread, targetWt)) - { - if (targetWt.addEvent(e)) { - synchronized(wt) { - waitingForThread = targetWt.thread; - e.setThread(waitingForThread); - wt.waitFor(e); - } - } - } - } - - if (waitingForThread == null) { - synchronized(wt) { - waitingForThread = threadAccess.asyncExec(e); - if (waitingForThread==null) - return false; - e.setThread(waitingForThread); - wt.waitFor(e); - } - } - } - prevWt = setWaitingThread(senderThread, wt); - } - - // Run local runnables - for (Executable pair : executions) - { - IThreadWorkQueue threadAccess = pair.threadAccess; - Runnable runnable = pair.runnable; - if (threadAccess.currentThreadAccess()) - try { - runnable.run(); - } catch (RuntimeException e) { - handleRunnableError(e); - } - } - - // run errands in the mean time - wt.waitAndProcessEvents(); - // Stop accepting errands - wt.stopAcceptingEvents(); - removeWaitingThread(senderThread, prevWt); - // Run last events - wt.waitAndProcessEvents(); - return true; - } - - public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable) - { - if (threadAccess instanceof BetterThreadAccess) - threadAccess = ((BetterThreadAccess) threadAccess).ta; - - final Thread senderThread = Thread.currentThread(); - synchronized (ThreadUtils.class) { - - Event e = new Event(runnable, null, null); - - // Check if target has WaitingThread. Use it if it does - WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); - if (targetWt != null) - { - // Check if it is allowed to use the target WT - if (isEventQueuingAllowed(senderThread, targetWt)) - { - if (targetWt.addEvent(e)) - return targetWt.thread; - } - } - return threadAccess.asyncExec(runnable); - } - } - - private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads) - { - assert(targetThread!=null); - if (visitedTargetThreads.contains(targetThread)) - return false; - visitedTargetThreads.add(targetThread); - if (sourceThread == targetThread) return false; - Set waitsFor = getWaitsForThreads(targetThread); - if (waitsFor==null||waitsFor.isEmpty()) return false; - for (Thread aThreadTargetThreadWaitsFor : waitsFor) - { - if (aThreadTargetThreadWaitsFor==sourceThread) return true; - if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue; - if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads)) - return true; - } - return false; - } - - static boolean waitsFor(Thread sourceThread, Thread targetThread) - { - return _waitsFor(sourceThread, targetThread, new HashSet(3)); - } - - static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue) - { - if (!eventQueue.acceptEvents) return false; - // queuing is allowed if target thread does wait for source thread - return waitsFor(sourceThread, eventQueue.thread); - } - - private static void handleRunnableError(Throwable t) - { - t.printStackTrace(); - } - - interface EventListener { - void eventDone(Event e); - } - - public static class Event implements Runnable { - Runnable r; - EventListener l; - Semaphore s; - Thread t; - public Event(Runnable r, EventListener l, Semaphore s) { - this.r = r; - this.l = l; - this.s = s; - } - public void run() { - setThread(Thread.currentThread()); - try { - r.run(); - } catch (RuntimeException e) { - handleRunnableError(e); - } finally { - if (s!=null) - s.release(1); - if (l!=null) - l.eventDone(this); - } - } - public synchronized Thread getThread() - { - while (t==null) { - try { - t.wait(); - } catch (InterruptedException e) { - } - } - return t; - } - public synchronized void setThread(Thread t) - { - assert(t!=null); - if (this.t!=null) - assert(this.t==t); - this.t = t; - notify(); - } - } - - /** - * WaitingThread is a thread that waits for something and in the meantime - * runs errands. - */ - static class WaitingThread - { - final Thread thread; - LinkedList queue = new LinkedList(); - boolean acceptEvents = true; - Set waitingFor = new HashSet(); - Set completed = new HashSet(); - public WaitingThread(Thread thread) { - this.thread = thread; - } - public synchronized void waitFor(Event event) - { - assert(thread!=null); - waitingFor.add(event); - } - public synchronized void completed(Event event) - { - //assert(waitingFor.contains(event)); - completed.add(event); - if (completed.size()==waitingFor.size()) - notify(); - } - synchronized boolean isEmpty() { - return queue.isEmpty(); - } - synchronized boolean keepWaiting() { - if(waitingFor.size()!=completed.size()) - return true; - assert(waitingFor.equals(completed)); - return false; - } - public synchronized boolean addEvent(Event r) - { - if (!acceptEvents) return false; - queue.add(r); - notify(); - return true; - } - - static private int WAIT_MS = 10000; - static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS; - - public void waitAndProcessEvents() - { - while (keepWaiting() || !isEmpty()) { - Event e = null; - synchronized(this) - { - if (!queue.isEmpty()) - e = queue.pop(); - if (e==null && keepWaiting()) - try { - long now = System.nanoTime(); - wait(WAIT_MS); - long duration = System.nanoTime()-now; - if(duration > (WAIT_THRESHOLD_NS)) { - for(Thread t : getWaitingForThreads()) - if(!t.isAlive()) - throw new IllegalStateException("Thread '" + thread + "' has died."); - } - } catch (InterruptedException e1) { - } - } - if (e!=null) { - try { - e.run(); - } catch (RuntimeException e1) { - e1.printStackTrace(); - } - } - } - while (!isEmpty()) - { - Event e = null; - synchronized(this) - { - if (!queue.isEmpty()) - e = queue.pop(); - } - if (e!=null) - try { - e.run(); - } catch (RuntimeException e1) { - e1.printStackTrace(); - } - } - - } - public synchronized void stopAcceptingEvents() - { - acceptEvents = false; - } - public synchronized Set getWaitingForThreads() - { - Set result = new HashSet(waitingFor.size()); - for (Event e : waitingFor) - { - if (completed.contains(e)) continue; - result.add(e.getThread()); - } - return result; - } - } - - /** - * Blocks until waiting thread has been set for thread t - * @param t thread - * @return waiting thread - */ - synchronized static WaitingThread getWaitingThreadSync(Thread t) - { - WaitingThread result; - do { - result = map.get(t); - if (result!=null) return result; - try { - ThreadUtils.class.wait(); - } catch (InterruptedException e) { - } - } while (true); - } - - /** - * Return an array which may have nulls - * @param t - * @return - */ - static Set getWaitsForThreads(Thread t) - { - WaitingThread wt = getWaitingThread(t); - if (wt==null) return null; - return wt.getWaitingForThreads(); - } - - static synchronized WaitingThread getWaitingThread(Thread t) - { - return map.get(t); - } - - static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt) - { - WaitingThread prev = map.put(t, wt); - ThreadUtils.class.notifyAll(); - return prev; - } - - static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith) - { - assert(t == Thread.currentThread()); - map.remove(t); - if (replaceWith!=null) - map.put(t, replaceWith); - } - - static class BetterThreadAccess implements IThreadWorkQueue { - IThreadWorkQueue ta; - public BetterThreadAccess(IThreadWorkQueue ta) - { - if (ta instanceof BetterThreadAccess) - ta = ((BetterThreadAccess) ta).ta; - this.ta = ta; - } - @Override - public Thread asyncExec(Runnable runnable) { - return ThreadUtils.asyncExec(ta, runnable); - } - @Override - public boolean currentThreadAccess() { - return ta.currentThreadAccess(); - } - @Override - public Thread getThread() { - return ta.getThread(); - } - @Override - public boolean syncExec(Runnable runnable) { - return ThreadUtils.syncExec(ta, runnable); - } - } - - - - /** - * Executes command in the executor while maintaining possibility to - * return back to caller. - * - * @param executor - * @param command - */ - public static void exec(Executor executor, final Runnable command) - { - final Thread[] calleeThread = new Thread[1]; - final Thread callerThread = Thread.currentThread(); - final Runnable wrappedCommand = new Runnable() { - @Override - public void run() { - calleeThread[0] = Thread.currentThread(); - try { - command.run(); - } finally { - } - } - }; - - // add dependency - DEPENDENCIES.put(callerThread, calleeThread); - executor.execute(wrappedCommand); - DEPENDENCIES.remove(callerThread); - } - private static Map DEPENDENCIES = - Collections.synchronizedMap( new HashMap() ); - @SuppressWarnings("unused") - private static boolean hasDependency(Thread waiter, Thread worker) - { - // Follow dependency chain until worker is reached - for (Thread t = waiter; t!=null;) - { - Thread[] potentialResult = DEPENDENCIES.get(t); - if (potentialResult==null) break; - t = potentialResult[0]; - if (t==worker) return true; - } - return false; - } - - /** - * Lock multiple locks simultaneously. If all locks cannot be locked the - * thread sleeps a moment and tries again. - * - * If all locks cannot be locked at once, this method doesn't prevent - * other from locking them in the mean time. - * - * @param locks an array of locks. null values are ignored - */ - public static void lock(Lock...locks) { - if (locks.length==0) return; - if (locks.length==1) { - locks[0].lock(); - return; - } - - while (true) { - int i = 0; - for (;il1) { - for (int j=l1; j0) { - for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks - if (!leftovers.isEmpty()) - System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers); - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) - System.err.println("Thread pool '" + pool.toString() + "' did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - //long e = System.currentTimeMillis(); - //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms"); - } - - @SuppressWarnings("unused") - private static void uncheckedAwaitTermination(ExecutorService service, long time) { - try { - NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - } - } - - // Executor support - - // Executor that runs in current thread - public static Executor CURRENT_THREAD = new CurrentThreadExecutor(); - - // Async executor queues the command into AWT event queue - public static ExecutorService AWT_EDT = AWTThread.INSTANCE; - - // Sync executor blocks the call until the command is finished - public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync(); - -} - -class AWTExecutorSync extends AbstractExecutorService { - - @Override - public void execute(Runnable command) { - if (EventQueue.isDispatchThread()) - { - command.run(); - } else { - try { - EventQueue.invokeAndWait(command); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e.getCause()); - } - } - } - - @Override - public void shutdown() { - } - - @Override - public List shutdownNow() { - return null; - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - return false; - } - - public static String getStackTrace(int levels){ - StringBuilder sb = new StringBuilder(); - sb.append( Thread.currentThread() ); - sb.append( "\n" ); - Exception e3 = new Exception(); - try { throw e3; } catch(Exception e2) { - - for (int i=1; isimantics.executor.blockingMaxThreads. + */ + public static final int MAX_BLOCKING_EXECUTOR_THREADS; + + static { + String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES); + int blockingMaxThreads = CORES; + try { + blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp); + } catch (NumberFormatException e) { + } + MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES); + } + + /** Never acquire non-blocking executor from this field, private use */ + public static ScheduledExecutorService NON_BLOCKING_EXECUTOR; + /** Never acquire non-blocking executor from this field, private use */ + public static ExecutorService BLOCKING_EXECUTOR; + + static ScheduledExecutorService TIMER; + + /** + * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread + * + * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread + */ + public static CurrentThread getCurrentThread() { + return CurrentThread.INSTANCE; + } + + /** + * Get a timer executor. Never add blocking or long-wait runnables to this scheduler + * + * @return Executor that executes a non-blocking work + */ + public static synchronized ScheduledExecutorService getTimer() { + if (TIMER == null) { + final ThreadGroup tg = new ThreadGroup("Timer"); + final AtomicInteger counter = new AtomicInteger(0); + ThreadFactory tf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet())); + if (!t.isDaemon()) + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + }; + TIMER = new ScheduledThreadPoolExecutor( 1, tf ); + } + return TIMER; + } + + /** + * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks + * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). + * + * @return Executor that executes a non-blocking work + */ + public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() { + if (NON_BLOCKING_EXECUTOR == null) { + final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group"); + final AtomicInteger counter = new AtomicInteger(0); + ThreadFactory tf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet())); + if (!t.isDaemon()) + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + }; + NON_BLOCKING_EXECUTOR = + new ScheduledThreadPoolExecutor( CORES, tf ); + } + return NON_BLOCKING_EXECUTOR; + } + + /** + * Get Executor executes work that may lock or block. + * + * @return executor for blocking operations + */ + public static synchronized ExecutorService getBlockingWorkExecutor() { + if (BLOCKING_EXECUTOR == null) { + final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group"); + final AtomicInteger counter = new AtomicInteger(0); + ThreadFactory tf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet())); + if (!t.isDaemon()) + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + }; + BLOCKING_EXECUTOR = +// new ThreadPoolExecutor( +// 0, +// MAX_BLOCKING_EXECUTOR_THREADS, +// 3L, TimeUnit.SECONDS, +// new SynchronousQueue(), +// tf); + new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf); + } + return BLOCKING_EXECUTOR; + } + + /** + * Better thread access allows thread context switching back to waiting + * threads. + * + * @param access + * @return an enhanced version of the specified queue + */ + public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access) + { + if (access instanceof BetterThreadAccess) + return access; + return new BetterThreadAccess(access); + } + + static Map map = + new HashMap(); + + /** + * Executes a runnable in thread synchronously. + * If the thread locked and waits for this thread, then + * the runnable is ran in that thread. + * + * This works only if all thread switching is done + * using this method. + * + * @param threadAccess + * @param runnable + * @return true if thread accepted the runnable + */ + public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable) + { + if (threadAccess instanceof BetterThreadAccess) + threadAccess = ((BetterThreadAccess) threadAccess).ta; + + // Using current thread + if (threadAccess.currentThreadAccess()) + { + try { + runnable.run(); + } catch (RuntimeException e) { + handleRunnableError(e); + } + return true; + } + + final Thread senderThread = Thread.currentThread(); + final WaitingThread wt = new WaitingThread(senderThread); + WaitingThread prevWt; + Event e = new Event(runnable, new EventListener() { + @Override + public void eventDone(Event e) { + wt.completed(e); + }}, null); + + synchronized (ThreadUtils.class) { + // Check if target has WaitingThread. Use it if it does + WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); + Thread waitingForThread = null; + if (targetWt != null) + { + // Check if it is allowed to use the target WT + if (isEventQueuingAllowed(senderThread, targetWt)) + { + if (targetWt.addEvent(e)) { + synchronized(wt) { + waitingForThread = targetWt.thread; + e.setThread(waitingForThread); + wt.waitFor(e); + } + } + } + } + + if (waitingForThread == null) { + synchronized(wt) { + waitingForThread = threadAccess.asyncExec(e); + if (waitingForThread==null) return false; + e.setThread(waitingForThread); + wt.waitFor(e); + } + } + + prevWt = setWaitingThread(senderThread, wt); + } + // run errands in the mean time + wt.waitAndProcessEvents(); + wt.stopAcceptingEvents(); + // Stop accepting errands + removeWaitingThread(senderThread, prevWt); + // Run last events + wt.waitAndProcessEvents(); + return true; + } + + public static boolean multiSyncExec(Collection executions) + { + if (executions.isEmpty()) return true; + return multiSyncExec(executions.toArray(new Executable[executions.size()])); + } + + /** + * Executes a list of executables in multiple threads and waits for all to complete. + * + * @param executions + * @return true once execution has completed (is this necessary?) + */ + public static boolean multiSyncExec(Executable ... executions) + { + if (executions.length==0) return true; + if (executions.length==1) { + return syncExec(executions[0].threadAccess, executions[0].runnable); + } + final Thread senderThread = Thread.currentThread(); + final WaitingThread wt = new WaitingThread(senderThread); + WaitingThread prevWt = null; + synchronized (ThreadUtils.class) { + for (Executable pair : executions) + { + IThreadWorkQueue threadAccess = pair.threadAccess; + if (threadAccess.currentThreadAccess()) + continue; + if (threadAccess instanceof BetterThreadAccess) + threadAccess = ((BetterThreadAccess) threadAccess).ta; + Runnable runnable = pair.runnable; + + Event e = new Event(runnable, new EventListener() { + @Override + public void eventDone(Event e) { + wt.completed(e); + }}, null); + + // Check if target has WaitingThread. Use it if it does + WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); + Thread waitingForThread = null; + if (targetWt != null) + { + // Check if it is allowed to use the target WT + if (isEventQueuingAllowed(senderThread, targetWt)) + { + if (targetWt.addEvent(e)) { + synchronized(wt) { + waitingForThread = targetWt.thread; + e.setThread(waitingForThread); + wt.waitFor(e); + } + } + } + } + + if (waitingForThread == null) { + synchronized(wt) { + waitingForThread = threadAccess.asyncExec(e); + if (waitingForThread==null) + return false; + e.setThread(waitingForThread); + wt.waitFor(e); + } + } + } + prevWt = setWaitingThread(senderThread, wt); + } + + // Run local runnables + for (Executable pair : executions) + { + IThreadWorkQueue threadAccess = pair.threadAccess; + Runnable runnable = pair.runnable; + if (threadAccess.currentThreadAccess()) + try { + runnable.run(); + } catch (RuntimeException e) { + handleRunnableError(e); + } + } + + // run errands in the mean time + wt.waitAndProcessEvents(); + // Stop accepting errands + wt.stopAcceptingEvents(); + removeWaitingThread(senderThread, prevWt); + // Run last events + wt.waitAndProcessEvents(); + return true; + } + + public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable) + { + if (threadAccess instanceof BetterThreadAccess) + threadAccess = ((BetterThreadAccess) threadAccess).ta; + + final Thread senderThread = Thread.currentThread(); + synchronized (ThreadUtils.class) { + + Event e = new Event(runnable, null, null); + + // Check if target has WaitingThread. Use it if it does + WaitingThread targetWt = getWaitingThread(threadAccess.getThread()); + if (targetWt != null) + { + // Check if it is allowed to use the target WT + if (isEventQueuingAllowed(senderThread, targetWt)) + { + if (targetWt.addEvent(e)) + return targetWt.thread; + } + } + return threadAccess.asyncExec(runnable); + } + } + + private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads) + { + assert(targetThread!=null); + if (visitedTargetThreads.contains(targetThread)) + return false; + visitedTargetThreads.add(targetThread); + if (sourceThread == targetThread) return false; + Set waitsFor = getWaitsForThreads(targetThread); + if (waitsFor==null||waitsFor.isEmpty()) return false; + for (Thread aThreadTargetThreadWaitsFor : waitsFor) + { + if (aThreadTargetThreadWaitsFor==sourceThread) return true; + if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue; + if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads)) + return true; + } + return false; + } + + static boolean waitsFor(Thread sourceThread, Thread targetThread) + { + return _waitsFor(sourceThread, targetThread, new HashSet(3)); + } + + static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue) + { + if (!eventQueue.acceptEvents) return false; + // queuing is allowed if target thread does wait for source thread + return waitsFor(sourceThread, eventQueue.thread); + } + + private static void handleRunnableError(Throwable t) + { + t.printStackTrace(); + } + + interface EventListener { + void eventDone(Event e); + } + + public static class Event implements Runnable { + Runnable r; + EventListener l; + Semaphore s; + Thread t; + public Event(Runnable r, EventListener l, Semaphore s) { + this.r = r; + this.l = l; + this.s = s; + } + public void run() { + setThread(Thread.currentThread()); + try { + r.run(); + } catch (RuntimeException e) { + handleRunnableError(e); + } finally { + if (s!=null) + s.release(1); + if (l!=null) + l.eventDone(this); + } + } + public synchronized Thread getThread() + { + while (t==null) { + try { + t.wait(); + } catch (InterruptedException e) { + } + } + return t; + } + public synchronized void setThread(Thread t) + { + assert(t!=null); + if (this.t!=null) + assert(this.t==t); + this.t = t; + notify(); + } + } + + /** + * WaitingThread is a thread that waits for something and in the meantime + * runs errands. + */ + static class WaitingThread + { + final Thread thread; + LinkedList queue = new LinkedList(); + boolean acceptEvents = true; + Set waitingFor = new HashSet(); + Set completed = new HashSet(); + public WaitingThread(Thread thread) { + this.thread = thread; + } + public synchronized void waitFor(Event event) + { + assert(thread!=null); + waitingFor.add(event); + } + public synchronized void completed(Event event) + { + //assert(waitingFor.contains(event)); + completed.add(event); + if (completed.size()==waitingFor.size()) + notify(); + } + synchronized boolean isEmpty() { + return queue.isEmpty(); + } + synchronized boolean keepWaiting() { + if(waitingFor.size()!=completed.size()) + return true; + assert(waitingFor.equals(completed)); + return false; + } + public synchronized boolean addEvent(Event r) + { + if (!acceptEvents) return false; + queue.add(r); + notify(); + return true; + } + + static private int WAIT_MS = 10000; + static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS; + + public void waitAndProcessEvents() + { + while (keepWaiting() || !isEmpty()) { + Event e = null; + synchronized(this) + { + if (!queue.isEmpty()) + e = queue.pop(); + if (e==null && keepWaiting()) + try { + long now = System.nanoTime(); + wait(WAIT_MS); + long duration = System.nanoTime()-now; + if(duration > (WAIT_THRESHOLD_NS)) { + for(Thread t : getWaitingForThreads()) + if(!t.isAlive()) + throw new IllegalStateException("Thread '" + thread + "' has died."); + } + } catch (InterruptedException e1) { + } + } + if (e!=null) { + try { + e.run(); + } catch (RuntimeException e1) { + e1.printStackTrace(); + } + } + } + while (!isEmpty()) + { + Event e = null; + synchronized(this) + { + if (!queue.isEmpty()) + e = queue.pop(); + } + if (e!=null) + try { + e.run(); + } catch (RuntimeException e1) { + e1.printStackTrace(); + } + } + + } + public synchronized void stopAcceptingEvents() + { + acceptEvents = false; + } + public synchronized Set getWaitingForThreads() + { + Set result = new HashSet(waitingFor.size()); + for (Event e : waitingFor) + { + if (completed.contains(e)) continue; + result.add(e.getThread()); + } + return result; + } + } + + /** + * Blocks until waiting thread has been set for thread t + * @param t thread + * @return waiting thread + */ + synchronized static WaitingThread getWaitingThreadSync(Thread t) + { + WaitingThread result; + do { + result = map.get(t); + if (result!=null) return result; + try { + ThreadUtils.class.wait(); + } catch (InterruptedException e) { + } + } while (true); + } + + /** + * Return an array which may have nulls + * @param t + * @return + */ + static Set getWaitsForThreads(Thread t) + { + WaitingThread wt = getWaitingThread(t); + if (wt==null) return null; + return wt.getWaitingForThreads(); + } + + static synchronized WaitingThread getWaitingThread(Thread t) + { + return map.get(t); + } + + static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt) + { + WaitingThread prev = map.put(t, wt); + ThreadUtils.class.notifyAll(); + return prev; + } + + static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith) + { + assert(t == Thread.currentThread()); + map.remove(t); + if (replaceWith!=null) + map.put(t, replaceWith); + } + + static class BetterThreadAccess implements IThreadWorkQueue { + IThreadWorkQueue ta; + public BetterThreadAccess(IThreadWorkQueue ta) + { + if (ta instanceof BetterThreadAccess) + ta = ((BetterThreadAccess) ta).ta; + this.ta = ta; + } + @Override + public Thread asyncExec(Runnable runnable) { + return ThreadUtils.asyncExec(ta, runnable); + } + @Override + public boolean currentThreadAccess() { + return ta.currentThreadAccess(); + } + @Override + public Thread getThread() { + return ta.getThread(); + } + @Override + public boolean syncExec(Runnable runnable) { + return ThreadUtils.syncExec(ta, runnable); + } + } + + + + /** + * Executes command in the executor while maintaining possibility to + * return back to caller. + * + * @param executor + * @param command + */ + public static void exec(Executor executor, final Runnable command) + { + final Thread[] calleeThread = new Thread[1]; + final Thread callerThread = Thread.currentThread(); + final Runnable wrappedCommand = new Runnable() { + @Override + public void run() { + calleeThread[0] = Thread.currentThread(); + try { + command.run(); + } finally { + } + } + }; + + // add dependency + DEPENDENCIES.put(callerThread, calleeThread); + executor.execute(wrappedCommand); + DEPENDENCIES.remove(callerThread); + } + private static Map DEPENDENCIES = + Collections.synchronizedMap( new HashMap() ); + @SuppressWarnings("unused") + private static boolean hasDependency(Thread waiter, Thread worker) + { + // Follow dependency chain until worker is reached + for (Thread t = waiter; t!=null;) + { + Thread[] potentialResult = DEPENDENCIES.get(t); + if (potentialResult==null) break; + t = potentialResult[0]; + if (t==worker) return true; + } + return false; + } + + /** + * Lock multiple locks simultaneously. If all locks cannot be locked the + * thread sleeps a moment and tries again. + * + * If all locks cannot be locked at once, this method doesn't prevent + * other from locking them in the mean time. + * + * @param locks an array of locks. null values are ignored + */ + public static void lock(Lock...locks) { + if (locks.length==0) return; + if (locks.length==1) { + locks[0].lock(); + return; + } + + while (true) { + int i = 0; + for (;il1) { + for (int j=l1; j0) { + for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks + if (!leftovers.isEmpty()) + LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers); + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) + LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + //long e = System.currentTimeMillis(); + //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms"); + } + + @SuppressWarnings("unused") + private static void uncheckedAwaitTermination(ExecutorService service, long time) { + try { + NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + } + + // Executor support + + // Executor that runs in current thread + public static Executor CURRENT_THREAD = new CurrentThreadExecutor(); + + // Async executor queues the command into AWT event queue + public static ExecutorService AWT_EDT = AWTThread.INSTANCE; + + // Sync executor blocks the call until the command is finished + public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync(); + +} + +class AWTExecutorSync extends AbstractExecutorService { + + @Override + public void execute(Runnable command) { + if (EventQueue.isDispatchThread()) + { + command.run(); + } else { + try { + EventQueue.invokeAndWait(command); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e.getCause()); + } + } + } + + @Override + public void shutdown() { + } + + @Override + public List shutdownNow() { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return false; + } + + public static String getStackTrace(int levels){ + StringBuilder sb = new StringBuilder(); + sb.append( Thread.currentThread() ); + sb.append( "\n" ); + Exception e3 = new Exception(); + try { throw e3; } catch(Exception e2) { + + for (int i=1; i