--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+/*\r
+ *\r
+ * @author Toni Kalajainen\r
+ */\r
+package org.simantics.utils.threads;\r
+\r
+import static java.util.concurrent.TimeUnit.NANOSECONDS;\r
+\r
+import java.awt.EventQueue;\r
+import java.lang.reflect.InvocationTargetException;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashMap;\r
+import java.util.HashSet;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.AbstractExecutorService;\r
+import java.util.concurrent.Executor;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+import java.util.concurrent.ScheduledThreadPoolExecutor;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.locks.Lock;\r
+\r
+/**\r
+ * Utility for switching threads\r
+ * \r
+ * @see Executors2\r
+ * @see IThreadWorkQueue\r
+ */\r
+public class ThreadUtils {\r
+\r
+ public static final int CORES = Runtime.getRuntime().availableProcessors();\r
+\r
+ /**\r
+ * Defines the maximum thread pool size of\r
+ * {@link #getBlockingWorkExecutor()}. The system will enforce this value to\r
+ * be at least Math.max(CORES, 8). It can be configured through the system\r
+ * property <em>simantics.executor.blockingMaxThreads</em>.\r
+ */\r
+ public static final int MAX_BLOCKING_EXECUTOR_THREADS;\r
+\r
+ static {\r
+ String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);\r
+ int blockingMaxThreads = CORES;\r
+ try {\r
+ blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);\r
+ } catch (NumberFormatException e) {\r
+ }\r
+ MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);\r
+ }\r
+\r
+ /** Never acquire non-blocking executor from this field, private use */\r
+ public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;\r
+ /** Never acquire non-blocking executor from this field, private use */\r
+ public static ExecutorService BLOCKING_EXECUTOR;\r
+\r
+ static ScheduledExecutorService TIMER;\r
+\r
+ /**\r
+ * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
+ * \r
+ * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
+ */\r
+ public static CurrentThread getCurrentThread() {\r
+ return CurrentThread.INSTANCE;\r
+ }\r
+\r
+ /**\r
+ * Get a timer executor. Never add blocking or long-wait runnables to this scheduler \r
+ * \r
+ * @return Executor that executes a non-blocking work\r
+ */\r
+ public static synchronized ScheduledExecutorService getTimer() {\r
+ if (TIMER == null) {\r
+ final ThreadGroup tg = new ThreadGroup("Timer");\r
+ final AtomicInteger counter = new AtomicInteger(0);\r
+ ThreadFactory tf = new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));\r
+ if (!t.isDaemon())\r
+ t.setDaemon(true);\r
+ if (t.getPriority() != Thread.NORM_PRIORITY)\r
+ t.setPriority(Thread.NORM_PRIORITY);\r
+ return t;\r
+ }\r
+ };\r
+ TIMER = new ScheduledThreadPoolExecutor( 1, tf );\r
+ }\r
+ return TIMER;\r
+ }\r
+\r
+ /**\r
+ * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks\r
+ * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). \r
+ * \r
+ * @return Executor that executes a non-blocking work\r
+ */\r
+ public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {\r
+ if (NON_BLOCKING_EXECUTOR == null) {\r
+ final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");\r
+ final AtomicInteger counter = new AtomicInteger(0);\r
+ ThreadFactory tf = new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));\r
+ if (!t.isDaemon())\r
+ t.setDaemon(true);\r
+ if (t.getPriority() != Thread.NORM_PRIORITY)\r
+ t.setPriority(Thread.NORM_PRIORITY);\r
+ return t;\r
+ }\r
+ };\r
+ NON_BLOCKING_EXECUTOR =\r
+ new ScheduledThreadPoolExecutor( CORES, tf );\r
+ }\r
+ return NON_BLOCKING_EXECUTOR;\r
+ }\r
+\r
+ /**\r
+ * Get Executor executes work that may lock or block. \r
+ * \r
+ * @return executor for blocking operations\r
+ */ \r
+ public static synchronized ExecutorService getBlockingWorkExecutor() {\r
+ if (BLOCKING_EXECUTOR == null) {\r
+ final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");\r
+ final AtomicInteger counter = new AtomicInteger(0);\r
+ ThreadFactory tf = new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));\r
+ if (!t.isDaemon())\r
+ t.setDaemon(true);\r
+ if (t.getPriority() != Thread.NORM_PRIORITY)\r
+ t.setPriority(Thread.NORM_PRIORITY);\r
+ return t;\r
+ }\r
+ };\r
+ BLOCKING_EXECUTOR =\r
+// new ThreadPoolExecutor(\r
+// 0,\r
+// MAX_BLOCKING_EXECUTOR_THREADS,\r
+// 3L, TimeUnit.SECONDS,\r
+// new SynchronousQueue<Runnable>(),\r
+// tf);\r
+ new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);\r
+ }\r
+ return BLOCKING_EXECUTOR;\r
+ }\r
+\r
+ /**\r
+ * Better thread access allows thread context switching back to waiting\r
+ * threads.\r
+ * \r
+ * @param access\r
+ * @return an enhanced version of the specified queue\r
+ */\r
+ public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)\r
+ {\r
+ if (access instanceof BetterThreadAccess)\r
+ return access;\r
+ return new BetterThreadAccess(access);\r
+ }\r
+\r
+ static Map<Thread, WaitingThread> map =\r
+ new HashMap<Thread, WaitingThread>();\r
+\r
+ /**\r
+ * Executes a runnable in thread synchronously.\r
+ * If the thread locked and waits for this thread, then\r
+ * the runnable is ran in that thread.\r
+ * \r
+ * This works only if all thread switching is done\r
+ * using this method.\r
+ * \r
+ * @param threadAccess\r
+ * @param runnable\r
+ * @return true if thread accepted the runnable\r
+ */\r
+ public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
+ {\r
+ if (threadAccess instanceof BetterThreadAccess)\r
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
+\r
+ // Using current thread\r
+ if (threadAccess.currentThreadAccess())\r
+ {\r
+ try {\r
+ runnable.run();\r
+ } catch (RuntimeException e) {\r
+ handleRunnableError(e);\r
+ }\r
+ return true;\r
+ }\r
+\r
+ final Thread senderThread = Thread.currentThread();\r
+ final WaitingThread wt = new WaitingThread(senderThread);\r
+ WaitingThread prevWt;\r
+ Event e = new Event(runnable, new EventListener() {\r
+ @Override\r
+ public void eventDone(Event e) {\r
+ wt.completed(e);\r
+ }}, null);\r
+\r
+ synchronized (ThreadUtils.class) {\r
+ // Check if target has WaitingThread. Use it if it does\r
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
+ Thread waitingForThread = null;\r
+ if (targetWt != null)\r
+ {\r
+ // Check if it is allowed to use the target WT\r
+ if (isEventQueuingAllowed(senderThread, targetWt))\r
+ {\r
+ if (targetWt.addEvent(e)) {\r
+ synchronized(wt) {\r
+ waitingForThread = targetWt.thread;\r
+ e.setThread(waitingForThread);\r
+ wt.waitFor(e);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ if (waitingForThread == null) {\r
+ synchronized(wt) {\r
+ waitingForThread = threadAccess.asyncExec(e);\r
+ if (waitingForThread==null) return false;\r
+ e.setThread(waitingForThread);\r
+ wt.waitFor(e);\r
+ }\r
+ }\r
+\r
+ prevWt = setWaitingThread(senderThread, wt);\r
+ }\r
+ // run errands in the mean time\r
+ wt.waitAndProcessEvents();\r
+ wt.stopAcceptingEvents();\r
+ // Stop accepting errands\r
+ removeWaitingThread(senderThread, prevWt);\r
+ // Run last events\r
+ wt.waitAndProcessEvents();\r
+ return true;\r
+ }\r
+\r
+ public static boolean multiSyncExec(Collection<Executable> executions)\r
+ {\r
+ if (executions.isEmpty()) return true;\r
+ return multiSyncExec(executions.toArray(new Executable[executions.size()]));\r
+ }\r
+\r
+ /**\r
+ * Executes a list of executables in multiple threads and waits for all to complete.\r
+ * \r
+ * @param executions\r
+ * @return <code>true</code> once execution has completed (is this necessary?)\r
+ */\r
+ public static boolean multiSyncExec(Executable ... executions)\r
+ {\r
+ if (executions.length==0) return true;\r
+ if (executions.length==1) {\r
+ return syncExec(executions[0].threadAccess, executions[0].runnable);\r
+ }\r
+ final Thread senderThread = Thread.currentThread();\r
+ final WaitingThread wt = new WaitingThread(senderThread);\r
+ WaitingThread prevWt = null;\r
+ synchronized (ThreadUtils.class) {\r
+ for (Executable pair : executions)\r
+ {\r
+ IThreadWorkQueue threadAccess = pair.threadAccess;\r
+ if (threadAccess.currentThreadAccess())\r
+ continue;\r
+ if (threadAccess instanceof BetterThreadAccess)\r
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
+ Runnable runnable = pair.runnable;\r
+\r
+ Event e = new Event(runnable, new EventListener() {\r
+ @Override\r
+ public void eventDone(Event e) {\r
+ wt.completed(e);\r
+ }}, null);\r
+\r
+ // Check if target has WaitingThread. Use it if it does\r
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
+ Thread waitingForThread = null;\r
+ if (targetWt != null)\r
+ {\r
+ // Check if it is allowed to use the target WT\r
+ if (isEventQueuingAllowed(senderThread, targetWt))\r
+ {\r
+ if (targetWt.addEvent(e)) {\r
+ synchronized(wt) {\r
+ waitingForThread = targetWt.thread;\r
+ e.setThread(waitingForThread);\r
+ wt.waitFor(e);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ if (waitingForThread == null) {\r
+ synchronized(wt) {\r
+ waitingForThread = threadAccess.asyncExec(e);\r
+ if (waitingForThread==null)\r
+ return false;\r
+ e.setThread(waitingForThread);\r
+ wt.waitFor(e);\r
+ }\r
+ }\r
+ }\r
+ prevWt = setWaitingThread(senderThread, wt);\r
+ }\r
+\r
+ // Run local runnables\r
+ for (Executable pair : executions)\r
+ {\r
+ IThreadWorkQueue threadAccess = pair.threadAccess;\r
+ Runnable runnable = pair.runnable;\r
+ if (threadAccess.currentThreadAccess())\r
+ try {\r
+ runnable.run();\r
+ } catch (RuntimeException e) {\r
+ handleRunnableError(e);\r
+ }\r
+ }\r
+\r
+ // run errands in the mean time\r
+ wt.waitAndProcessEvents();\r
+ // Stop accepting errands\r
+ wt.stopAcceptingEvents();\r
+ removeWaitingThread(senderThread, prevWt);\r
+ // Run last events\r
+ wt.waitAndProcessEvents();\r
+ return true;\r
+ }\r
+\r
+ public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
+ {\r
+ if (threadAccess instanceof BetterThreadAccess)\r
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
+\r
+ final Thread senderThread = Thread.currentThread();\r
+ synchronized (ThreadUtils.class) {\r
+\r
+ Event e = new Event(runnable, null, null);\r
+\r
+ // Check if target has WaitingThread. Use it if it does\r
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
+ if (targetWt != null)\r
+ {\r
+ // Check if it is allowed to use the target WT\r
+ if (isEventQueuingAllowed(senderThread, targetWt))\r
+ {\r
+ if (targetWt.addEvent(e))\r
+ return targetWt.thread;\r
+ }\r
+ }\r
+ return threadAccess.asyncExec(runnable);\r
+ }\r
+ }\r
+\r
+ private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)\r
+ {\r
+ assert(targetThread!=null);\r
+ if (visitedTargetThreads.contains(targetThread))\r
+ return false;\r
+ visitedTargetThreads.add(targetThread);\r
+ if (sourceThread == targetThread) return false;\r
+ Set<Thread> waitsFor = getWaitsForThreads(targetThread);\r
+ if (waitsFor==null||waitsFor.isEmpty()) return false;\r
+ for (Thread aThreadTargetThreadWaitsFor : waitsFor)\r
+ {\r
+ if (aThreadTargetThreadWaitsFor==sourceThread) return true;\r
+ if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;\r
+ if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))\r
+ return true;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ static boolean waitsFor(Thread sourceThread, Thread targetThread)\r
+ {\r
+ return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));\r
+ }\r
+\r
+ static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)\r
+ {\r
+ if (!eventQueue.acceptEvents) return false;\r
+ // queuing is allowed if target thread does wait for source thread\r
+ return waitsFor(sourceThread, eventQueue.thread);\r
+ }\r
+\r
+ private static void handleRunnableError(Throwable t)\r
+ {\r
+ t.printStackTrace();\r
+ }\r
+\r
+ interface EventListener {\r
+ void eventDone(Event e);\r
+ }\r
+\r
+ public static class Event implements Runnable {\r
+ Runnable r;\r
+ EventListener l;\r
+ Semaphore s;\r
+ Thread t;\r
+ public Event(Runnable r, EventListener l, Semaphore s) {\r
+ this.r = r;\r
+ this.l = l;\r
+ this.s = s;\r
+ }\r
+ public void run() {\r
+ setThread(Thread.currentThread());\r
+ try {\r
+ r.run();\r
+ } catch (RuntimeException e) {\r
+ handleRunnableError(e);\r
+ } finally {\r
+ if (s!=null)\r
+ s.release(1);\r
+ if (l!=null)\r
+ l.eventDone(this);\r
+ }\r
+ }\r
+ public synchronized Thread getThread()\r
+ {\r
+ while (t==null) {\r
+ try {\r
+ t.wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+ return t;\r
+ }\r
+ public synchronized void setThread(Thread t)\r
+ {\r
+ assert(t!=null);\r
+ if (this.t!=null)\r
+ assert(this.t==t);\r
+ this.t = t;\r
+ notify();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * WaitingThread is a thread that waits for something and in the meantime\r
+ * runs errands.\r
+ */\r
+ static class WaitingThread\r
+ {\r
+ final Thread thread;\r
+ LinkedList<Event> queue = new LinkedList<Event>();\r
+ boolean acceptEvents = true;\r
+ Set<Event> waitingFor = new HashSet<Event>();\r
+ Set<Event> completed = new HashSet<Event>();\r
+ public WaitingThread(Thread thread) {\r
+ this.thread = thread;\r
+ }\r
+ public synchronized void waitFor(Event event)\r
+ {\r
+ assert(thread!=null);\r
+ waitingFor.add(event);\r
+ }\r
+ public synchronized void completed(Event event)\r
+ {\r
+ //assert(waitingFor.contains(event));\r
+ completed.add(event);\r
+ if (completed.size()==waitingFor.size())\r
+ notify();\r
+ }\r
+ synchronized boolean isEmpty() {\r
+ return queue.isEmpty();\r
+ }\r
+ synchronized boolean keepWaiting() {\r
+ if(waitingFor.size()!=completed.size())\r
+ return true;\r
+ assert(waitingFor.equals(completed));\r
+ return false;\r
+ }\r
+ public synchronized boolean addEvent(Event r)\r
+ {\r
+ if (!acceptEvents) return false;\r
+ queue.add(r);\r
+ notify();\r
+ return true;\r
+ }\r
+ \r
+ static private int WAIT_MS = 10000;\r
+ static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;\r
+ \r
+ public void waitAndProcessEvents()\r
+ {\r
+ while (keepWaiting() || !isEmpty()) {\r
+ Event e = null;\r
+ synchronized(this)\r
+ {\r
+ if (!queue.isEmpty())\r
+ e = queue.pop();\r
+ if (e==null && keepWaiting())\r
+ try {\r
+ long now = System.nanoTime();\r
+ wait(WAIT_MS);\r
+ long duration = System.nanoTime()-now;\r
+ if(duration > (WAIT_THRESHOLD_NS)) {\r
+ for(Thread t : getWaitingForThreads())\r
+ if(!t.isAlive())\r
+ throw new IllegalStateException("Thread '" + thread + "' has died.");\r
+ }\r
+ } catch (InterruptedException e1) {\r
+ }\r
+ }\r
+ if (e!=null) {\r
+ try {\r
+ e.run();\r
+ } catch (RuntimeException e1) {\r
+ e1.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ while (!isEmpty())\r
+ {\r
+ Event e = null;\r
+ synchronized(this)\r
+ {\r
+ if (!queue.isEmpty())\r
+ e = queue.pop();\r
+ }\r
+ if (e!=null)\r
+ try {\r
+ e.run();\r
+ } catch (RuntimeException e1) {\r
+ e1.printStackTrace();\r
+ }\r
+ }\r
+\r
+ }\r
+ public synchronized void stopAcceptingEvents()\r
+ {\r
+ acceptEvents = false;\r
+ }\r
+ public synchronized Set<Thread> getWaitingForThreads()\r
+ {\r
+ Set<Thread> result = new HashSet<Thread>(waitingFor.size());\r
+ for (Event e : waitingFor)\r
+ {\r
+ if (completed.contains(e)) continue;\r
+ result.add(e.getThread());\r
+ }\r
+ return result;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Blocks until waiting thread has been set for thread t\r
+ * @param t thread\r
+ * @return waiting thread\r
+ */\r
+ synchronized static WaitingThread getWaitingThreadSync(Thread t)\r
+ {\r
+ WaitingThread result;\r
+ do {\r
+ result = map.get(t);\r
+ if (result!=null) return result;\r
+ try {\r
+ ThreadUtils.class.wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ } while (true);\r
+ }\r
+\r
+ /**\r
+ * Return an array which may have nulls\r
+ * @param t\r
+ * @return\r
+ */\r
+ static Set<Thread> getWaitsForThreads(Thread t)\r
+ {\r
+ WaitingThread wt = getWaitingThread(t);\r
+ if (wt==null) return null;\r
+ return wt.getWaitingForThreads();\r
+ }\r
+\r
+ static synchronized WaitingThread getWaitingThread(Thread t)\r
+ {\r
+ return map.get(t);\r
+ }\r
+\r
+ static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)\r
+ {\r
+ WaitingThread prev = map.put(t, wt);\r
+ ThreadUtils.class.notifyAll();\r
+ return prev;\r
+ }\r
+\r
+ static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)\r
+ {\r
+ assert(t == Thread.currentThread());\r
+ map.remove(t);\r
+ if (replaceWith!=null)\r
+ map.put(t, replaceWith);\r
+ }\r
+\r
+ static class BetterThreadAccess implements IThreadWorkQueue {\r
+ IThreadWorkQueue ta;\r
+ public BetterThreadAccess(IThreadWorkQueue ta)\r
+ {\r
+ if (ta instanceof BetterThreadAccess)\r
+ ta = ((BetterThreadAccess) ta).ta;\r
+ this.ta = ta;\r
+ }\r
+ @Override\r
+ public Thread asyncExec(Runnable runnable) {\r
+ return ThreadUtils.asyncExec(ta, runnable);\r
+ }\r
+ @Override\r
+ public boolean currentThreadAccess() {\r
+ return ta.currentThreadAccess();\r
+ }\r
+ @Override\r
+ public Thread getThread() {\r
+ return ta.getThread();\r
+ }\r
+ @Override\r
+ public boolean syncExec(Runnable runnable) {\r
+ return ThreadUtils.syncExec(ta, runnable);\r
+ }\r
+ }\r
+\r
+\r
+\r
+ /**\r
+ * Executes command in the executor while maintaining possibility to\r
+ * return back to caller.\r
+ * \r
+ * @param executor\r
+ * @param command\r
+ */\r
+ public static void exec(Executor executor, final Runnable command)\r
+ {\r
+ final Thread[] calleeThread = new Thread[1];\r
+ final Thread callerThread = Thread.currentThread();\r
+ final Runnable wrappedCommand = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ calleeThread[0] = Thread.currentThread();\r
+ try {\r
+ command.run();\r
+ } finally {\r
+ }\r
+ }\r
+ };\r
+\r
+ // add dependency\r
+ DEPENDENCIES.put(callerThread, calleeThread);\r
+ executor.execute(wrappedCommand);\r
+ DEPENDENCIES.remove(callerThread);\r
+ }\r
+ private static Map<Thread, Thread[]> DEPENDENCIES =\r
+ Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );\r
+ @SuppressWarnings("unused")\r
+ private static boolean hasDependency(Thread waiter, Thread worker)\r
+ {\r
+ // Follow dependency chain until worker is reached\r
+ for (Thread t = waiter; t!=null;)\r
+ {\r
+ Thread[] potentialResult = DEPENDENCIES.get(t);\r
+ if (potentialResult==null) break;\r
+ t = potentialResult[0];\r
+ if (t==worker) return true;\r
+ }\r
+ return false;\r
+ }\r
+ \r
+ /**\r
+ * Lock multiple locks simultaneously. If all locks cannot be locked the \r
+ * thread sleeps a moment and tries again. \r
+ * \r
+ * If all locks cannot be locked at once, this method doesn't prevent \r
+ * other from locking them in the mean time.\r
+ * \r
+ * @param locks an array of locks. null values are ignored\r
+ */\r
+ public static void lock(Lock...locks) {\r
+ if (locks.length==0) return;\r
+ if (locks.length==1) {\r
+ locks[0].lock();\r
+ return;\r
+ }\r
+\r
+ while (true) {\r
+ int i = 0;\r
+ for (;i<locks.length; i++) {\r
+ Lock l = locks[i];\r
+ if (l==null) continue;\r
+ if ( !locks[i].tryLock() ) break;\r
+ }\r
+ if (i==locks.length) return;\r
+ for (int j=0; j<i; j++) {\r
+ Lock l = locks[j];\r
+ if (l==null) continue;\r
+ l.unlock();\r
+ }\r
+ try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Lock multiple locks simultaneously. If all locks cannot be locked the \r
+ * thread sleeps a moment and tries again. \r
+ * \r
+ * If all locks cannot be locked at once, this method doesn't prevent \r
+ * other from locking them in the mean time.\r
+ * \r
+ * [UNTESTED]\r
+ * \r
+ * @param locks1 an array of locks. null values are ignored\r
+ * @param locks2 an array of locks. null values are ignored\r
+ */\r
+ public static void lock2(Lock[] locks1, Lock[] locks2) {\r
+ int l1 = locks1.length;\r
+ int l2 = locks2.length;\r
+ int c = l1+l2;\r
+ if (l1==0 && l2==0) return;\r
+\r
+ while (true) {\r
+ int i = 0;\r
+ // Attempt to lock everything in locks 1 \r
+ for (;i<l1; i++) {\r
+ Lock l = locks1[i];\r
+ if (l==null) continue;\r
+ if ( !locks1[i].tryLock() ) break;\r
+ }\r
+ // Attempt to lock everything in locks 2\r
+ if (i==l1) {\r
+ for (;i<c; i++) {\r
+ Lock l = locks2[i];\r
+ if (l==null) continue;\r
+ if ( !locks2[i-l1].tryLock() ) break;\r
+ }\r
+ }\r
+ // All locked?\r
+ if (i==c) return;\r
+ // Cancel locks2\r
+ if (i>l1) {\r
+ for (int j=l1; j<i; j++) {\r
+ Lock l = locks2[j-l1];\r
+ if (l==null) continue;\r
+ l.unlock();\r
+ }\r
+ }\r
+ // Cancel locks 1\r
+ if (i>0) { \r
+ for (int j=0; j<i; j++) {\r
+ Lock l = locks1[j];\r
+ if (l==null) continue;\r
+ l.unlock();\r
+ }\r
+ }\r
+ try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}\r
+ }\r
+ } \r
+\r
+ /**\r
+ * Try to lock multiple locks simultaneously. If all locks cannot be locked the \r
+ * thread sleeps a moment and tries again. \r
+ * \r
+ * If all locks cannot be locked at once, this method doesn't prevent \r
+ * other from locking them in the mean time.\r
+ * \r
+ * @param locks an array of locks. null values are ignored\r
+ * @return \r
+ */\r
+ public static boolean tryLock(Lock...locks) {\r
+ if (locks.length==0) return true;\r
+ if (locks.length==1) {\r
+ return locks[0].tryLock();\r
+ }\r
+\r
+ int i = 0;\r
+ for (;i<locks.length; i++) {\r
+ Lock l = locks[i];\r
+ if (l==null) continue;\r
+ if ( !locks[i].tryLock() ) break;\r
+ }\r
+ if (i==locks.length) return true;\r
+\r
+ // Unlock & fail\r
+ for (int j=0; j<i; j++) {\r
+ Lock l = locks[j];\r
+ if (l==null) continue;\r
+ l.unlock();\r
+ }\r
+ return false;\r
+ }\r
+\r
+\r
+ /**\r
+ * Unlock multiple locks. Use this after locking multiple locks.\r
+ * \r
+ * @param locks an array of locks. Null values are ignored\r
+ */\r
+ public static void unlock(Lock...locks) {\r
+ for (Lock lock : locks) {\r
+ if (lock != null) lock.unlock();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Unlock multiple locks. Use this after locking multiple locks.\r
+ * \r
+ * @param locks1 an array of locks. Null values are ignored\r
+ * @param locsk2 an array of locks. Null values are ignored\r
+ */\r
+ public static void unlock2(Lock[] locks1, Lock[] locks2) {\r
+ for (Lock lock : locks1) {\r
+ if (lock != null) lock.unlock();\r
+ }\r
+ for (Lock lock : locks2) {\r
+ if (lock != null) lock.unlock();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Join multiple arrays into a single array\r
+ * \r
+ * @param lockArrays\r
+ * @return\r
+ */\r
+ public static Lock[] appendLockArrays(Lock[]...lockArrays) {\r
+ int len = 0;\r
+ for (Lock[] array : lockArrays) len +=array.length;\r
+ Lock[] result = new Lock[ len ];\r
+ int i = 0;\r
+ for (Lock[] array : lockArrays) {\r
+ System.arraycopy(array, 0, result, i, array.length);\r
+ i += array.length;\r
+ }\r
+ return result;\r
+ }\r
+\r
+ public static synchronized void shutdown() {\r
+ if (TIMER != null) {\r
+ //System.out.println("TIMERS");\r
+ shutdownAndAwaitTermination(TIMER, 1000);\r
+ TIMER = null;\r
+ }\r
+ if (NON_BLOCKING_EXECUTOR != null) {\r
+ //System.out.println("NON_BLOCKING");\r
+ shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);\r
+ NON_BLOCKING_EXECUTOR = null;\r
+ }\r
+ if (BLOCKING_EXECUTOR != null) {\r
+ //System.out.println("BLOCKING");\r
+ shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);\r
+ BLOCKING_EXECUTOR = null;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Grabbed from {@link ExecutorService} javadoc.\r
+ * \r
+ * @param pool {@link ExecutorService} to shut down\r
+ */\r
+ private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {\r
+ //long t = System.currentTimeMillis();\r
+ pool.shutdown(); // Disable new tasks from being submitted\r
+ try {\r
+ // Wait a while for existing tasks to terminate\r
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {\r
+ List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks\r
+ if (!leftovers.isEmpty())\r
+ System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);\r
+ // Wait a while for tasks to respond to being cancelled\r
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))\r
+ System.err.println("Thread pool '" + pool.toString() + "' did not terminate");\r
+ }\r
+ } catch (InterruptedException ie) {\r
+ // (Re-)Cancel if current thread also interrupted\r
+ pool.shutdownNow();\r
+ // Preserve interrupt status\r
+ Thread.currentThread().interrupt();\r
+ }\r
+ //long e = System.currentTimeMillis();\r
+ //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");\r
+ }\r
+\r
+ @SuppressWarnings("unused")\r
+ private static void uncheckedAwaitTermination(ExecutorService service, long time) {\r
+ try {\r
+ NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ // Executor support\r
+\r
+ // Executor that runs in current thread\r
+ public static Executor CURRENT_THREAD = new CurrentThreadExecutor();\r
+\r
+ // Async executor queues the command into AWT event queue\r
+ public static ExecutorService AWT_EDT = AWTThread.INSTANCE;\r
+\r
+ // Sync executor blocks the call until the command is finished\r
+ public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();\r
+\r
+}\r
+\r
+class AWTExecutorSync extends AbstractExecutorService {\r
+\r
+ @Override\r
+ public void execute(Runnable command) {\r
+ if (EventQueue.isDispatchThread())\r
+ {\r
+ command.run();\r
+ } else {\r
+ try {\r
+ EventQueue.invokeAndWait(command);\r
+ } catch (InterruptedException e) {\r
+ throw new RuntimeException(e);\r
+ } catch (InvocationTargetException e) {\r
+ throw new RuntimeException(e);\r
+ }\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void shutdown() {\r
+ }\r
+\r
+ @Override\r
+ public List<Runnable> shutdownNow() {\r
+ return null;\r
+ }\r
+\r
+ @Override\r
+ public boolean isShutdown() {\r
+ return false;\r
+ }\r
+\r
+ @Override\r
+ public boolean isTerminated() {\r
+ return false;\r
+ }\r
+\r
+ @Override\r
+ public boolean awaitTermination(long timeout, TimeUnit unit)\r
+ throws InterruptedException {\r
+ return false;\r
+ }\r
+ \r
+ public static String getStackTrace(int levels){\r
+ StringBuilder sb = new StringBuilder();\r
+ sb.append( Thread.currentThread() );\r
+ sb.append( "\n" );\r
+ Exception e3 = new Exception(); \r
+ try { throw e3; } catch(Exception e2) {\r
+ \r
+ for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {\r
+ StackTraceElement ste = e2.getStackTrace()[i];\r
+ sb.append(" - ");\r
+ sb.append(ste);\r
+ sb.append( "\n" );\r
+ } \r
+ }\r
+ return sb.toString();\r
+ }\r
+}\r
+\r
+class CurrentThreadExecutor implements Executor {\r
+ @Override\r
+ public void execute(Runnable command) {\r
+ command.run();\r
+ }\r
+}\r