]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.utils.thread/src/org/simantics/utils/threads/ThreadUtils.java
Sync git svn branch with SVN repository r33144.
[simantics/platform.git] / bundles / org.simantics.utils.thread / src / org / simantics / utils / threads / ThreadUtils.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 /*\r
13  *\r
14  * @author Toni Kalajainen\r
15  */\r
16 package org.simantics.utils.threads;\r
17 \r
18 import static java.util.concurrent.TimeUnit.NANOSECONDS;\r
19 \r
20 import java.awt.EventQueue;\r
21 import java.lang.reflect.InvocationTargetException;\r
22 import java.util.Collection;\r
23 import java.util.Collections;\r
24 import java.util.HashMap;\r
25 import java.util.HashSet;\r
26 import java.util.LinkedList;\r
27 import java.util.List;\r
28 import java.util.Map;\r
29 import java.util.Set;\r
30 import java.util.concurrent.AbstractExecutorService;\r
31 import java.util.concurrent.Executor;\r
32 import java.util.concurrent.ExecutorService;\r
33 import java.util.concurrent.ScheduledExecutorService;\r
34 import java.util.concurrent.ScheduledThreadPoolExecutor;\r
35 import java.util.concurrent.Semaphore;\r
36 import java.util.concurrent.ThreadFactory;\r
37 import java.util.concurrent.TimeUnit;\r
38 import java.util.concurrent.atomic.AtomicInteger;\r
39 import java.util.concurrent.locks.Lock;\r
40 \r
41 /**\r
42  * Utility for switching threads\r
43  * \r
44  * @see Executors2\r
45  * @see IThreadWorkQueue\r
46  */\r
47 public class ThreadUtils {\r
48 \r
49     public static final int CORES = Runtime.getRuntime().availableProcessors();\r
50 \r
51     /**\r
52      * Defines the maximum thread pool size of\r
53      * {@link #getBlockingWorkExecutor()}. The system will enforce this value to\r
54      * be at least Math.max(CORES, 8). It can be configured through the system\r
55      * property <em>simantics.executor.blockingMaxThreads</em>.\r
56      */\r
57     public static final int MAX_BLOCKING_EXECUTOR_THREADS;\r
58 \r
59     static {\r
60         String blockingMaxThreadsProp = System.getProperty("simantics.executor.blockingMaxThreads", "" + CORES);\r
61         int blockingMaxThreads = CORES;\r
62         try {\r
63             blockingMaxThreads = Integer.parseInt(blockingMaxThreadsProp);\r
64         } catch (NumberFormatException e) {\r
65         }\r
66         MAX_BLOCKING_EXECUTOR_THREADS = Math.max(Math.max(blockingMaxThreads, 8), CORES);\r
67     }\r
68 \r
69     /** Never acquire non-blocking executor from this field, private use */\r
70     public static ScheduledExecutorService NON_BLOCKING_EXECUTOR;\r
71     /** Never acquire non-blocking executor from this field, private use */\r
72     public static ExecutorService BLOCKING_EXECUTOR;\r
73 \r
74     static ScheduledExecutorService TIMER;\r
75 \r
76     /**\r
77      * Get an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
78      * \r
79      * @return an {@link Executor} and {@link IThreadWorkQueue} for current thread\r
80      */\r
81     public static CurrentThread getCurrentThread() {\r
82         return CurrentThread.INSTANCE;\r
83     }\r
84 \r
85     /**\r
86      * Get a timer executor. Never add blocking or long-wait runnables to this scheduler \r
87      * \r
88      * @return Executor that executes a non-blocking work\r
89      */\r
90     public static synchronized ScheduledExecutorService getTimer() {\r
91         if (TIMER == null) {\r
92             final ThreadGroup tg = new ThreadGroup("Timer");\r
93             final AtomicInteger counter = new AtomicInteger(0);\r
94             ThreadFactory tf = new ThreadFactory() {\r
95                 @Override\r
96                 public Thread newThread(Runnable r) {\r
97                     Thread t = new Thread(tg, r, "Timer-"+(counter.incrementAndGet()));\r
98                     if (!t.isDaemon())\r
99                         t.setDaemon(true);\r
100                     if (t.getPriority() != Thread.NORM_PRIORITY)\r
101                         t.setPriority(Thread.NORM_PRIORITY);\r
102                     return t;\r
103                 }\r
104             };\r
105             TIMER = new ScheduledThreadPoolExecutor( 1, tf );\r
106         }\r
107         return TIMER;\r
108     }\r
109 \r
110     /**\r
111      * Get Executor for work that doesn't lock or block. A non-blocking runnable never locks\r
112      * anything, no semaphores, no synchronized() {} blocks. no Object.wait(). \r
113      * \r
114      * @return Executor that executes a non-blocking work\r
115      */\r
116     public static synchronized ScheduledExecutorService getNonBlockingWorkExecutor() {\r
117         if (NON_BLOCKING_EXECUTOR == null) {\r
118             final ThreadGroup tg = new ThreadGroup("Non-Blocking-Worker-Group");\r
119             final AtomicInteger counter = new AtomicInteger(0);\r
120             ThreadFactory tf = new ThreadFactory() {\r
121                 @Override\r
122                 public Thread newThread(Runnable r) {\r
123                     Thread t = new Thread(tg, r, "Non-Blocking-Worker-"+(counter.incrementAndGet()));\r
124                     if (!t.isDaemon())\r
125                         t.setDaemon(true);\r
126                     if (t.getPriority() != Thread.NORM_PRIORITY)\r
127                         t.setPriority(Thread.NORM_PRIORITY);\r
128                     return t;\r
129                 }\r
130             };\r
131             NON_BLOCKING_EXECUTOR =\r
132                 new ScheduledThreadPoolExecutor( CORES, tf );\r
133         }\r
134         return NON_BLOCKING_EXECUTOR;\r
135     }\r
136 \r
137     /**\r
138      * Get Executor executes work that may lock or block. \r
139      * \r
140      * @return executor for blocking operations\r
141      */    \r
142     public static synchronized ExecutorService getBlockingWorkExecutor() {\r
143         if (BLOCKING_EXECUTOR == null) {\r
144             final ThreadGroup tg = new ThreadGroup("Blocking-Worker-Group");\r
145             final AtomicInteger counter = new AtomicInteger(0);\r
146             ThreadFactory tf = new ThreadFactory() {\r
147                 @Override\r
148                 public Thread newThread(Runnable r) {\r
149                     Thread t = new Thread(tg, r, "Blocking-Worker-"+(counter.incrementAndGet()));\r
150                     if (!t.isDaemon())\r
151                         t.setDaemon(true);\r
152                     if (t.getPriority() != Thread.NORM_PRIORITY)\r
153                         t.setPriority(Thread.NORM_PRIORITY);\r
154                     return t;\r
155                 }\r
156             };\r
157             BLOCKING_EXECUTOR =\r
158 //                new ThreadPoolExecutor(\r
159 //                        0,\r
160 //                        MAX_BLOCKING_EXECUTOR_THREADS,\r
161 //                        3L, TimeUnit.SECONDS,\r
162 //                        new SynchronousQueue<Runnable>(),\r
163 //                        tf);\r
164                     new ScheduledThreadPoolExecutor(MAX_BLOCKING_EXECUTOR_THREADS, tf);\r
165         }\r
166         return BLOCKING_EXECUTOR;\r
167     }\r
168 \r
169     /**\r
170      * Better thread access allows thread context switching back to waiting\r
171      * threads.\r
172      * \r
173      * @param access\r
174      * @return an enhanced version of the specified queue\r
175      */\r
176     public static IThreadWorkQueue getBetterThreadAccess(IThreadWorkQueue access)\r
177     {\r
178         if (access instanceof BetterThreadAccess)\r
179             return access;\r
180         return new BetterThreadAccess(access);\r
181     }\r
182 \r
183     static Map<Thread, WaitingThread> map =\r
184         new HashMap<Thread, WaitingThread>();\r
185 \r
186     /**\r
187      * Executes a runnable in thread synchronously.\r
188      * If the thread locked and waits for this thread, then\r
189      * the runnable is ran in that thread.\r
190      * \r
191      * This works only if all thread switching is done\r
192      * using this method.\r
193      * \r
194      * @param threadAccess\r
195      * @param runnable\r
196      * @return true if thread accepted the runnable\r
197      */\r
198     public static boolean syncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
199     {\r
200         if (threadAccess instanceof BetterThreadAccess)\r
201             threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
202 \r
203         // Using current thread\r
204         if (threadAccess.currentThreadAccess())\r
205         {\r
206             try {\r
207                 runnable.run();\r
208             } catch (RuntimeException e) {\r
209                 handleRunnableError(e);\r
210             }\r
211             return true;\r
212         }\r
213 \r
214         final Thread senderThread = Thread.currentThread();\r
215         final WaitingThread wt = new WaitingThread(senderThread);\r
216         WaitingThread prevWt;\r
217         Event e = new Event(runnable, new EventListener() {\r
218             @Override\r
219             public void eventDone(Event e) {\r
220                 wt.completed(e);\r
221             }}, null);\r
222 \r
223         synchronized (ThreadUtils.class) {\r
224             // Check if target has WaitingThread. Use it if it does\r
225             WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
226             Thread waitingForThread = null;\r
227             if (targetWt != null)\r
228             {\r
229                 // Check if it is allowed to use the target WT\r
230                 if (isEventQueuingAllowed(senderThread, targetWt))\r
231                 {\r
232                     if (targetWt.addEvent(e)) {\r
233                         synchronized(wt) {\r
234                             waitingForThread = targetWt.thread;\r
235                             e.setThread(waitingForThread);\r
236                             wt.waitFor(e);\r
237                         }\r
238                     }\r
239                 }\r
240             }\r
241 \r
242             if (waitingForThread == null) {\r
243                 synchronized(wt) {\r
244                     waitingForThread = threadAccess.asyncExec(e);\r
245                     if (waitingForThread==null) return false;\r
246                     e.setThread(waitingForThread);\r
247                     wt.waitFor(e);\r
248                 }\r
249             }\r
250 \r
251             prevWt = setWaitingThread(senderThread, wt);\r
252         }\r
253         // run errands in the mean time\r
254         wt.waitAndProcessEvents();\r
255         wt.stopAcceptingEvents();\r
256         // Stop accepting errands\r
257         removeWaitingThread(senderThread, prevWt);\r
258         // Run last events\r
259         wt.waitAndProcessEvents();\r
260         return true;\r
261     }\r
262 \r
263     public static boolean multiSyncExec(Collection<Executable> executions)\r
264     {\r
265         if (executions.isEmpty()) return true;\r
266         return multiSyncExec(executions.toArray(new Executable[executions.size()]));\r
267     }\r
268 \r
269     /**\r
270      * Executes a list of executables in multiple threads and waits for all to complete.\r
271      * \r
272      * @param executions\r
273      * @return <code>true</code> once execution has completed (is this necessary?)\r
274      */\r
275     public static boolean multiSyncExec(Executable ... executions)\r
276     {\r
277         if (executions.length==0) return true;\r
278         if (executions.length==1) {\r
279             return syncExec(executions[0].threadAccess, executions[0].runnable);\r
280         }\r
281         final Thread senderThread = Thread.currentThread();\r
282         final WaitingThread wt = new WaitingThread(senderThread);\r
283         WaitingThread prevWt = null;\r
284         synchronized (ThreadUtils.class) {\r
285             for (Executable pair : executions)\r
286             {\r
287                 IThreadWorkQueue threadAccess = pair.threadAccess;\r
288                 if (threadAccess.currentThreadAccess())\r
289                     continue;\r
290                 if (threadAccess instanceof BetterThreadAccess)\r
291                     threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
292                 Runnable runnable = pair.runnable;\r
293 \r
294                 Event e = new Event(runnable, new EventListener() {\r
295                     @Override\r
296                     public void eventDone(Event e) {\r
297                         wt.completed(e);\r
298                     }}, null);\r
299 \r
300                 // Check if target has WaitingThread. Use it if it does\r
301                 WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
302                 Thread waitingForThread = null;\r
303                 if (targetWt != null)\r
304                 {\r
305                     // Check if it is allowed to use the target WT\r
306                     if (isEventQueuingAllowed(senderThread, targetWt))\r
307                     {\r
308                         if (targetWt.addEvent(e)) {\r
309                             synchronized(wt) {\r
310                                 waitingForThread = targetWt.thread;\r
311                                 e.setThread(waitingForThread);\r
312                                 wt.waitFor(e);\r
313                             }\r
314                         }\r
315                     }\r
316                 }\r
317 \r
318                 if (waitingForThread == null) {\r
319                     synchronized(wt) {\r
320                         waitingForThread = threadAccess.asyncExec(e);\r
321                         if (waitingForThread==null)\r
322                             return false;\r
323                         e.setThread(waitingForThread);\r
324                         wt.waitFor(e);\r
325                     }\r
326                 }\r
327             }\r
328             prevWt = setWaitingThread(senderThread, wt);\r
329         }\r
330 \r
331         // Run local runnables\r
332         for (Executable pair : executions)\r
333         {\r
334             IThreadWorkQueue threadAccess = pair.threadAccess;\r
335             Runnable runnable = pair.runnable;\r
336             if (threadAccess.currentThreadAccess())\r
337                 try {\r
338                     runnable.run();\r
339                 } catch (RuntimeException e) {\r
340                     handleRunnableError(e);\r
341                 }\r
342         }\r
343 \r
344         // run errands in the mean time\r
345         wt.waitAndProcessEvents();\r
346         // Stop accepting errands\r
347         wt.stopAcceptingEvents();\r
348         removeWaitingThread(senderThread, prevWt);\r
349         // Run last events\r
350         wt.waitAndProcessEvents();\r
351         return true;\r
352     }\r
353 \r
354     public static Thread asyncExec(IThreadWorkQueue threadAccess, final Runnable runnable)\r
355     {\r
356         if (threadAccess instanceof BetterThreadAccess)\r
357             threadAccess = ((BetterThreadAccess) threadAccess).ta;\r
358 \r
359         final Thread senderThread = Thread.currentThread();\r
360         synchronized (ThreadUtils.class) {\r
361 \r
362             Event e = new Event(runnable, null, null);\r
363 \r
364             // Check if target has WaitingThread. Use it if it does\r
365             WaitingThread targetWt = getWaitingThread(threadAccess.getThread());\r
366             if (targetWt != null)\r
367             {\r
368                 //      Check if it is allowed to use the target WT\r
369                 if (isEventQueuingAllowed(senderThread, targetWt))\r
370                 {\r
371                     if (targetWt.addEvent(e))\r
372                         return targetWt.thread;\r
373                 }\r
374             }\r
375             return threadAccess.asyncExec(runnable);\r
376         }\r
377     }\r
378 \r
379     private static boolean _waitsFor(Thread sourceThread, Thread targetThread, Set<Thread> visitedTargetThreads)\r
380     {\r
381         assert(targetThread!=null);\r
382         if (visitedTargetThreads.contains(targetThread))\r
383             return false;\r
384         visitedTargetThreads.add(targetThread);\r
385         if (sourceThread == targetThread) return false;\r
386         Set<Thread> waitsFor = getWaitsForThreads(targetThread);\r
387         if (waitsFor==null||waitsFor.isEmpty()) return false;\r
388         for (Thread aThreadTargetThreadWaitsFor : waitsFor)\r
389         {\r
390             if (aThreadTargetThreadWaitsFor==sourceThread) return true;\r
391             if (visitedTargetThreads.contains(aThreadTargetThreadWaitsFor)) continue;\r
392             if (_waitsFor(sourceThread, aThreadTargetThreadWaitsFor, visitedTargetThreads))\r
393                 return true;\r
394         }\r
395         return false;\r
396     }\r
397 \r
398     static boolean waitsFor(Thread sourceThread, Thread targetThread)\r
399     {\r
400         return _waitsFor(sourceThread, targetThread, new HashSet<Thread>(3));\r
401     }\r
402 \r
403     static boolean isEventQueuingAllowed(Thread sourceThread, WaitingThread eventQueue)\r
404     {\r
405         if (!eventQueue.acceptEvents) return false;\r
406         // queuing is allowed if target thread does wait for source thread\r
407         return waitsFor(sourceThread, eventQueue.thread);\r
408     }\r
409 \r
410     private static void handleRunnableError(Throwable t)\r
411     {\r
412         t.printStackTrace();\r
413     }\r
414 \r
415     interface EventListener {\r
416         void eventDone(Event e);\r
417     }\r
418 \r
419     public static class Event implements Runnable {\r
420         Runnable r;\r
421         EventListener l;\r
422         Semaphore s;\r
423         Thread t;\r
424         public Event(Runnable r, EventListener l, Semaphore s) {\r
425             this.r = r;\r
426             this.l = l;\r
427             this.s = s;\r
428         }\r
429         public void run() {\r
430             setThread(Thread.currentThread());\r
431             try {\r
432                 r.run();\r
433             } catch (RuntimeException e) {\r
434                 handleRunnableError(e);\r
435             } finally {\r
436                 if (s!=null)\r
437                     s.release(1);\r
438                 if (l!=null)\r
439                     l.eventDone(this);\r
440             }\r
441         }\r
442         public synchronized Thread getThread()\r
443         {\r
444             while (t==null) {\r
445                 try {\r
446                     t.wait();\r
447                 } catch (InterruptedException e) {\r
448                 }\r
449             }\r
450             return t;\r
451         }\r
452         public synchronized void setThread(Thread t)\r
453         {\r
454             assert(t!=null);\r
455             if (this.t!=null)\r
456                 assert(this.t==t);\r
457             this.t = t;\r
458             notify();\r
459         }\r
460     }\r
461 \r
462     /**\r
463      * WaitingThread is a thread that waits for something and in the meantime\r
464      * runs errands.\r
465      */\r
466     static class WaitingThread\r
467     {\r
468         final Thread thread;\r
469         LinkedList<Event> queue = new LinkedList<Event>();\r
470         boolean acceptEvents = true;\r
471         Set<Event> waitingFor = new HashSet<Event>();\r
472         Set<Event> completed = new HashSet<Event>();\r
473         public WaitingThread(Thread thread) {\r
474             this.thread = thread;\r
475         }\r
476         public synchronized void waitFor(Event event)\r
477         {\r
478             assert(thread!=null);\r
479             waitingFor.add(event);\r
480         }\r
481         public synchronized void completed(Event event)\r
482         {\r
483             //assert(waitingFor.contains(event));\r
484             completed.add(event);\r
485             if (completed.size()==waitingFor.size())\r
486                 notify();\r
487         }\r
488         synchronized boolean isEmpty() {\r
489             return queue.isEmpty();\r
490         }\r
491         synchronized boolean keepWaiting() {\r
492             if(waitingFor.size()!=completed.size())\r
493                 return true;\r
494             assert(waitingFor.equals(completed));\r
495             return false;\r
496         }\r
497         public synchronized boolean addEvent(Event r)\r
498         {\r
499             if (!acceptEvents) return false;\r
500             queue.add(r);\r
501             notify();\r
502             return true;\r
503         }\r
504         \r
505         static private int WAIT_MS = 10000;\r
506         static private int WAIT_THRESHOLD_NS = 900000*WAIT_MS;\r
507         \r
508         public void waitAndProcessEvents()\r
509         {\r
510             while (keepWaiting() || !isEmpty()) {\r
511                 Event e = null;\r
512                 synchronized(this)\r
513                 {\r
514                     if (!queue.isEmpty())\r
515                         e = queue.pop();\r
516                     if (e==null && keepWaiting())\r
517                         try {\r
518                                 long now = System.nanoTime();\r
519                             wait(WAIT_MS);\r
520                             long duration = System.nanoTime()-now;\r
521                             if(duration > (WAIT_THRESHOLD_NS)) {\r
522                                 for(Thread t : getWaitingForThreads())\r
523                                     if(!t.isAlive())\r
524                                         throw new IllegalStateException("Thread '" + thread + "' has died.");\r
525                             }\r
526                         } catch (InterruptedException e1) {\r
527                         }\r
528                 }\r
529                 if (e!=null) {\r
530                     try {\r
531                         e.run();\r
532                     } catch (RuntimeException e1) {\r
533                         e1.printStackTrace();\r
534                     }\r
535                 }\r
536             }\r
537             while (!isEmpty())\r
538             {\r
539                 Event e = null;\r
540                 synchronized(this)\r
541                 {\r
542                     if (!queue.isEmpty())\r
543                         e = queue.pop();\r
544                 }\r
545                 if (e!=null)\r
546                     try {\r
547                         e.run();\r
548                     } catch (RuntimeException e1) {\r
549                         e1.printStackTrace();\r
550                     }\r
551             }\r
552 \r
553         }\r
554         public synchronized void stopAcceptingEvents()\r
555         {\r
556             acceptEvents = false;\r
557         }\r
558         public synchronized Set<Thread> getWaitingForThreads()\r
559         {\r
560             Set<Thread> result = new HashSet<Thread>(waitingFor.size());\r
561             for (Event e : waitingFor)\r
562             {\r
563                 if (completed.contains(e)) continue;\r
564                 result.add(e.getThread());\r
565             }\r
566             return result;\r
567         }\r
568     }\r
569 \r
570     /**\r
571      * Blocks until waiting thread has been set for thread t\r
572      * @param t thread\r
573      * @return waiting thread\r
574      */\r
575     synchronized static WaitingThread getWaitingThreadSync(Thread t)\r
576     {\r
577         WaitingThread result;\r
578         do {\r
579             result = map.get(t);\r
580             if (result!=null) return result;\r
581             try {\r
582                 ThreadUtils.class.wait();\r
583             } catch (InterruptedException e) {\r
584             }\r
585         } while (true);\r
586     }\r
587 \r
588     /**\r
589      * Return an array which may have nulls\r
590      * @param t\r
591      * @return\r
592      */\r
593     static Set<Thread> getWaitsForThreads(Thread t)\r
594     {\r
595         WaitingThread wt = getWaitingThread(t);\r
596         if (wt==null) return null;\r
597         return wt.getWaitingForThreads();\r
598     }\r
599 \r
600     static synchronized WaitingThread getWaitingThread(Thread t)\r
601     {\r
602         return map.get(t);\r
603     }\r
604 \r
605     static synchronized WaitingThread setWaitingThread(Thread t, WaitingThread wt)\r
606     {\r
607         WaitingThread prev = map.put(t, wt);\r
608         ThreadUtils.class.notifyAll();\r
609         return prev;\r
610     }\r
611 \r
612     static synchronized void removeWaitingThread(Thread t, WaitingThread replaceWith)\r
613     {\r
614         assert(t == Thread.currentThread());\r
615         map.remove(t);\r
616         if (replaceWith!=null)\r
617             map.put(t, replaceWith);\r
618     }\r
619 \r
620     static class BetterThreadAccess implements IThreadWorkQueue {\r
621         IThreadWorkQueue ta;\r
622         public BetterThreadAccess(IThreadWorkQueue ta)\r
623         {\r
624             if (ta instanceof BetterThreadAccess)\r
625                 ta  = ((BetterThreadAccess) ta).ta;\r
626             this.ta = ta;\r
627         }\r
628         @Override\r
629         public Thread asyncExec(Runnable runnable) {\r
630             return ThreadUtils.asyncExec(ta, runnable);\r
631         }\r
632         @Override\r
633         public boolean currentThreadAccess() {\r
634             return ta.currentThreadAccess();\r
635         }\r
636         @Override\r
637         public Thread getThread() {\r
638             return ta.getThread();\r
639         }\r
640         @Override\r
641         public boolean syncExec(Runnable runnable) {\r
642             return ThreadUtils.syncExec(ta, runnable);\r
643         }\r
644     }\r
645 \r
646 \r
647 \r
648     /**\r
649      * Executes command in the executor while maintaining possibility to\r
650      * return back to caller.\r
651      * \r
652      * @param executor\r
653      * @param command\r
654      */\r
655     public static void exec(Executor executor, final Runnable command)\r
656     {\r
657         final Thread[] calleeThread = new Thread[1];\r
658         final Thread callerThread = Thread.currentThread();\r
659         final Runnable wrappedCommand = new Runnable() {\r
660             @Override\r
661             public void run() {\r
662                 calleeThread[0] = Thread.currentThread();\r
663                 try {\r
664                     command.run();\r
665                 } finally {\r
666                 }\r
667             }\r
668         };\r
669 \r
670         // add dependency\r
671         DEPENDENCIES.put(callerThread, calleeThread);\r
672         executor.execute(wrappedCommand);\r
673         DEPENDENCIES.remove(callerThread);\r
674     }\r
675     private static Map<Thread, Thread[]> DEPENDENCIES =\r
676         Collections.synchronizedMap( new HashMap<Thread, Thread[]>() );\r
677     @SuppressWarnings("unused")\r
678     private static boolean hasDependency(Thread waiter, Thread worker)\r
679     {\r
680         // Follow dependency chain until worker is reached\r
681         for (Thread t = waiter; t!=null;)\r
682         {\r
683             Thread[] potentialResult = DEPENDENCIES.get(t);\r
684             if (potentialResult==null) break;\r
685             t = potentialResult[0];\r
686             if (t==worker) return true;\r
687         }\r
688         return false;\r
689     }\r
690     \r
691     /**\r
692      * Lock multiple locks simultaneously. If all locks cannot be locked the \r
693      * thread sleeps a moment and tries again. \r
694      * \r
695      * If all locks cannot be locked at once, this method doesn't prevent \r
696      * other from locking them in the mean time.\r
697      * \r
698      * @param locks an array of locks. null values are ignored\r
699      */\r
700     public static void lock(Lock...locks) {\r
701         if (locks.length==0) return;\r
702         if (locks.length==1) {\r
703             locks[0].lock();\r
704             return;\r
705         }\r
706 \r
707         while (true) {\r
708             int i = 0;\r
709             for (;i<locks.length; i++) {\r
710                 Lock l = locks[i];\r
711                 if (l==null) continue;\r
712                 if ( !locks[i].tryLock() ) break;\r
713             }\r
714             if (i==locks.length) return;\r
715             for (int j=0; j<i; j++) {\r
716                 Lock l = locks[j];\r
717                 if (l==null) continue;\r
718                 l.unlock();\r
719             }\r
720             try { NANOSECONDS.sleep(10000); } catch (InterruptedException e) {}\r
721         }\r
722     }\r
723 \r
724     /**\r
725      * Lock multiple locks simultaneously. If all locks cannot be locked the \r
726      * thread sleeps a moment and tries again. \r
727      * \r
728      * If all locks cannot be locked at once, this method doesn't prevent \r
729      * other from locking them in the mean time.\r
730      * \r
731      * [UNTESTED]\r
732      * \r
733      * @param locks1 an array of locks. null values are ignored\r
734      * @param locks2 an array of locks. null values are ignored\r
735      */\r
736     public static void lock2(Lock[] locks1, Lock[] locks2) {\r
737         int l1 = locks1.length;\r
738         int l2 = locks2.length;\r
739         int c = l1+l2;\r
740         if (l1==0 && l2==0) return;\r
741 \r
742         while (true) {\r
743             int i = 0;\r
744             // Attempt to lock everything in locks 1 \r
745             for (;i<l1; i++) {\r
746                 Lock l = locks1[i];\r
747                 if (l==null) continue;\r
748                 if ( !locks1[i].tryLock() ) break;\r
749             }\r
750             // Attempt to lock everything in locks 2\r
751             if (i==l1) {\r
752                 for (;i<c; i++) {\r
753                     Lock l = locks2[i];\r
754                     if (l==null) continue;\r
755                     if ( !locks2[i-l1].tryLock() ) break;\r
756                 }\r
757             }\r
758             // All locked?\r
759             if (i==c) return;\r
760             // Cancel locks2\r
761             if (i>l1) {\r
762                 for (int j=l1; j<i; j++) {\r
763                     Lock l = locks2[j-l1];\r
764                     if (l==null) continue;\r
765                     l.unlock();\r
766                 }\r
767             }\r
768             // Cancel locks 1\r
769             if (i>0) { \r
770                 for (int j=0; j<i; j++) {\r
771                     Lock l = locks1[j];\r
772                     if (l==null) continue;\r
773                     l.unlock();\r
774                 }\r
775             }\r
776             try { NANOSECONDS.sleep(1); } catch (InterruptedException e) {}\r
777         }\r
778     } \r
779 \r
780     /**\r
781      * Try to lock multiple locks simultaneously. If all locks cannot be locked the \r
782      * thread sleeps a moment and tries again. \r
783      * \r
784      * If all locks cannot be locked at once, this method doesn't prevent \r
785      * other from locking them in the mean time.\r
786      * \r
787      * @param locks an array of locks. null values are ignored\r
788      * @return \r
789      */\r
790     public static boolean tryLock(Lock...locks) {\r
791         if (locks.length==0) return true;\r
792         if (locks.length==1) {\r
793             return locks[0].tryLock();\r
794         }\r
795 \r
796         int i = 0;\r
797         for (;i<locks.length; i++) {\r
798             Lock l = locks[i];\r
799             if (l==null) continue;\r
800             if ( !locks[i].tryLock() ) break;\r
801         }\r
802         if (i==locks.length) return true;\r
803 \r
804         // Unlock & fail\r
805         for (int j=0; j<i; j++) {\r
806             Lock l = locks[j];\r
807             if (l==null) continue;\r
808             l.unlock();\r
809         }\r
810         return false;\r
811     }\r
812 \r
813 \r
814     /**\r
815      * Unlock multiple locks. Use this after locking multiple locks.\r
816      * \r
817      * @param locks an array of locks. Null values are ignored\r
818      */\r
819     public static void unlock(Lock...locks) {\r
820         for (Lock lock : locks) {\r
821             if (lock != null) lock.unlock();\r
822         }\r
823     }\r
824 \r
825     /**\r
826      * Unlock multiple locks. Use this after locking multiple locks.\r
827      * \r
828      * @param locks1 an array of locks. Null values are ignored\r
829      * @param locsk2 an array of locks. Null values are ignored\r
830      */\r
831     public static void unlock2(Lock[] locks1, Lock[] locks2) {\r
832         for (Lock lock : locks1) {\r
833             if (lock != null) lock.unlock();\r
834         }\r
835         for (Lock lock : locks2) {\r
836             if (lock != null) lock.unlock();\r
837         }\r
838     }\r
839 \r
840     /**\r
841      * Join multiple arrays into a single array\r
842      * \r
843      * @param lockArrays\r
844      * @return\r
845      */\r
846     public static Lock[] appendLockArrays(Lock[]...lockArrays) {\r
847         int len = 0;\r
848         for (Lock[] array : lockArrays) len +=array.length;\r
849         Lock[] result = new Lock[ len ];\r
850         int i = 0;\r
851         for (Lock[] array : lockArrays) {\r
852             System.arraycopy(array, 0, result, i, array.length);\r
853             i += array.length;\r
854         }\r
855         return result;\r
856     }\r
857 \r
858     public static synchronized void shutdown() {\r
859         if (TIMER != null) {\r
860             //System.out.println("TIMERS");\r
861             shutdownAndAwaitTermination(TIMER, 1000);\r
862             TIMER = null;\r
863         }\r
864         if (NON_BLOCKING_EXECUTOR != null) {\r
865             //System.out.println("NON_BLOCKING");\r
866             shutdownAndAwaitTermination(NON_BLOCKING_EXECUTOR, 1000);\r
867             NON_BLOCKING_EXECUTOR = null;\r
868         }\r
869         if (BLOCKING_EXECUTOR != null) {\r
870             //System.out.println("BLOCKING");\r
871             shutdownAndAwaitTermination(BLOCKING_EXECUTOR, 1000);\r
872             BLOCKING_EXECUTOR = null;\r
873         }\r
874     }\r
875 \r
876     /**\r
877      * Grabbed from {@link ExecutorService} javadoc.\r
878      * \r
879      * @param pool {@link ExecutorService} to shut down\r
880      */\r
881     private static void shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {\r
882         //long t = System.currentTimeMillis();\r
883         pool.shutdown(); // Disable new tasks from being submitted\r
884         try {\r
885             // Wait a while for existing tasks to terminate\r
886             if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {\r
887                 List<Runnable> leftovers = pool.shutdownNow(); // Cancel currently executing tasks\r
888                 if (!leftovers.isEmpty())\r
889                     System.err.println("Thread pool '" + pool.toString()  + "' contained " + leftovers.size() + " tasks at forced shutdown: " + leftovers);\r
890                 // Wait a while for tasks to respond to being cancelled\r
891                 if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))\r
892                     System.err.println("Thread pool '" + pool.toString()  + "' did not terminate");\r
893             }\r
894         } catch (InterruptedException ie) {\r
895             // (Re-)Cancel if current thread also interrupted\r
896             pool.shutdownNow();\r
897             // Preserve interrupt status\r
898             Thread.currentThread().interrupt();\r
899         }\r
900         //long e = System.currentTimeMillis();\r
901         //System.out.println("shutdown took: " + ((e-t)*1e-3) + " ms");\r
902     }\r
903 \r
904     @SuppressWarnings("unused")\r
905     private static void uncheckedAwaitTermination(ExecutorService service, long time) {\r
906         try {\r
907             NON_BLOCKING_EXECUTOR.awaitTermination(time, TimeUnit.MILLISECONDS);\r
908         } catch (InterruptedException e) {\r
909         }\r
910     }\r
911 \r
    // Executor support
    // NOTE(review): these fields are public and non-final, so any code can
    // reassign them -- confirm whether that is intended.

    // Executor that runs each command synchronously in the calling thread
    public static Executor CURRENT_THREAD = new CurrentThreadExecutor();

    // Async executor queues the command into the AWT event queue
    // (AWTThread is defined elsewhere; behavior stated as in the original note)
    public static ExecutorService AWT_EDT = AWTThread.INSTANCE;

    // Sync executor blocks the call until the command is finished; when called
    // on the AWT event dispatch thread the command is run directly
    public static ExecutorService AWT_EDT_SYNC = new AWTExecutorSync();
922 \r
923 }\r
924 \r
925 class AWTExecutorSync extends AbstractExecutorService {\r
926 \r
927     @Override\r
928     public void execute(Runnable command) {\r
929         if (EventQueue.isDispatchThread())\r
930         {\r
931             command.run();\r
932         } else {\r
933             try {\r
934                 EventQueue.invokeAndWait(command);\r
935             } catch (InterruptedException e) {\r
936                 throw new RuntimeException(e);\r
937             } catch (InvocationTargetException e) {\r
938                 throw new RuntimeException(e.getCause());\r
939             }\r
940         }\r
941     }\r
942 \r
943     @Override\r
944     public void shutdown() {\r
945     }\r
946 \r
947     @Override\r
948     public List<Runnable> shutdownNow() {\r
949         return null;\r
950     }\r
951 \r
952     @Override\r
953     public boolean isShutdown() {\r
954         return false;\r
955     }\r
956 \r
957     @Override\r
958     public boolean isTerminated() {\r
959         return false;\r
960     }\r
961 \r
962     @Override\r
963     public boolean awaitTermination(long timeout, TimeUnit unit)\r
964             throws InterruptedException {\r
965         return false;\r
966     }\r
967     \r
968     public static String getStackTrace(int levels){\r
969         StringBuilder sb = new StringBuilder();\r
970         sb.append( Thread.currentThread() );\r
971         sb.append( "\n" );\r
972                 Exception e3 = new Exception();         \r
973                 try { throw e3; } catch(Exception e2) {\r
974                         \r
975                         for (int i=1; i<Math.max(e2.getStackTrace().length, levels+1); i++) {\r
976                                 StackTraceElement ste = e2.getStackTrace()[i];\r
977                                 sb.append("  - ");\r
978                                 sb.append(ste);\r
979                         sb.append( "\n" );\r
980                         }               \r
981                 }\r
982         return sb.toString();\r
983     }\r
984 }\r
985 \r
/**
 * {@link Executor} that runs every command immediately in the thread that
 * calls {@link #execute(Runnable)}; the call returns when the command has
 * finished.
 */
class CurrentThreadExecutor implements Executor {

    /**
     * Run the command synchronously in the calling thread.
     *
     * @param command the command to run
     */
    @Override
    public void execute(Runnable command) {
        command.run();
    }

}