X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=bundles%2Forg.simantics.utils.thread%2Fsrc%2Forg%2Fsimantics%2Futils%2Fthreads%2FThreadUtils.java;h=15e818416bbd0d044ddcd05c115f21019409b88b;hb=refs%2Fchanges%2F65%2F1865%2F2;hp=407a66db82f03cd2a6b413c923271a01d4cc7afa;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git
diff --git a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java
index 407a66db8..15e818416 100644
--- a/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java
+++ b/bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java
@@ -1,991 +1,995 @@
-/*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
- * in Industry THTH ry.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * VTT Technical Research Centre of Finland - initial API and implementation
- *******************************************************************************/
-/*
- *
- * @author Toni Kalajainen
- */
-package org.simantics.utils.threads;
-
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import java.awt.EventQueue;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-
-/**
- * Utility for switching threads
- *
- * @see Executors2
- * @see IThreadWorkQueue
- */
-public class ThreadUtils {
-
- public static final int CORES = Runtime.getRuntime().availableProcessors();
-
- /**
- * Defines the maximum thread pool size of
- * {@link #getBlockingWorkExecutor()}. The system will enforce this value to
- * be at least Math.max(CORES, 8). It can be configured through the system
- * property simantics.executor.blockingMaxThreads.
- */
- public static final int MAX_BLOCKING_EXECUTOR_THREADS;
-
- static {
- String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);
- int blockingMaxThreads = CORES;
- try {
- blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);
- } catch (NumberFormatException e) {
- }
- MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);
- }
-
- /** Never acquire non-blocking executor from this field, private use */
- public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
- /** Never acquire non-blocking executor from this field, private use */
- public static ExecutorService BLOCKING_EXECUTOR;
-
- static ScheduledExecutorService TIMER;
-
- /**
- * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
- *
- * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
- */
- public static CurrentThread getCurrentThread() {
- return CurrentThread.INSTANCE;
- }
-
- /**
- * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
- *
- * @return Executor that executes a non-blocking work
- */
- public static synchronized ScheduledExecutorService getTimer() {
- if (TIMER == null) {
- final ThreadGroup tg = new ThreadGroup("Timer");
- final AtomicInteger counter = new AtomicInteger(0);
- ThreadFactory tf = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- };
- TIMER = new ScheduledThreadPoolExecutor( 1, tf );
- }
- return TIMER;
- }
-
- /**
- * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
- * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
- *
- * @return Executor that executes a non-blocking work
- */
- public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
- if (NON_BLOCKING_EXECUTOR == null) {
- final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
- final AtomicInteger counter = new AtomicInteger(0);
- ThreadFactory tf = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- };
- NON_BLOCKING_EXECUTOR =
- new ScheduledThreadPoolExecutor( CORES, tf );
- }
- return NON_BLOCKING_EXECUTOR;
- }
-
- /**
- * Get Executor executes work that may lock or block.
- *
- * @return executor for blocking operations
- */
- public static synchronized ExecutorService getBlockingWorkExecutor() {
- if (BLOCKING_EXECUTOR == null) {
- final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
- final AtomicInteger counter = new AtomicInteger(0);
- ThreadFactory tf = new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
- if (!t.isDaemon())
- t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- };
- BLOCKING_EXECUTOR =
-// new ThreadPoolExecutor(
-// 0,
-// MAX_BLOCKING_EXECUTOR_THREADS,
-// 3L, TimeUnit.SECONDS,
-// new SynchronousQueue(),
-// tf);
- new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
- }
- return BLOCKING_EXECUTOR;
- }
-
- /**
- * Better thread access allows thread context switching back to waiting
- * threads.
- *
- * @param access
- * @return an enhanced version of the specified queue
- */
- public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
- {
- if (access instanceof BetterThreadAccess)
- return access;
- return new BetterThreadAccess(access);
- }
-
- static Map map =
- new HashMap();
-
- /**
- * Executes a runnable in thread synchronously.
- * If the thread locked and waits for this thread, then
- * the runnable is ran in that thread.
- *
- * This works only if all thread switching is done
- * using this method.
- *
- * @param threadAccess
- * @param runnable
- * @return true if thread accepted the runnable
- */
- public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
- {
- if (threadAccess instanceof BetterThreadAccess)
- threadAccess = ((BetterThreadAccess) threadAccess).ta;
-
- // Using current thread
- if (threadAccess.currentThreadAccess())
- {
- try {
- runnable.run();
- } catch (RuntimeException e) {
- handleRunnableError(e);
- }
- return true;
- }
-
- final Thread senderThread = Thread.currentThread();
- final WaitingThread wt = new WaitingThread(senderThread);
- WaitingThread prevWt;
- Event e = new Event(runnable, new EventListener() {
- @Override
- public void eventDone(Event e) {
- wt.completed(e);
- }}, null);
-
- synchronized (ThreadUtils.class) {
- // Check if target has WaitingThread. Use it if it does
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
- Thread waitingForThread = null;
- if (targetWt != null)
- {
- // Check if it is allowed to use the target WT
- if (isEventQueuingAllowed(senderThread, targetWt))
- {
- if (targetWt.addEvent(e)) {
- synchronized(wt) {
- waitingForThread = targetWt.thread;
- e.setThread(waitingForThread);
- wt.waitFor(e);
- }
- }
- }
- }
-
- if (waitingForThread == null) {
- synchronized(wt) {
- waitingForThread = threadAccess.asyncExec(e);
- if (waitingForThread==null) return false;
- e.setThread(waitingForThread);
- wt.waitFor(e);
- }
- }
-
- prevWt = setWaitingThread(senderThread, wt);
- }
- // run errands in the mean time
- wt.waitAndProcessEvents();
- wt.stopAcceptingEvents();
- // Stop accepting errands
- removeWaitingThread(senderThread, prevWt);
- // Run last events
- wt.waitAndProcessEvents();
- return true;
- }
-
- public static boolean multiSyncExec(Collection executions)
- {
- if (executions.isEmpty()) return true;
- return multiSyncExec(executions.toArray(new Executable[executions.size()]));
- }
-
- /**
- * Executes a list of executables in multiple threads and waits for all to complete.
- *
- * @param executions
- * @return true
once execution has completed (is this necessary?)
- */
- public static boolean multiSyncExec(Executable ... executions)
- {
- if (executions.length==0) return true;
- if (executions.length==1) {
- return syncExec(executions[0].threadAccess, executions[0].runnable);
- }
- final Thread senderThread = Thread.currentThread();
- final WaitingThread wt = new WaitingThread(senderThread);
- WaitingThread prevWt = null;
- synchronized (ThreadUtils.class) {
- for (Executable pair : executions)
- {
- IThreadWorkQueue threadAccess = pair.threadAccess;
- if (threadAccess.currentThreadAccess())
- continue;
- if (threadAccess instanceof BetterThreadAccess)
- threadAccess = ((BetterThreadAccess) threadAccess).ta;
- Runnable runnable = pair.runnable;
-
- Event e = new Event(runnable, new EventListener() {
- @Override
- public void eventDone(Event e) {
- wt.completed(e);
- }}, null);
-
- // Check if target has WaitingThread. Use it if it does
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
- Thread waitingForThread = null;
- if (targetWt != null)
- {
- // Check if it is allowed to use the target WT
- if (isEventQueuingAllowed(senderThread, targetWt))
- {
- if (targetWt.addEvent(e)) {
- synchronized(wt) {
- waitingForThread = targetWt.thread;
- e.setThread(waitingForThread);
- wt.waitFor(e);
- }
- }
- }
- }
-
- if (waitingForThread == null) {
- synchronized(wt) {
- waitingForThread = threadAccess.asyncExec(e);
- if (waitingForThread==null)
- return false;
- e.setThread(waitingForThread);
- wt.waitFor(e);
- }
- }
- }
- prevWt = setWaitingThread(senderThread, wt);
- }
-
- // Run local runnables
- for (Executable pair : executions)
- {
- IThreadWorkQueue threadAccess = pair.threadAccess;
- Runnable runnable = pair.runnable;
- if (threadAccess.currentThreadAccess())
- try {
- runnable.run();
- } catch (RuntimeException e) {
- handleRunnableError(e);
- }
- }
-
- // run errands in the mean time
- wt.waitAndProcessEvents();
- // Stop accepting errands
- wt.stopAcceptingEvents();
- removeWaitingThread(senderThread, prevWt);
- // Run last events
- wt.waitAndProcessEvents();
- return true;
- }
-
- public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
- {
- if (threadAccess instanceof BetterThreadAccess)
- threadAccess = ((BetterThreadAccess) threadAccess).ta;
-
- final Thread senderThread = Thread.currentThread();
- synchronized (ThreadUtils.class) {
-
- Event e = new Event(runnable, null, null);
-
- // Check if target has WaitingThread. Use it if it does
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
- if (targetWt != null)
- {
- // Check if it is allowed to use the target WT
- if (isEventQueuingAllowed(senderThread, targetWt))
- {
- if (targetWt.addEvent(e))
- return targetWt.thread;
- }
- }
- return threadAccess.asyncExec(runnable);
- }
- }
-
- private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads)
- {
- assert(targetThread!=null);
- if (visitedTargetThreads.contains(targetThread))
- return false;
- visitedTargetThreads.add(targetThread);
- if (sourceThread == targetThread) return false;
- Set waitsFor = getWaitsForThreads(targetThread);
- if (waitsFor==null||waitsFor.isEmpty()) return false;
- for (Thread aThreadTargetThreadWaitsFor : waitsFor)
- {
- if (aThreadTargetThreadWaitsFor==sourceThread) return true;
- if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
- if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
- return true;
- }
- return false;
- }
-
- static boolean waitsFor(Thread sourceThread, Thread targetThread)
- {
- return _waitsFor(sourceThread, targetThread, new HashSet(3));
- }
-
- static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
- {
- if (!eventQueue.acceptEvents) return false;
- // queuing is allowed if target thread does wait for source thread
- return waitsFor(sourceThread, eventQueue.thread);
- }
-
- private static void handleRunnableError(Throwable t)
- {
- t.printStackTrace();
- }
-
- interface EventListener {
- void eventDone(Event e);
- }
-
- public static class Event implements Runnable {
- Runnable r;
- EventListener l;
- Semaphore s;
- Thread t;
- public Event(Runnable r, EventListener l, Semaphore s) {
- this.r = r;
- this.l = l;
- this.s = s;
- }
- public void run() {
- setThread(Thread.currentThread());
- try {
- r.run();
- } catch (RuntimeException e) {
- handleRunnableError(e);
- } finally {
- if (s!=null)
- s.release(1);
- if (l!=null)
- l.eventDone(this);
- }
- }
- public synchronized Thread getThread()
- {
- while (t==null) {
- try {
- t.wait();
- } catch (InterruptedException e) {
- }
- }
- return t;
- }
- public synchronized void setThread(Thread t)
- {
- assert(t!=null);
- if (this.t!=null)
- assert(this.t==t);
- this.t = t;
- notify();
- }
- }
-
- /**
- * WaitingThread is a thread that waits for something and in the meantime
- * runs errands.
- */
- static class WaitingThread
- {
- final Thread thread;
- LinkedList queue = new LinkedList();
- boolean acceptEvents = true;
- Set waitingFor = new HashSet();
- Set completed = new HashSet();
- public WaitingThread(Thread thread) {
- this.thread = thread;
- }
- public synchronized void waitFor(Event event)
- {
- assert(thread!=null);
- waitingFor.add(event);
- }
- public synchronized void completed(Event event)
- {
- //assert(waitingFor.contains(event));
- completed.add(event);
- if (completed.size()==waitingFor.size())
- notify();
- }
- synchronized boolean isEmpty() {
- return queue.isEmpty();
- }
- synchronized boolean keepWaiting() {
- if(waitingFor.size()!=completed.size())
- return true;
- assert(waitingFor.equals(completed));
- return false;
- }
- public synchronized boolean addEvent(Event r)
- {
- if (!acceptEvents) return false;
- queue.add(r);
- notify();
- return true;
- }
-
- static private int WAIT_MS = 10000;
- static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
-
- public void waitAndProcessEvents()
- {
- while (keepWaiting() || !isEmpty()) {
- Event e = null;
- synchronized(this)
- {
- if (!queue.isEmpty())
- e = queue.pop();
- if (e==null && keepWaiting())
- try {
- long now = System.nanoTime();
- wait(WAIT_MS);
- long duration = System.nanoTime()-now;
- if(duration > (WAIT_THRESHOLD_NS)) {
- for(Thread t : getWaitingForThreads())
- if(!t.isAlive())
- throw new IllegalStateException("Thread '" + thread + "' has died.");
- }
- } catch (InterruptedException e1) {
- }
- }
- if (e!=null) {
- try {
- e.run();
- } catch (RuntimeException e1) {
- e1.printStackTrace();
- }
- }
- }
- while (!isEmpty())
- {
- Event e = null;
- synchronized(this)
- {
- if (!queue.isEmpty())
- e = queue.pop();
- }
- if (e!=null)
- try {
- e.run();
- } catch (RuntimeException e1) {
- e1.printStackTrace();
- }
- }
-
- }
- public synchronized void stopAcceptingEvents()
- {
- acceptEvents = false;
- }
- public synchronized Set getWaitingForThreads()
- {
- Set result = new HashSet(waitingFor.size());
- for (Event e : waitingFor)
- {
- if (completed.contains(e)) continue;
- result.add(e.getThread());
- }
- return result;
- }
- }
-
- /**
- * Blocks until waiting thread has been set for thread t
- * @param t thread
- * @return waiting thread
- */
- synchronized static WaitingThread getWaitingThreadSync(Thread t)
- {
- WaitingThread result;
- do {
- result = map.get(t);
- if (result!=null) return result;
- try {
- ThreadUtils.class.wait();
- } catch (InterruptedException e) {
- }
- } while (true);
- }
-
- /**
- * Return an array which may have nulls
- * @param t
- * @return
- */
- static Set getWaitsForThreads(Thread t)
- {
- WaitingThread wt = getWaitingThread(t);
- if (wt==null) return null;
- return wt.getWaitingForThreads();
- }
-
- static synchronized WaitingThread getWaitingThread(Thread t)
- {
- return map.get(t);
- }
-
- static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
- {
- WaitingThread prev = map.put(t, wt);
- ThreadUtils.class.notifyAll();
- return prev;
- }
-
- static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
- {
- assert(t == Thread.currentThread());
- map.remove(t);
- if (replaceWith!=null)
- map.put(t, replaceWith);
- }
-
- static class BetterThreadAccess implements IThreadWorkQueue {
- IThreadWorkQueue ta;
- public BetterThreadAccess(IThreadWorkQueue ta)
- {
- if (ta instanceof BetterThreadAccess)
- ta = ((BetterThreadAccess) ta).ta;
- this.ta = ta;
- }
- @Override
- public Thread asyncExec(Runnable runnable) {
- return ThreadUtils.asyncExec(ta, runnable);
- }
- @Override
- public boolean currentThreadAccess() {
- return ta.currentThreadAccess();
- }
- @Override
- public Thread getThread() {
- return ta.getThread();
- }
- @Override
- public boolean syncExec(Runnable runnable) {
- return ThreadUtils.syncExec(ta, runnable);
- }
- }
-
-
-
- /**
- * Executes command in the executor while maintaining possibility to
- * return back to caller.
- *
- * @param executor
- * @param command
- */
- public static void exec(Executor executor, final Runnable command)
- {
- final Thread[] calleeThread = new Thread[1];
- final Thread callerThread = Thread.currentThread();
- final Runnable wrappedCommand = new Runnable() {
- @Override
- public void run() {
- calleeThread[0] = Thread.currentThread();
- try {
- command.run();
- } finally {
- }
- }
- };
-
- // add dependency
- DEPENDENCIES.put(callerThread, calleeThread);
- executor.execute(wrappedCommand);
- DEPENDENCIES.remove(callerThread);
- }
- private static Map DEPENDENCIES =
- Collections.synchronizedMap( new HashMap() );
- @SuppressWarnings("unused")
- private static boolean hasDependency(Thread waiter, Thread worker)
- {
- // Follow dependency chain until worker is reached
- for (Thread t = waiter; t!=null;)
- {
- Thread[] potentialResult = DEPENDENCIES.get(t);
- if (potentialResult==null) break;
- t = potentialResult[0];
- if (t==worker) return true;
- }
- return false;
- }
-
- /**
- * Lock multiple locks simultaneously. If all locks cannot be locked the
- * thread sleeps a moment and tries again.
- *
- * If all locks cannot be locked at once, this method doesn't prevent
- * other from locking them in the mean time.
- *
- * @param locks an array of locks. null values are ignored
- */
- public static void lock(Lock...locks) {
- if (locks.length==0) return;
- if (locks.length==1) {
- locks[0].lock();
- return;
- }
-
- while (true) {
- int i = 0;
- for (;il1) {
- for (int j=l1; j0) {
- for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks
- if (!leftovers.isEmpty())
- System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
- System.err.println("Thread pool '" + pool.toString() + "' did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- //long e = System.currentTimeMillis();
- //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
- }
-
- @SuppressWarnings("unused")
- private static void uncheckedAwaitTermination(ExecutorService service, long time) {
- try {
- NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
- }
-
- // Executor support
-
- // Executor that runs in current thread
- public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
-
- // Async executor queues the command into AWT event queue
- public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
-
- // Sync executor blocks the call until the command is finished
- public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
-
-}
-
-class AWTExecutorSync extends AbstractExecutorService {
-
- @Override
- public void execute(Runnable command) {
- if (EventQueue.isDispatchThread())
- {
- command.run();
- } else {
- try {
- EventQueue.invokeAndWait(command);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public List shutdownNow() {
- return null;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- return false;
- }
-
- public static String getStackTrace(int levels){
- StringBuilder sb = new StringBuilder();
- sb.append( Thread.currentThread() );
- sb.append( "\n" );
- Exception e3 = new Exception();
- try { throw e3; } catch(Exception e2) {
-
- for (int i=1; isimantics.executor.blockingMaxThreads.
+ */
+ public static final int MAX_BLOCKING_EXECUTOR_THREADS;
+
+ static {
+ String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);
+ int blockingMaxThreads = CORES;
+ try {
+ blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);
+ } catch (NumberFormatException e) {
+ }
+ MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);
+ }
+
+ /** Never acquire non-blocking executor from this field, private use */
+ public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
+ /** Never acquire non-blocking executor from this field, private use */
+ public static ExecutorService BLOCKING_EXECUTOR;
+
+ static ScheduledExecutorService TIMER;
+
+ /**
+ * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
+ *
+ * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
+ */
+ public static CurrentThread getCurrentThread() {
+ return CurrentThread.INSTANCE;
+ }
+
+ /**
+ * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
+ *
+ * @return Executor that executes a non-blocking work
+ */
+ public static synchronized ScheduledExecutorService getTimer() {
+ if (TIMER == null) {
+ final ThreadGroup tg = new ThreadGroup("Timer");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ TIMER = new ScheduledThreadPoolExecutor( 1, tf );
+ }
+ return TIMER;
+ }
+
+ /**
+ * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
+ * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
+ *
+ * @return Executor that executes a non-blocking work
+ */
+ public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
+ if (NON_BLOCKING_EXECUTOR == null) {
+ final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ NON_BLOCKING_EXECUTOR =
+ new ScheduledThreadPoolExecutor( CORES, tf );
+ }
+ return NON_BLOCKING_EXECUTOR;
+ }
+
+ /**
+ * Get Executor executes work that may lock or block.
+ *
+ * @return executor for blocking operations
+ */
+ public static synchronized ExecutorService getBlockingWorkExecutor() {
+ if (BLOCKING_EXECUTOR == null) {
+ final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ BLOCKING_EXECUTOR =
+// new ThreadPoolExecutor(
+// 0,
+// MAX_BLOCKING_EXECUTOR_THREADS,
+// 3L, TimeUnit.SECONDS,
+// new SynchronousQueue(),
+// tf);
+ new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
+ }
+ return BLOCKING_EXECUTOR;
+ }
+
+ /**
+ * Better thread access allows thread context switching back to waiting
+ * threads.
+ *
+ * @param access
+ * @return an enhanced version of the specified queue
+ */
+ public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
+ {
+ if (access instanceof BetterThreadAccess)
+ return access;
+ return new BetterThreadAccess(access);
+ }
+
+ static Map map =
+ new HashMap();
+
+ /**
+ * Executes a runnable in thread synchronously.
+ * If the thread locked and waits for this thread, then
+ * the runnable is ran in that thread.
+ *
+ * This works only if all thread switching is done
+ * using this method.
+ *
+ * @param threadAccess
+ * @param runnable
+ * @return true if thread accepted the runnable
+ */
+ public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
+ {
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+
+ // Using current thread
+ if (threadAccess.currentThreadAccess())
+ {
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ }
+ return true;
+ }
+
+ final Thread senderThread = Thread.currentThread();
+ final WaitingThread wt = new WaitingThread(senderThread);
+ WaitingThread prevWt;
+ Event e = new Event(runnable, new EventListener() {
+ @Override
+ public void eventDone(Event e) {
+ wt.completed(e);
+ }}, null);
+
+ synchronized (ThreadUtils.class) {
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ Thread waitingForThread = null;
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e)) {
+ synchronized(wt) {
+ waitingForThread = targetWt.thread;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ }
+
+ if (waitingForThread == null) {
+ synchronized(wt) {
+ waitingForThread = threadAccess.asyncExec(e);
+ if (waitingForThread==null) return false;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+
+ prevWt = setWaitingThread(senderThread, wt);
+ }
+ // run errands in the mean time
+ wt.waitAndProcessEvents();
+ wt.stopAcceptingEvents();
+ // Stop accepting errands
+ removeWaitingThread(senderThread, prevWt);
+ // Run last events
+ wt.waitAndProcessEvents();
+ return true;
+ }
+
+ public static boolean multiSyncExec(Collection executions)
+ {
+ if (executions.isEmpty()) return true;
+ return multiSyncExec(executions.toArray(new Executable[executions.size()]));
+ }
+
+ /**
+ * Executes a list of executables in multiple threads and waits for all to complete.
+ *
+ * @param executions
+ * @return true
once execution has completed (is this necessary?)
+ */
+ public static boolean multiSyncExec(Executable ... executions)
+ {
+ if (executions.length==0) return true;
+ if (executions.length==1) {
+ return syncExec(executions[0].threadAccess, executions[0].runnable);
+ }
+ final Thread senderThread = Thread.currentThread();
+ final WaitingThread wt = new WaitingThread(senderThread);
+ WaitingThread prevWt = null;
+ synchronized (ThreadUtils.class) {
+ for (Executable pair : executions)
+ {
+ IThreadWorkQueue threadAccess = pair.threadAccess;
+ if (threadAccess.currentThreadAccess())
+ continue;
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+ Runnable runnable = pair.runnable;
+
+ Event e = new Event(runnable, new EventListener() {
+ @Override
+ public void eventDone(Event e) {
+ wt.completed(e);
+ }}, null);
+
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ Thread waitingForThread = null;
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e)) {
+ synchronized(wt) {
+ waitingForThread = targetWt.thread;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ }
+
+ if (waitingForThread == null) {
+ synchronized(wt) {
+ waitingForThread = threadAccess.asyncExec(e);
+ if (waitingForThread==null)
+ return false;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ prevWt = setWaitingThread(senderThread, wt);
+ }
+
+ // Run local runnables
+ for (Executable pair : executions)
+ {
+ IThreadWorkQueue threadAccess = pair.threadAccess;
+ Runnable runnable = pair.runnable;
+ if (threadAccess.currentThreadAccess())
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ }
+ }
+
+ // run errands in the mean time
+ wt.waitAndProcessEvents();
+ // Stop accepting errands
+ wt.stopAcceptingEvents();
+ removeWaitingThread(senderThread, prevWt);
+ // Run last events
+ wt.waitAndProcessEvents();
+ return true;
+ }
+
+ public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
+ {
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+
+ final Thread senderThread = Thread.currentThread();
+ synchronized (ThreadUtils.class) {
+
+ Event e = new Event(runnable, null, null);
+
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e))
+ return targetWt.thread;
+ }
+ }
+ return threadAccess.asyncExec(runnable);
+ }
+ }
+
+ private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set visitedTargetThreads)
+ {
+ assert(targetThread!=null);
+ if (visitedTargetThreads.contains(targetThread))
+ return false;
+ visitedTargetThreads.add(targetThread);
+ if (sourceThread == targetThread) return false;
+ Set waitsFor = getWaitsForThreads(targetThread);
+ if (waitsFor==null||waitsFor.isEmpty()) return false;
+ for (Thread aThreadTargetThreadWaitsFor : waitsFor)
+ {
+ if (aThreadTargetThreadWaitsFor==sourceThread) return true;
+ if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
+ if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
+ return true;
+ }
+ return false;
+ }
+
+ static boolean waitsFor(Thread sourceThread, Thread targetThread)
+ {
+ return _waitsFor(sourceThread, targetThread, new HashSet(3));
+ }
+
+ static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
+ {
+ if (!eventQueue.acceptEvents) return false;
+ // queuing is allowed if target thread does wait for source thread
+ return waitsFor(sourceThread, eventQueue.thread);
+ }
+
+ private static void handleRunnableError(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ interface EventListener {
+ void eventDone(Event e);
+ }
+
+ public static class Event implements Runnable {
+ Runnable r;
+ EventListener l;
+ Semaphore s;
+ Thread t;
+ public Event(Runnable r, EventListener l, Semaphore s) {
+ this.r = r;
+ this.l = l;
+ this.s = s;
+ }
+ public void run() {
+ setThread(Thread.currentThread());
+ try {
+ r.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ } finally {
+ if (s!=null)
+ s.release(1);
+ if (l!=null)
+ l.eventDone(this);
+ }
+ }
+ public synchronized Thread getThread()
+ {
+ while (t==null) {
+ try {
+ t.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ return t;
+ }
+ public synchronized void setThread(Thread t)
+ {
+ assert(t!=null);
+ if (this.t!=null)
+ assert(this.t==t);
+ this.t = t;
+ notify();
+ }
+ }
+
+ /**
+ * WaitingThread is a thread that waits for something and in the meantime
+ * runs errands.
+ */
+ static class WaitingThread
+ {
+ final Thread thread;
+ LinkedList queue = new LinkedList();
+ boolean acceptEvents = true;
+ Set waitingFor = new HashSet();
+ Set completed = new HashSet();
+ public WaitingThread(Thread thread) {
+ this.thread = thread;
+ }
+ public synchronized void waitFor(Event event)
+ {
+ assert(thread!=null);
+ waitingFor.add(event);
+ }
+ public synchronized void completed(Event event)
+ {
+ //assert(waitingFor.contains(event));
+ completed.add(event);
+ if (completed.size()==waitingFor.size())
+ notify();
+ }
+ synchronized boolean isEmpty() {
+ return queue.isEmpty();
+ }
+ synchronized boolean keepWaiting() {
+ if(waitingFor.size()!=completed.size())
+ return true;
+ assert(waitingFor.equals(completed));
+ return false;
+ }
+ public synchronized boolean addEvent(Event r)
+ {
+ if (!acceptEvents) return false;
+ queue.add(r);
+ notify();
+ return true;
+ }
+
+ static private int WAIT_MS = 10000;
+ static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
+
+ public void waitAndProcessEvents()
+ {
+ while (keepWaiting() || !isEmpty()) {
+ Event e = null;
+ synchronized(this)
+ {
+ if (!queue.isEmpty())
+ e = queue.pop();
+ if (e==null && keepWaiting())
+ try {
+ long now = System.nanoTime();
+ wait(WAIT_MS);
+ long duration = System.nanoTime()-now;
+ if(duration > (WAIT_THRESHOLD_NS)) {
+ for(Thread t : getWaitingForThreads())
+ if(!t.isAlive())
+ throw new IllegalStateException("Thread '" + thread + "' has died.");
+ }
+ } catch (InterruptedException e1) {
+ }
+ }
+ if (e!=null) {
+ try {
+ e.run();
+ } catch (RuntimeException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+ while (!isEmpty())
+ {
+ Event e = null;
+ synchronized(this)
+ {
+ if (!queue.isEmpty())
+ e = queue.pop();
+ }
+ if (e!=null)
+ try {
+ e.run();
+ } catch (RuntimeException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ }
+ public synchronized void stopAcceptingEvents()
+ {
+ acceptEvents = false;
+ }
+ public synchronized Set getWaitingForThreads()
+ {
+ Set result = new HashSet(waitingFor.size());
+ for (Event e : waitingFor)
+ {
+ if (completed.contains(e)) continue;
+ result.add(e.getThread());
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Blocks until waiting thread has been set for thread t
+ * @param t thread
+ * @return waiting thread
+ */
+ synchronized static WaitingThread getWaitingThreadSync(Thread t)
+ {
+ WaitingThread result;
+ do {
+ result = map.get(t);
+ if (result!=null) return result;
+ try {
+ ThreadUtils.class.wait();
+ } catch (InterruptedException e) {
+ }
+ } while (true);
+ }
+
+ /**
+ * Return an array which may have nulls
+ * @param t
+ * @return
+ */
+ static Set getWaitsForThreads(Thread t)
+ {
+ WaitingThread wt = getWaitingThread(t);
+ if (wt==null) return null;
+ return wt.getWaitingForThreads();
+ }
+
+ static synchronized WaitingThread getWaitingThread(Thread t)
+ {
+ return map.get(t);
+ }
+
+ static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
+ {
+ WaitingThread prev = map.put(t, wt);
+ ThreadUtils.class.notifyAll();
+ return prev;
+ }
+
+ static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
+ {
+ assert(t == Thread.currentThread());
+ map.remove(t);
+ if (replaceWith!=null)
+ map.put(t, replaceWith);
+ }
+
+ static class BetterThreadAccess implements IThreadWorkQueue {
+ IThreadWorkQueue ta;
+ public BetterThreadAccess(IThreadWorkQueue ta)
+ {
+ if (ta instanceof BetterThreadAccess)
+ ta = ((BetterThreadAccess) ta).ta;
+ this.ta = ta;
+ }
+ @Override
+ public Thread asyncExec(Runnable runnable) {
+ return ThreadUtils.asyncExec(ta, runnable);
+ }
+ @Override
+ public boolean currentThreadAccess() {
+ return ta.currentThreadAccess();
+ }
+ @Override
+ public Thread getThread() {
+ return ta.getThread();
+ }
+ @Override
+ public boolean syncExec(Runnable runnable) {
+ return ThreadUtils.syncExec(ta, runnable);
+ }
+ }
+
+
+
+ /**
+ * Executes command in the executor while maintaining possibility to
+ * return back to caller.
+ *
+ * @param executor
+ * @param command
+ */
+ public static void exec(Executor executor, final Runnable command)
+ {
+ final Thread[] calleeThread = new Thread[1];
+ final Thread callerThread = Thread.currentThread();
+ final Runnable wrappedCommand = new Runnable() {
+ @Override
+ public void run() {
+ calleeThread[0] = Thread.currentThread();
+ try {
+ command.run();
+ } finally {
+ }
+ }
+ };
+
+ // add dependency
+ DEPENDENCIES.put(callerThread, calleeThread);
+ executor.execute(wrappedCommand);
+ DEPENDENCIES.remove(callerThread);
+ }
+ private static Map DEPENDENCIES =
+ Collections.synchronizedMap( new HashMap() );
+ @SuppressWarnings("unused")
+ private static boolean hasDependency(Thread waiter, Thread worker)
+ {
+ // Follow dependency chain until worker is reached
+ for (Thread t = waiter; t!=null;)
+ {
+ Thread[] potentialResult = DEPENDENCIES.get(t);
+ if (potentialResult==null) break;
+ t = potentialResult[0];
+ if (t==worker) return true;
+ }
+ return false;
+ }
+
+ /**
+ * Lock multiple locks simultaneously. If all locks cannot be locked the
+ * thread sleeps a moment and tries again.
+ *
+ * If all locks cannot be locked at once, this method doesn't prevent
+ * other from locking them in the mean time.
+ *
+ * @param locks an array of locks. null values are ignored
+ */
+ public static void lock(Lock...locks) {
+ if (locks.length==0) return;
+ if (locks.length==1) {
+ locks[0].lock();
+ return;
+ }
+
+ while (true) {
+ int i = 0;
+ for (;il1) {
+ for (int j=l1; j0) {
+ for (int j=0; j leftovers = pool.shutdownNow(); // Cancel currently executing tasks
+ if (!leftovers.isEmpty())
+ LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
+ LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ //long e = System.currentTimeMillis();
+ //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
+ }
+
+ @SuppressWarnings("unused")
+ private static void uncheckedAwaitTermination(ExecutorService service, long time) {
+ try {
+ NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Executor support
+
+ // Executor that runs in current thread
+ public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
+
+ // Async executor queues the command into AWT event queue
+ public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
+
+ // Sync executor blocks the call until the command is finished
+ public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
+
+}
+
+class AWTExecutorSync extends AbstractExecutorService {
+
+ @Override
+ public void execute(Runnable command) {
+ if (EventQueue.isDispatchThread())
+ {
+ command.run();
+ } else {
+ try {
+ EventQueue.invokeAndWait(command);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public List shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return false;
+ }
+
+ public static String getStackTrace(int levels){
+ StringBuilder sb = new StringBuilder();
+ sb.append( Thread.currentThread() );
+ sb.append( "\n" );
+ Exception e3 = new Exception();
+ try { throw e3; } catch(Exception e2) {
+
+ for (int i=1; i