-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-/*\r
- *\r
- * @author Toni Kalajainen\r
- */\r
-package org.simantics.utils.threads;\r
-\r
-import static java.util.concurrent.TimeUnit.NANOSECONDS;\r
-\r
-import java.awt.EventQueue;\r
-import java.lang.reflect.InvocationTargetException;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.AbstractExecutorService;\r
-import java.util.concurrent.Executor;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.ScheduledExecutorService;\r
-import java.util.concurrent.ScheduledThreadPoolExecutor;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.ThreadFactory;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-import java.util.concurrent.locks.Lock;\r
-\r
-/**\r
- * Utility for switching threads\r
- * \r
- * @see Executors2\r
- * @see IThreadWorkQueue\r
- */\r
-public class ThreadUtils {\r
-\r
- public static final int CORES = Runtime.getRuntime().availableProcessors();\r
-\r
- /**\r
- * Defines the maximum thread pool size of\r
- * {@link #getBlockingWorkExecutor()}. The system will enforce this value to\r
- * be at least Math.max(CORES, 8). It can be configured through the system\r
- * property <em>simantics.executor.blockingMaxThreads</em>.\r
- */\r
- public static final int MAX_BLOCKING_EXECUTOR_THREADS;\r
-\r
- static {\r
- String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);\r
- int blockingMaxThreads = CORES;\r
- try {\r
- blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);\r
- } catch (NumberFormatException e) {\r
- }\r
- MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);\r
- }\r
-\r
- /** Never acquire non-blocking executor from this field, private use */\r
- public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;\r
- /** Never acquire non-blocking executor from this field, private use */\r
- public static ExecutorService BLOCKING_EXECUTOR;\r
-\r
- static ScheduledExecutorService TIMER;\r
-\r
- /**\r
- * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
- * \r
- * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
- */\r
- public static CurrentThread getCurrentThread() {\r
- return CurrentThread.INSTANCE;\r
- }\r
-\r
- /**\r
- * Get a timer executor. Never add blocking or long-wait runnables to this scheduler \r
- * \r
- * @return Executor that executes a non-blocking work\r
- */\r
- public static synchronized ScheduledExecutorService getTimer() {\r
- if (TIMER == null) {\r
- final ThreadGroup tg = new ThreadGroup("Timer");\r
- final AtomicInteger counter = new AtomicInteger(0);\r
- ThreadFactory tf = new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));\r
- if (!t.isDaemon())\r
- t.setDaemon(true);\r
- if (t.getPriority() != Thread.NORM_PRIORITY)\r
- t.setPriority(Thread.NORM_PRIORITY);\r
- return t;\r
- }\r
- };\r
- TIMER = new ScheduledThreadPoolExecutor( 1, tf );\r
- }\r
- return TIMER;\r
- }\r
-\r
- /**\r
- * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks\r
- * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). \r
- * \r
- * @return Executor that executes a non-blocking work\r
- */\r
- public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {\r
- if (NON_BLOCKING_EXECUTOR == null) {\r
- final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");\r
- final AtomicInteger counter = new AtomicInteger(0);\r
- ThreadFactory tf = new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));\r
- if (!t.isDaemon())\r
- t.setDaemon(true);\r
- if (t.getPriority() != Thread.NORM_PRIORITY)\r
- t.setPriority(Thread.NORM_PRIORITY);\r
- return t;\r
- }\r
- };\r
- NON_BLOCKING_EXECUTOR =\r
- new ScheduledThreadPoolExecutor( CORES, tf );\r
- }\r
- return NON_BLOCKING_EXECUTOR;\r
- }\r
-\r
- /**\r
- * Get Executor executes work that may lock or block. \r
- * \r
- * @return executor for blocking operations\r
- */ \r
- public static synchronized ExecutorService getBlockingWorkExecutor() {\r
- if (BLOCKING_EXECUTOR == null) {\r
- final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");\r
- final AtomicInteger counter = new AtomicInteger(0);\r
- ThreadFactory tf = new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));\r
- if (!t.isDaemon())\r
- t.setDaemon(true);\r
- if (t.getPriority() != Thread.NORM_PRIORITY)\r
- t.setPriority(Thread.NORM_PRIORITY);\r
- return t;\r
- }\r
- };\r
- BLOCKING_EXECUTOR =\r
-// new ThreadPoolExecutor(\r
-// 0,\r
-// MAX_BLOCKING_EXECUTOR_THREADS,\r
-// 3L, TimeUnit.SECONDS,\r
-// new SynchronousQueue<Runnable>(),\r
-// tf);\r
- new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);\r
- }\r
- return BLOCKING_EXECUTOR;\r
- }\r
-\r
- /**\r
- * Better thread access allows thread context switching back to waiting\r
- * threads.\r
- * \r
- * @param access\r
- * @return an enhanced version of the specified queue\r
- */\r
- public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)\r
- {\r
- if (access instanceof BetterThreadAccess)\r
- return access;\r
- return new BetterThreadAccess(access);\r
- }\r
-\r
- static Map<Thread, WaitingThread> map =\r
- new HashMap<Thread, WaitingThread>();\r
-\r
- /**\r
- * Executes a runnable in thread synchronously.\r
- * If the thread locked and waits for this thread, then\r
- * the runnable is ran in that thread.\r
- * \r
- * This works only if all thread switching is done\r
- * using this method.\r
- * \r
- * @param threadAccess\r
- * @param runnable\r
- * @return true if thread accepted the runnable\r
- */\r
- public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
- {\r
- if (threadAccess instanceof BetterThreadAccess)\r
- threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
-\r
- // Using current thread\r
- if (threadAccess.currentThreadAccess())\r
- {\r
- try {\r
- runnable.run();\r
- } catch (RuntimeException e) {\r
- handleRunnableError(e);\r
- }\r
- return true;\r
- }\r
-\r
- final Thread senderThread = Thread.currentThread();\r
- final WaitingThread wt = new WaitingThread(senderThread);\r
- WaitingThread prevWt;\r
- Event e = new Event(runnable, new EventListener() {\r
- @Override\r
- public void eventDone(Event e) {\r
- wt.completed(e);\r
- }}, null);\r
-\r
- synchronized (ThreadUtils.class) {\r
- // Check if target has WaitingThread. Use it if it does\r
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
- Thread waitingForThread = null;\r
- if (targetWt != null)\r
- {\r
- // Check if it is allowed to use the target WT\r
- if (isEventQueuingAllowed(senderThread, targetWt))\r
- {\r
- if (targetWt.addEvent(e)) {\r
- synchronized(wt) {\r
- waitingForThread = targetWt.thread;\r
- e.setThread(waitingForThread);\r
- wt.waitFor(e);\r
- }\r
- }\r
- }\r
- }\r
-\r
- if (waitingForThread == null) {\r
- synchronized(wt) {\r
- waitingForThread = threadAccess.asyncExec(e);\r
- if (waitingForThread==null) return false;\r
- e.setThread(waitingForThread);\r
- wt.waitFor(e);\r
- }\r
- }\r
-\r
- prevWt = setWaitingThread(senderThread, wt);\r
- }\r
- // run errands in the mean time\r
- wt.waitAndProcessEvents();\r
- wt.stopAcceptingEvents();\r
- // Stop accepting errands\r
- removeWaitingThread(senderThread, prevWt);\r
- // Run last events\r
- wt.waitAndProcessEvents();\r
- return true;\r
- }\r
-\r
- public static boolean multiSyncExec(Collection<Executable> executions)\r
- {\r
- if (executions.isEmpty()) return true;\r
- return multiSyncExec(executions.toArray(new Executable[executions.size()]));\r
- }\r
-\r
- /**\r
- * Executes a list of executables in multiple threads and waits for all to complete.\r
- * \r
- * @param executions\r
- * @return <code>true</code> once execution has completed (is this necessary?)\r
- */\r
- public static boolean multiSyncExec(Executable ... executions)\r
- {\r
- if (executions.length==0) return true;\r
- if (executions.length==1) {\r
- return syncExec(executions[0].threadAccess, executions[0].runnable);\r
- }\r
- final Thread senderThread = Thread.currentThread();\r
- final WaitingThread wt = new WaitingThread(senderThread);\r
- WaitingThread prevWt = null;\r
- synchronized (ThreadUtils.class) {\r
- for (Executable pair : executions)\r
- {\r
- IThreadWorkQueue threadAccess = pair.threadAccess;\r
- if (threadAccess.currentThreadAccess())\r
- continue;\r
- if (threadAccess instanceof BetterThreadAccess)\r
- threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
- Runnable runnable = pair.runnable;\r
-\r
- Event e = new Event(runnable, new EventListener() {\r
- @Override\r
- public void eventDone(Event e) {\r
- wt.completed(e);\r
- }}, null);\r
-\r
- // Check if target has WaitingThread. Use it if it does\r
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
- Thread waitingForThread = null;\r
- if (targetWt != null)\r
- {\r
- // Check if it is allowed to use the target WT\r
- if (isEventQueuingAllowed(senderThread, targetWt))\r
- {\r
- if (targetWt.addEvent(e)) {\r
- synchronized(wt) {\r
- waitingForThread = targetWt.thread;\r
- e.setThread(waitingForThread);\r
- wt.waitFor(e);\r
- }\r
- }\r
- }\r
- }\r
-\r
- if (waitingForThread == null) {\r
- synchronized(wt) {\r
- waitingForThread = threadAccess.asyncExec(e);\r
- if (waitingForThread==null)\r
- return false;\r
- e.setThread(waitingForThread);\r
- wt.waitFor(e);\r
- }\r
- }\r
- }\r
- prevWt = setWaitingThread(senderThread, wt);\r
- }\r
-\r
- // Run local runnables\r
- for (Executable pair : executions)\r
- {\r
- IThreadWorkQueue threadAccess = pair.threadAccess;\r
- Runnable runnable = pair.runnable;\r
- if (threadAccess.currentThreadAccess())\r
- try {\r
- runnable.run();\r
- } catch (RuntimeException e) {\r
- handleRunnableError(e);\r
- }\r
- }\r
-\r
- // run errands in the mean time\r
- wt.waitAndProcessEvents();\r
- // Stop accepting errands\r
- wt.stopAcceptingEvents();\r
- removeWaitingThread(senderThread, prevWt);\r
- // Run last events\r
- wt.waitAndProcessEvents();\r
- return true;\r
- }\r
-\r
- public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
- {\r
- if (threadAccess instanceof BetterThreadAccess)\r
- threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
-\r
- final Thread senderThread = Thread.currentThread();\r
- synchronized (ThreadUtils.class) {\r
-\r
- Event e = new Event(runnable, null, null);\r
-\r
- // Check if target has WaitingThread. Use it if it does\r
- WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
- if (targetWt != null)\r
- {\r
- // Check if it is allowed to use the target WT\r
- if (isEventQueuingAllowed(senderThread, targetWt))\r
- {\r
- if (targetWt.addEvent(e))\r
- return targetWt.thread;\r
- }\r
- }\r
- return threadAccess.asyncExec(runnable);\r
- }\r
- }\r
-\r
- private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)\r
- {\r
- assert(targetThread!=null);\r
- if (visitedTargetThreads.contains(targetThread))\r
- return false;\r
- visitedTargetThreads.add(targetThread);\r
- if (sourceThread == targetThread) return false;\r
- Set<Thread> waitsFor = getWaitsForThreads(targetThread);\r
- if (waitsFor==null||waitsFor.isEmpty()) return false;\r
- for (Thread aThreadTargetThreadWaitsFor : waitsFor)\r
- {\r
- if (aThreadTargetThreadWaitsFor==sourceThread) return true;\r
- if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;\r
- if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))\r
- return true;\r
- }\r
- return false;\r
- }\r
-\r
- static boolean waitsFor(Thread sourceThread, Thread targetThread)\r
- {\r
- return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));\r
- }\r
-\r
- static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)\r
- {\r
- if (!eventQueue.acceptEvents) return false;\r
- // queuing is allowed if target thread does wait for source thread\r
- return waitsFor(sourceThread, eventQueue.thread);\r
- }\r
-\r
- private static void handleRunnableError(Throwable t)\r
- {\r
- t.printStackTrace();\r
- }\r
-\r
- interface EventListener {\r
- void eventDone(Event e);\r
- }\r
-\r
- public static class Event implements Runnable {\r
- Runnable r;\r
- EventListener l;\r
- Semaphore s;\r
- Thread t;\r
- public Event(Runnable r, EventListener l, Semaphore s) {\r
- this.r = r;\r
- this.l = l;\r
- this.s = s;\r
- }\r
- public void run() {\r
- setThread(Thread.currentThread());\r
- try {\r
- r.run();\r
- } catch (RuntimeException e) {\r
- handleRunnableError(e);\r
- } finally {\r
- if (s!=null)\r
- s.release(1);\r
- if (l!=null)\r
- l.eventDone(this);\r
- }\r
- }\r
- public synchronized Thread getThread()\r
- {\r
- while (t==null) {\r
- try {\r
- t.wait();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
- return t;\r
- }\r
- public synchronized void setThread(Thread t)\r
- {\r
- assert(t!=null);\r
- if (this.t!=null)\r
- assert(this.t==t);\r
- this.t = t;\r
- notify();\r
- }\r
- }\r
-\r
- /**\r
- * WaitingThread is a thread that waits for something and in the meantime\r
- * runs errands.\r
- */\r
- static class WaitingThread\r
- {\r
- final Thread thread;\r
- LinkedList<Event> queue = new LinkedList<Event>();\r
- boolean acceptEvents = true;\r
- Set<Event> waitingFor = new HashSet<Event>();\r
- Set<Event> completed = new HashSet<Event>();\r
- public WaitingThread(Thread thread) {\r
- this.thread = thread;\r
- }\r
- public synchronized void waitFor(Event event)\r
- {\r
- assert(thread!=null);\r
- waitingFor.add(event);\r
- }\r
- public synchronized void completed(Event event)\r
- {\r
- //assert(waitingFor.contains(event));\r
- completed.add(event);\r
- if (completed.size()==waitingFor.size())\r
- notify();\r
- }\r
- synchronized boolean isEmpty() {\r
- return queue.isEmpty();\r
- }\r
- synchronized boolean keepWaiting() {\r
- if(waitingFor.size()!=completed.size())\r
- return true;\r
- assert(waitingFor.equals(completed));\r
- return false;\r
- }\r
- public synchronized boolean addEvent(Event r)\r
- {\r
- if (!acceptEvents) return false;\r
- queue.add(r);\r
- notify();\r
- return true;\r
- }\r
- \r
- static private int WAIT_MS = 10000;\r
- static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;\r
- \r
- public void waitAndProcessEvents()\r
- {\r
- while (keepWaiting() || !isEmpty()) {\r
- Event e = null;\r
- synchronized(this)\r
- {\r
- if (!queue.isEmpty())\r
- e = queue.pop();\r
- if (e==null && keepWaiting())\r
- try {\r
- long now = System.nanoTime();\r
- wait(WAIT_MS);\r
- long duration = System.nanoTime()-now;\r
- if(duration > (WAIT_THRESHOLD_NS)) {\r
- for(Thread t : getWaitingForThreads())\r
- if(!t.isAlive())\r
- throw new IllegalStateException("Thread '" + thread + "' has died.");\r
- }\r
- } catch (InterruptedException e1) {\r
- }\r
- }\r
- if (e!=null) {\r
- try {\r
- e.run();\r
- } catch (RuntimeException e1) {\r
- e1.printStackTrace();\r
- }\r
- }\r
- }\r
- while (!isEmpty())\r
- {\r
- Event e = null;\r
- synchronized(this)\r
- {\r
- if (!queue.isEmpty())\r
- e = queue.pop();\r
- }\r
- if (e!=null)\r
- try {\r
- e.run();\r
- } catch (RuntimeException e1) {\r
- e1.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
- public synchronized void stopAcceptingEvents()\r
- {\r
- acceptEvents = false;\r
- }\r
- public synchronized Set<Thread> getWaitingForThreads()\r
- {\r
- Set<Thread> result = new HashSet<Thread>(waitingFor.size());\r
- for (Event e : waitingFor)\r
- {\r
- if (completed.contains(e)) continue;\r
- result.add(e.getThread());\r
- }\r
- return result;\r
- }\r
- }\r
-\r
- /**\r
- * Blocks until waiting thread has been set for thread t\r
- * @param t thread\r
- * @return waiting thread\r
- */\r
- synchronized static WaitingThread getWaitingThreadSync(Thread t)\r
- {\r
- WaitingThread result;\r
- do {\r
- result = map.get(t);\r
- if (result!=null) return result;\r
- try {\r
- ThreadUtils.class.wait();\r
- } catch (InterruptedException e) {\r
- }\r
- } while (true);\r
- }\r
-\r
- /**\r
- * Return an array which may have nulls\r
- * @param t\r
- * @return\r
- */\r
- static Set<Thread> getWaitsForThreads(Thread t)\r
- {\r
- WaitingThread wt = getWaitingThread(t);\r
- if (wt==null) return null;\r
- return wt.getWaitingForThreads();\r
- }\r
-\r
- static synchronized WaitingThread getWaitingThread(Thread t)\r
- {\r
- return map.get(t);\r
- }\r
-\r
- static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)\r
- {\r
- WaitingThread prev = map.put(t, wt);\r
- ThreadUtils.class.notifyAll();\r
- return prev;\r
- }\r
-\r
- static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)\r
- {\r
- assert(t == Thread.currentThread());\r
- map.remove(t);\r
- if (replaceWith!=null)\r
- map.put(t, replaceWith);\r
- }\r
-\r
- static class BetterThreadAccess implements IThreadWorkQueue {\r
- IThreadWorkQueue ta;\r
- public BetterThreadAccess(IThreadWorkQueue ta)\r
- {\r
- if (ta instanceof BetterThreadAccess)\r
- ta = ((BetterThreadAccess) ta).ta;\r
- this.ta = ta;\r
- }\r
- @Override\r
- public Thread asyncExec(Runnable runnable) {\r
- return ThreadUtils.asyncExec(ta, runnable);\r
- }\r
- @Override\r
- public boolean currentThreadAccess() {\r
- return ta.currentThreadAccess();\r
- }\r
- @Override\r
- public Thread getThread() {\r
- return ta.getThread();\r
- }\r
- @Override\r
- public boolean syncExec(Runnable runnable) {\r
- return ThreadUtils.syncExec(ta, runnable);\r
- }\r
- }\r
-\r
-\r
-\r
- /**\r
- * Executes command in the executor while maintaining possibility to\r
- * return back to caller.\r
- * \r
- * @param executor\r
- * @param command\r
- */\r
- public static void exec(Executor executor, final Runnable command)\r
- {\r
- final Thread[] calleeThread = new Thread[1];\r
- final Thread callerThread = Thread.currentThread();\r
- final Runnable wrappedCommand = new Runnable() {\r
- @Override\r
- public void run() {\r
- calleeThread[0] = Thread.currentThread();\r
- try {\r
- command.run();\r
- } finally {\r
- }\r
- }\r
- };\r
-\r
- // add dependency\r
- DEPENDENCIES.put(callerThread, calleeThread);\r
- executor.execute(wrappedCommand);\r
- DEPENDENCIES.remove(callerThread);\r
- }\r
- private static Map<Thread, Thread[]> DEPENDENCIES =\r
- Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );\r
- @SuppressWarnings("unused")\r
- private static boolean hasDependency(Thread waiter, Thread worker)\r
- {\r
- // Follow dependency chain until worker is reached\r
- for (Thread t = waiter; t!=null;)\r
- {\r
- Thread[] potentialResult = DEPENDENCIES.get(t);\r
- if (potentialResult==null) break;\r
- t = potentialResult[0];\r
- if (t==worker) return true;\r
- }\r
- return false;\r
- }\r
- \r
- /**\r
- * Lock multiple locks simultaneously. If all locks cannot be locked the \r
- * thread sleeps a moment and tries again. \r
- * \r
- * If all locks cannot be locked at once, this method doesn't prevent \r
- * other from locking them in the mean time.\r
- * \r
- * @param locks an array of locks. null values are ignored\r
- */\r
- public static void lock(Lock...locks) {\r
- if (locks.length==0) return;\r
- if (locks.length==1) {\r
- locks[0].lock();\r
- return;\r
- }\r
-\r
- while (true) {\r
- int i = 0;\r
- for (;i<locks.length; i++) {\r
- Lock l = locks[i];\r
- if (l==null) continue;\r
- if ( !locks[i].tryLock() ) break;\r
- }\r
- if (i==locks.length) return;\r
- for (int j=0; j<i; j++) {\r
- Lock l = locks[j];\r
- if (l==null) continue;\r
- l.unlock();\r
- }\r
- try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}\r
- }\r
- }\r
-\r
- /**\r
- * Lock multiple locks simultaneously. If all locks cannot be locked the \r
- * thread sleeps a moment and tries again. \r
- * \r
- * If all locks cannot be locked at once, this method doesn't prevent \r
- * other from locking them in the mean time.\r
- * \r
- * [UNTESTED]\r
- * \r
- * @param locks1 an array of locks. null values are ignored\r
- * @param locks2 an array of locks. null values are ignored\r
- */\r
- public static void lock2(Lock[] locks1, Lock[] locks2) {\r
- int l1 = locks1.length;\r
- int l2 = locks2.length;\r
- int c = l1+l2;\r
- if (l1==0 && l2==0) return;\r
-\r
- while (true) {\r
- int i = 0;\r
- // Attempt to lock everything in locks 1 \r
- for (;i<l1; i++) {\r
- Lock l = locks1[i];\r
- if (l==null) continue;\r
- if ( !locks1[i].tryLock() ) break;\r
- }\r
- // Attempt to lock everything in locks 2\r
- if (i==l1) {\r
- for (;i<c; i++) {\r
- Lock l = locks2[i];\r
- if (l==null) continue;\r
- if ( !locks2[i-l1].tryLock() ) break;\r
- }\r
- }\r
- // All locked?\r
- if (i==c) return;\r
- // Cancel locks2\r
- if (i>l1) {\r
- for (int j=l1; j<i; j++) {\r
- Lock l = locks2[j-l1];\r
- if (l==null) continue;\r
- l.unlock();\r
- }\r
- }\r
- // Cancel locks 1\r
- if (i>0) { \r
- for (int j=0; j<i; j++) {\r
- Lock l = locks1[j];\r
- if (l==null) continue;\r
- l.unlock();\r
- }\r
- }\r
- try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}\r
- }\r
- } \r
-\r
- /**\r
- * Try to lock multiple locks simultaneously. If all locks cannot be locked the \r
- * thread sleeps a moment and tries again. \r
- * \r
- * If all locks cannot be locked at once, this method doesn't prevent \r
- * other from locking them in the mean time.\r
- * \r
- * @param locks an array of locks. null values are ignored\r
- * @return \r
- */\r
- public static boolean tryLock(Lock...locks) {\r
- if (locks.length==0) return true;\r
- if (locks.length==1) {\r
- return locks[0].tryLock();\r
- }\r
-\r
- int i = 0;\r
- for (;i<locks.length; i++) {\r
- Lock l = locks[i];\r
- if (l==null) continue;\r
- if ( !locks[i].tryLock() ) break;\r
- }\r
- if (i==locks.length) return true;\r
-\r
- // Unlock & fail\r
- for (int j=0; j<i; j++) {\r
- Lock l = locks[j];\r
- if (l==null) continue;\r
- l.unlock();\r
- }\r
- return false;\r
- }\r
-\r
-\r
- /**\r
- * Unlock multiple locks. Use this after locking multiple locks.\r
- * \r
- * @param locks an array of locks. Null values are ignored\r
- */\r
- public static void unlock(Lock...locks) {\r
- for (Lock lock : locks) {\r
- if (lock != null) lock.unlock();\r
- }\r
- }\r
-\r
- /**\r
- * Unlock multiple locks. Use this after locking multiple locks.\r
- * \r
- * @param locks1 an array of locks. Null values are ignored\r
- * @param locsk2 an array of locks. Null values are ignored\r
- */\r
- public static void unlock2(Lock[] locks1, Lock[] locks2) {\r
- for (Lock lock : locks1) {\r
- if (lock != null) lock.unlock();\r
- }\r
- for (Lock lock : locks2) {\r
- if (lock != null) lock.unlock();\r
- }\r
- }\r
-\r
- /**\r
- * Join multiple arrays into a single array\r
- * \r
- * @param lockArrays\r
- * @return\r
- */\r
- public static Lock[] appendLockArrays(Lock[]...lockArrays) {\r
- int len = 0;\r
- for (Lock[] array : lockArrays) len +=array.length;\r
- Lock[] result = new Lock[ len ];\r
- int i = 0;\r
- for (Lock[] array : lockArrays) {\r
- System.arraycopy(array, 0, result, i, array.length);\r
- i += array.length;\r
- }\r
- return result;\r
- }\r
-\r
- public static synchronized void shutdown() {\r
- if (TIMER != null) {\r
- //System.out.println("TIMERS");\r
- shutdownAndAwaitTermination(TIMER, 1000);\r
- TIMER = null;\r
- }\r
- if (NON_BLOCKING_EXECUTOR != null) {\r
- //System.out.println("NON_BLOCKING");\r
- shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);\r
- NON_BLOCKING_EXECUTOR = null;\r
- }\r
- if (BLOCKING_EXECUTOR != null) {\r
- //System.out.println("BLOCKING");\r
- shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);\r
- BLOCKING_EXECUTOR = null;\r
- }\r
- }\r
-\r
- /**\r
- * Grabbed from {@link ExecutorService} javadoc.\r
- * \r
- * @param pool {@link ExecutorService} to shut down\r
- */\r
- private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {\r
- //long t = System.currentTimeMillis();\r
- pool.shutdown(); // Disable new tasks from being submitted\r
- try {\r
- // Wait a while for existing tasks to terminate\r
- if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {\r
- List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks\r
- if (!leftovers.isEmpty())\r
- System.err.println("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);\r
- // Wait a while for tasks to respond to being cancelled\r
- if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))\r
- System.err.println("Thread pool '" + pool.toString() + "' did not terminate");\r
- }\r
- } catch (InterruptedException ie) {\r
- // (Re-)Cancel if current thread also interrupted\r
- pool.shutdownNow();\r
- // Preserve interrupt status\r
- Thread.currentThread().interrupt();\r
- }\r
- //long e = System.currentTimeMillis();\r
- //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");\r
- }\r
-\r
- @SuppressWarnings("unused")\r
- private static void uncheckedAwaitTermination(ExecutorService service, long time) {\r
- try {\r
- NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
-\r
- // Executor support\r
-\r
- // Executor that runs in current thread\r
- public static Executor CURRENT_THREAD = new CurrentThreadExecutor();\r
-\r
- // Async executor queues the command into AWT event queue\r
- public static ExecutorService AWT_EDT = AWTThread.INSTANCE;\r
-\r
- // Sync executor blocks the call until the command is finished\r
- public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();\r
-\r
-}\r
-\r
-class AWTExecutorSync extends AbstractExecutorService {\r
-\r
- @Override\r
- public void execute(Runnable command) {\r
- if (EventQueue.isDispatchThread())\r
- {\r
- command.run();\r
- } else {\r
- try {\r
- EventQueue.invokeAndWait(command);\r
- } catch (InterruptedException e) {\r
- throw new RuntimeException(e);\r
- } catch (InvocationTargetException e) {\r
- throw new RuntimeException(e.getCause());\r
- }\r
- }\r
- }\r
-\r
- @Override\r
- public void shutdown() {\r
- }\r
-\r
- @Override\r
- public List<Runnable> shutdownNow() {\r
- return null;\r
- }\r
-\r
- @Override\r
- public boolean isShutdown() {\r
- return false;\r
- }\r
-\r
- @Override\r
- public boolean isTerminated() {\r
- return false;\r
- }\r
-\r
- @Override\r
- public boolean awaitTermination(long timeout, TimeUnit unit)\r
- throws InterruptedException {\r
- return false;\r
- }\r
- \r
- public static String getStackTrace(int levels){\r
- StringBuilder sb = new StringBuilder();\r
- sb.append( Thread.currentThread() );\r
- sb.append( "\n" );\r
- Exception e3 = new Exception(); \r
- try { throw e3; } catch(Exception e2) {\r
- \r
- for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {\r
- StackTraceElement ste = e2.getStackTrace()[i];\r
- sb.append(" - ");\r
- sb.append(ste);\r
- sb.append( "\n" );\r
- } \r
- }\r
- return sb.toString();\r
- }\r
-}\r
-\r
-class CurrentThreadExecutor implements Executor {\r
- @Override\r
- public void execute(Runnable command) {\r
- command.run();\r
- }\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+/*
+ *
+ * @author Toni Kalajainen
+ */
+package org.simantics.utils.threads;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.awt.EventQueue;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for switching threads
+ *
+ * @see Executors2
+ * @see IThreadWorkQueue
+ */
+public class ThreadUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class);
+ public static final int CORES = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * Defines the maximum thread pool size of
+ * {@link #getBlockingWorkExecutor()}. The system will enforce this value to
+ * be at least Math.max(CORES, 8). It can be configured through the system
+ * property <em>simantics.executor.blockingMaxThreads</em>.
+ */
+ public static final int MAX_BLOCKING_EXECUTOR_THREADS;
+
+ static {
+ String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);
+ int blockingMaxThreads = CORES;
+ try {
+ blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);
+ } catch (NumberFormatException e) {
+ }
+ MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);
+ }
+
+ /** Never acquire non-blocking executor from this field, private use */
+ public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;
+ /** Never acquire non-blocking executor from this field, private use */
+ public static ExecutorService BLOCKING_EXECUTOR;
+
+ static ScheduledExecutorService TIMER;
+
+ /**
+ * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread
+ *
+ * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread
+ */
+ public static CurrentThread getCurrentThread() {
+ return CurrentThread.INSTANCE;
+ }
+
+ /**
+ * Get a timer executor. Never add blocking or long-wait runnables to this scheduler
+ *
+ * @return Executor that executes a non-blocking work
+ */
+ public static synchronized ScheduledExecutorService getTimer() {
+ if (TIMER == null) {
+ final ThreadGroup tg = new ThreadGroup("Timer");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ TIMER = new ScheduledThreadPoolExecutor( 1, tf );
+ }
+ return TIMER;
+ }
+
+ /**
+ * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks
+ * anything, no semaphores, no synchronized() {} blocks. no Object.wait().
+ *
+ * @return Executor that executes a non-blocking work
+ */
+ public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {
+ if (NON_BLOCKING_EXECUTOR == null) {
+ final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ NON_BLOCKING_EXECUTOR =
+ new ScheduledThreadPoolExecutor( CORES, tf );
+ }
+ return NON_BLOCKING_EXECUTOR;
+ }
+
+ /**
+ * Get Executor executes work that may lock or block.
+ *
+ * @return executor for blocking operations
+ */
+ public static synchronized ExecutorService getBlockingWorkExecutor() {
+ if (BLOCKING_EXECUTOR == null) {
+ final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");
+ final AtomicInteger counter = new AtomicInteger(0);
+ ThreadFactory tf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));
+ if (!t.isDaemon())
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ };
+ BLOCKING_EXECUTOR =
+// new ThreadPoolExecutor(
+// 0,
+// MAX_BLOCKING_EXECUTOR_THREADS,
+// 3L, TimeUnit.SECONDS,
+// new SynchronousQueue<Runnable>(),
+// tf);
+ new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);
+ }
+ return BLOCKING_EXECUTOR;
+ }
+
+ /**
+ * Better thread access allows thread context switching back to waiting
+ * threads.
+ *
+ * @param access
+ * @return an enhanced version of the specified queue
+ */
+ public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)
+ {
+ if (access instanceof BetterThreadAccess)
+ return access;
+ return new BetterThreadAccess(access);
+ }
+
+ static Map<Thread, WaitingThread> map =
+ new HashMap<Thread, WaitingThread>();
+
+ /**
+ * Executes a runnable in thread synchronously.
+ * If the thread locked and waits for this thread, then
+ * the runnable is ran in that thread.
+ *
+ * This works only if all thread switching is done
+ * using this method.
+ *
+ * @param threadAccess
+ * @param runnable
+ * @return true if thread accepted the runnable
+ */
+ public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
+ {
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+
+ // Using current thread
+ if (threadAccess.currentThreadAccess())
+ {
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ }
+ return true;
+ }
+
+ final Thread senderThread = Thread.currentThread();
+ final WaitingThread wt = new WaitingThread(senderThread);
+ WaitingThread prevWt;
+ Event e = new Event(runnable, new EventListener() {
+ @Override
+ public void eventDone(Event e) {
+ wt.completed(e);
+ }}, null);
+
+ synchronized (ThreadUtils.class) {
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ Thread waitingForThread = null;
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e)) {
+ synchronized(wt) {
+ waitingForThread = targetWt.thread;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ }
+
+ if (waitingForThread == null) {
+ synchronized(wt) {
+ waitingForThread = threadAccess.asyncExec(e);
+ if (waitingForThread==null) return false;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+
+ prevWt = setWaitingThread(senderThread, wt);
+ }
+ // run errands in the mean time
+ wt.waitAndProcessEvents();
+ wt.stopAcceptingEvents();
+ // Stop accepting errands
+ removeWaitingThread(senderThread, prevWt);
+ // Run last events
+ wt.waitAndProcessEvents();
+ return true;
+ }
+
+ public static boolean multiSyncExec(Collection<Executable> executions)
+ {
+ if (executions.isEmpty()) return true;
+ return multiSyncExec(executions.toArray(new Executable[executions.size()]));
+ }
+
+ /**
+ * Executes a list of executables in multiple threads and waits for all to complete.
+ *
+ * @param executions
+ * @return <code>true</code> once execution has completed (is this necessary?)
+ */
+ public static boolean multiSyncExec(Executable ... executions)
+ {
+ if (executions.length==0) return true;
+ if (executions.length==1) {
+ return syncExec(executions[0].threadAccess, executions[0].runnable);
+ }
+ final Thread senderThread = Thread.currentThread();
+ final WaitingThread wt = new WaitingThread(senderThread);
+ WaitingThread prevWt = null;
+ synchronized (ThreadUtils.class) {
+ for (Executable pair : executions)
+ {
+ IThreadWorkQueue threadAccess = pair.threadAccess;
+ if (threadAccess.currentThreadAccess())
+ continue;
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+ Runnable runnable = pair.runnable;
+
+ Event e = new Event(runnable, new EventListener() {
+ @Override
+ public void eventDone(Event e) {
+ wt.completed(e);
+ }}, null);
+
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ Thread waitingForThread = null;
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e)) {
+ synchronized(wt) {
+ waitingForThread = targetWt.thread;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ }
+
+ if (waitingForThread == null) {
+ synchronized(wt) {
+ waitingForThread = threadAccess.asyncExec(e);
+ if (waitingForThread==null)
+ return false;
+ e.setThread(waitingForThread);
+ wt.waitFor(e);
+ }
+ }
+ }
+ prevWt = setWaitingThread(senderThread, wt);
+ }
+
+ // Run local runnables
+ for (Executable pair : executions)
+ {
+ IThreadWorkQueue threadAccess = pair.threadAccess;
+ Runnable runnable = pair.runnable;
+ if (threadAccess.currentThreadAccess())
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ }
+ }
+
+ // run errands in the mean time
+ wt.waitAndProcessEvents();
+ // Stop accepting errands
+ wt.stopAcceptingEvents();
+ removeWaitingThread(senderThread, prevWt);
+ // Run last events
+ wt.waitAndProcessEvents();
+ return true;
+ }
+
+ public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)
+ {
+ if (threadAccess instanceof BetterThreadAccess)
+ threadAccess = ((BetterThreadAccess) threadAccess).ta;
+
+ final Thread senderThread = Thread.currentThread();
+ synchronized (ThreadUtils.class) {
+
+ Event e = new Event(runnable, null, null);
+
+ // Check if target has WaitingThread. Use it if it does
+ WaitingThread targetWt = getWaitingThread(threadAccess.getThread());
+ if (targetWt != null)
+ {
+ // Check if it is allowed to use the target WT
+ if (isEventQueuingAllowed(senderThread, targetWt))
+ {
+ if (targetWt.addEvent(e))
+ return targetWt.thread;
+ }
+ }
+ return threadAccess.asyncExec(runnable);
+ }
+ }
+
+ private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)
+ {
+ assert(targetThread!=null);
+ if (visitedTargetThreads.contains(targetThread))
+ return false;
+ visitedTargetThreads.add(targetThread);
+ if (sourceThread == targetThread) return false;
+ Set<Thread> waitsFor = getWaitsForThreads(targetThread);
+ if (waitsFor==null||waitsFor.isEmpty()) return false;
+ for (Thread aThreadTargetThreadWaitsFor : waitsFor)
+ {
+ if (aThreadTargetThreadWaitsFor==sourceThread) return true;
+ if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;
+ if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))
+ return true;
+ }
+ return false;
+ }
+
+ static boolean waitsFor(Thread sourceThread, Thread targetThread)
+ {
+ return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));
+ }
+
+ static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)
+ {
+ if (!eventQueue.acceptEvents) return false;
+ // queuing is allowed if target thread does wait for source thread
+ return waitsFor(sourceThread, eventQueue.thread);
+ }
+
+ private static void handleRunnableError(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ interface EventListener {
+ void eventDone(Event e);
+ }
+
+ public static class Event implements Runnable {
+ Runnable r;
+ EventListener l;
+ Semaphore s;
+ Thread t;
+ public Event(Runnable r, EventListener l, Semaphore s) {
+ this.r = r;
+ this.l = l;
+ this.s = s;
+ }
+ public void run() {
+ setThread(Thread.currentThread());
+ try {
+ r.run();
+ } catch (RuntimeException e) {
+ handleRunnableError(e);
+ } finally {
+ if (s!=null)
+ s.release(1);
+ if (l!=null)
+ l.eventDone(this);
+ }
+ }
+ public synchronized Thread getThread()
+ {
+ while (t==null) {
+ try {
+ t.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ return t;
+ }
+ public synchronized void setThread(Thread t)
+ {
+ assert(t!=null);
+ if (this.t!=null)
+ assert(this.t==t);
+ this.t = t;
+ notify();
+ }
+ }
+
+ /**
+ * WaitingThread is a thread that waits for something and in the meantime
+ * runs errands.
+ */
+ static class WaitingThread
+ {
+ final Thread thread;
+ LinkedList<Event> queue = new LinkedList<Event>();
+ boolean acceptEvents = true;
+ Set<Event> waitingFor = new HashSet<Event>();
+ Set<Event> completed = new HashSet<Event>();
+ public WaitingThread(Thread thread) {
+ this.thread = thread;
+ }
+ public synchronized void waitFor(Event event)
+ {
+ assert(thread!=null);
+ waitingFor.add(event);
+ }
+ public synchronized void completed(Event event)
+ {
+ //assert(waitingFor.contains(event));
+ completed.add(event);
+ if (completed.size()==waitingFor.size())
+ notify();
+ }
+ synchronized boolean isEmpty() {
+ return queue.isEmpty();
+ }
+ synchronized boolean keepWaiting() {
+ if(waitingFor.size()!=completed.size())
+ return true;
+ assert(waitingFor.equals(completed));
+ return false;
+ }
+ public synchronized boolean addEvent(Event r)
+ {
+ if (!acceptEvents) return false;
+ queue.add(r);
+ notify();
+ return true;
+ }
+
+ static private int WAIT_MS = 10000;
+ static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;
+
+ public void waitAndProcessEvents()
+ {
+ while (keepWaiting() || !isEmpty()) {
+ Event e = null;
+ synchronized(this)
+ {
+ if (!queue.isEmpty())
+ e = queue.pop();
+ if (e==null && keepWaiting())
+ try {
+ long now = System.nanoTime();
+ wait(WAIT_MS);
+ long duration = System.nanoTime()-now;
+ if(duration > (WAIT_THRESHOLD_NS)) {
+ for(Thread t : getWaitingForThreads())
+ if(!t.isAlive())
+ throw new IllegalStateException("Thread '" + thread + "' has died.");
+ }
+ } catch (InterruptedException e1) {
+ }
+ }
+ if (e!=null) {
+ try {
+ e.run();
+ } catch (RuntimeException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+ while (!isEmpty())
+ {
+ Event e = null;
+ synchronized(this)
+ {
+ if (!queue.isEmpty())
+ e = queue.pop();
+ }
+ if (e!=null)
+ try {
+ e.run();
+ } catch (RuntimeException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ }
+ public synchronized void stopAcceptingEvents()
+ {
+ acceptEvents = false;
+ }
+ public synchronized Set<Thread> getWaitingForThreads()
+ {
+ Set<Thread> result = new HashSet<Thread>(waitingFor.size());
+ for (Event e : waitingFor)
+ {
+ if (completed.contains(e)) continue;
+ result.add(e.getThread());
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Blocks until waiting thread has been set for thread t
+ * @param t thread
+ * @return waiting thread
+ */
+ synchronized static WaitingThread getWaitingThreadSync(Thread t)
+ {
+ WaitingThread result;
+ do {
+ result = map.get(t);
+ if (result!=null) return result;
+ try {
+ ThreadUtils.class.wait();
+ } catch (InterruptedException e) {
+ }
+ } while (true);
+ }
+
+ /**
+ * Return an array which may have nulls
+ * @param t
+ * @return
+ */
+ static Set<Thread> getWaitsForThreads(Thread t)
+ {
+ WaitingThread wt = getWaitingThread(t);
+ if (wt==null) return null;
+ return wt.getWaitingForThreads();
+ }
+
+ static synchronized WaitingThread getWaitingThread(Thread t)
+ {
+ return map.get(t);
+ }
+
+ static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)
+ {
+ WaitingThread prev = map.put(t, wt);
+ ThreadUtils.class.notifyAll();
+ return prev;
+ }
+
+ static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)
+ {
+ assert(t == Thread.currentThread());
+ map.remove(t);
+ if (replaceWith!=null)
+ map.put(t, replaceWith);
+ }
+
+ static class BetterThreadAccess implements IThreadWorkQueue {
+ IThreadWorkQueue ta;
+ public BetterThreadAccess(IThreadWorkQueue ta)
+ {
+ if (ta instanceof BetterThreadAccess)
+ ta = ((BetterThreadAccess) ta).ta;
+ this.ta = ta;
+ }
+ @Override
+ public Thread asyncExec(Runnable runnable) {
+ return ThreadUtils.asyncExec(ta, runnable);
+ }
+ @Override
+ public boolean currentThreadAccess() {
+ return ta.currentThreadAccess();
+ }
+ @Override
+ public Thread getThread() {
+ return ta.getThread();
+ }
+ @Override
+ public boolean syncExec(Runnable runnable) {
+ return ThreadUtils.syncExec(ta, runnable);
+ }
+ }
+
+
+
+ /**
+ * Executes command in the executor while maintaining possibility to
+ * return back to caller.
+ *
+ * @param executor
+ * @param command
+ */
+ public static void exec(Executor executor, final Runnable command)
+ {
+ final Thread[] calleeThread = new Thread[1];
+ final Thread callerThread = Thread.currentThread();
+ final Runnable wrappedCommand = new Runnable() {
+ @Override
+ public void run() {
+ calleeThread[0] = Thread.currentThread();
+ try {
+ command.run();
+ } finally {
+ }
+ }
+ };
+
+ // add dependency
+ DEPENDENCIES.put(callerThread, calleeThread);
+ executor.execute(wrappedCommand);
+ DEPENDENCIES.remove(callerThread);
+ }
+ private static Map<Thread, Thread[]> DEPENDENCIES =
+ Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );
+ @SuppressWarnings("unused")
+ private static boolean hasDependency(Thread waiter, Thread worker)
+ {
+ // Follow dependency chain until worker is reached
+ for (Thread t = waiter; t!=null;)
+ {
+ Thread[] potentialResult = DEPENDENCIES.get(t);
+ if (potentialResult==null) break;
+ t = potentialResult[0];
+ if (t==worker) return true;
+ }
+ return false;
+ }
+
+ /**
+ * Lock multiple locks simultaneously. If all locks cannot be locked the
+ * thread sleeps a moment and tries again.
+ *
+ * If all locks cannot be locked at once, this method doesn't prevent
+ * other from locking them in the mean time.
+ *
+ * @param locks an array of locks. null values are ignored
+ */
+ public static void lock(Lock...locks) {
+ if (locks.length==0) return;
+ if (locks.length==1) {
+ locks[0].lock();
+ return;
+ }
+
+ while (true) {
+ int i = 0;
+ for (;i<locks.length; i++) {
+ Lock l = locks[i];
+ if (l==null) continue;
+ if ( !locks[i].tryLock() ) break;
+ }
+ if (i==locks.length) return;
+ for (int j=0; j<i; j++) {
+ Lock l = locks[j];
+ if (l==null) continue;
+ l.unlock();
+ }
+ try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}
+ }
+ }
+
+ /**
+ * Lock multiple locks simultaneously. If all locks cannot be locked the
+ * thread sleeps a moment and tries again.
+ *
+ * If all locks cannot be locked at once, this method doesn't prevent
+ * other from locking them in the mean time.
+ *
+ * [UNTESTED]
+ *
+ * @param locks1 an array of locks. null values are ignored
+ * @param locks2 an array of locks. null values are ignored
+ */
+ public static void lock2(Lock[] locks1, Lock[] locks2) {
+ int l1 = locks1.length;
+ int l2 = locks2.length;
+ int c = l1+l2;
+ if (l1==0 && l2==0) return;
+
+ while (true) {
+ int i = 0;
+ // Attempt to lock everything in locks 1
+ for (;i<l1; i++) {
+ Lock l = locks1[i];
+ if (l==null) continue;
+ if ( !locks1[i].tryLock() ) break;
+ }
+ // Attempt to lock everything in locks 2
+ if (i==l1) {
+ for (;i<c; i++) {
+ Lock l = locks2[i];
+ if (l==null) continue;
+ if ( !locks2[i-l1].tryLock() ) break;
+ }
+ }
+ // All locked?
+ if (i==c) return;
+ // Cancel locks2
+ if (i>l1) {
+ for (int j=l1; j<i; j++) {
+ Lock l = locks2[j-l1];
+ if (l==null) continue;
+ l.unlock();
+ }
+ }
+ // Cancel locks 1
+ if (i>0) {
+ for (int j=0; j<i; j++) {
+ Lock l = locks1[j];
+ if (l==null) continue;
+ l.unlock();
+ }
+ }
+ try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}
+ }
+ }
+
+ /**
+ * Try to lock multiple locks simultaneously. If all locks cannot be locked the
+ * thread sleeps a moment and tries again.
+ *
+ * If all locks cannot be locked at once, this method doesn't prevent
+ * other from locking them in the mean time.
+ *
+ * @param locks an array of locks. null values are ignored
+ * @return
+ */
+ public static boolean tryLock(Lock...locks) {
+ if (locks.length==0) return true;
+ if (locks.length==1) {
+ return locks[0].tryLock();
+ }
+
+ int i = 0;
+ for (;i<locks.length; i++) {
+ Lock l = locks[i];
+ if (l==null) continue;
+ if ( !locks[i].tryLock() ) break;
+ }
+ if (i==locks.length) return true;
+
+ // Unlock & fail
+ for (int j=0; j<i; j++) {
+ Lock l = locks[j];
+ if (l==null) continue;
+ l.unlock();
+ }
+ return false;
+ }
+
+
+ /**
+ * Unlock multiple locks. Use this after locking multiple locks.
+ *
+ * @param locks an array of locks. Null values are ignored
+ */
+ public static void unlock(Lock...locks) {
+ for (Lock lock : locks) {
+ if (lock != null) lock.unlock();
+ }
+ }
+
+ /**
+ * Unlock multiple locks. Use this after locking multiple locks.
+ *
+ * @param locks1 an array of locks. Null values are ignored
+ * @param locsk2 an array of locks. Null values are ignored
+ */
+ public static void unlock2(Lock[] locks1, Lock[] locks2) {
+ for (Lock lock : locks1) {
+ if (lock != null) lock.unlock();
+ }
+ for (Lock lock : locks2) {
+ if (lock != null) lock.unlock();
+ }
+ }
+
+ /**
+ * Join multiple arrays into a single array
+ *
+ * @param lockArrays
+ * @return
+ */
+ public static Lock[] appendLockArrays(Lock[]...lockArrays) {
+ int len = 0;
+ for (Lock[] array : lockArrays) len +=array.length;
+ Lock[] result = new Lock[ len ];
+ int i = 0;
+ for (Lock[] array : lockArrays) {
+ System.arraycopy(array, 0, result, i, array.length);
+ i += array.length;
+ }
+ return result;
+ }
+
+ public static synchronized void shutdown() {
+ if (TIMER != null) {
+ //System.out.println("TIMERS");
+ shutdownAndAwaitTermination(TIMER, 1000);
+ TIMER = null;
+ }
+ if (NON_BLOCKING_EXECUTOR != null) {
+ //System.out.println("NON_BLOCKING");
+ shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);
+ NON_BLOCKING_EXECUTOR = null;
+ }
+ if (BLOCKING_EXECUTOR != null) {
+ //System.out.println("BLOCKING");
+ shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);
+ BLOCKING_EXECUTOR = null;
+ }
+ }
+
+ /**
+ * Grabbed from {@link ExecutorService} javadoc.
+ *
+ * @param pool {@link ExecutorService} to shut down
+ */
+ static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {
+ //long t = System.currentTimeMillis();
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks
+ if (!leftovers.isEmpty())
+ LOGGER.warn("Thread pool '" + pool.toString() + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
+ LOGGER.warn("Thread pool '" + pool.toString() + "' did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ //long e = System.currentTimeMillis();
+ //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");
+ }
+
+ @SuppressWarnings("unused")
+ private static void uncheckedAwaitTermination(ExecutorService service, long time) {
+ try {
+ NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Executor support
+
+ // Executor that runs in current thread
+ public static Executor CURRENT_THREAD = new CurrentThreadExecutor();
+
+ // Async executor queues the command into AWT event queue
+ public static ExecutorService AWT_EDT = AWTThread.INSTANCE;
+
+ // Sync executor blocks the call until the command is finished
+ public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
+
+}
+
+class AWTExecutorSync extends AbstractExecutorService {
+
+ @Override
+ public void execute(Runnable command) {
+ if (EventQueue.isDispatchThread())
+ {
+ command.run();
+ } else {
+ try {
+ EventQueue.invokeAndWait(command);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return false;
+ }
+
+ public static String getStackTrace(int levels){
+ StringBuilder sb = new StringBuilder();
+ sb.append( Thread.currentThread() );
+ sb.append( "\n" );
+ Exception e3 = new Exception();
+ try { throw e3; } catch(Exception e2) {
+
+ for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {
+ StackTraceElement ste = e2.getStackTrace()[i];
+ sb.append(" - ");
+ sb.append(ste);
+ sb.append( "\n" );
+ }
+ }
+ return sb.toString();
+ }
+}
+
+class CurrentThreadExecutor implements Executor {
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+}