gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java
Catch Throwable instead of Exception in SCL AsyncUtils
[simantics/platform.git] / bundles / org.simantics.scl.runtime / src / org / simantics / scl / runtime / utils / AsyncUtils.java
1 package org.simantics.scl.runtime.utils;
2
3 import java.io.PrintWriter;
4 import java.io.StringWriter;
5 import java.util.Collection;
6 import java.util.List;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.stream.Collectors;
17
18 import org.simantics.scl.runtime.SCLContext;
19 import org.simantics.scl.runtime.function.Function;
20 import org.simantics.scl.runtime.reporting.SCLReporting;
21 import org.simantics.scl.runtime.tuple.Tuple0;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public class AsyncUtils {
26
27     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
28     private static final AtomicInteger threadCount = new AtomicInteger(0);
29
30     private static final ThreadFactory threadFactory = r -> {
31         Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
32         t.setDaemon(true);
33         return t;
34     };
35
36     private static final ScheduledExecutorService scheduledExecutor = Executors
37             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
38
39     @SuppressWarnings({ "rawtypes", "unchecked" })
40     public static void runAsync(Function f) {
41         SCLContext context = SCLContext.createDerivedContext();
42         scheduledExecutor.submit(() -> {
43             SCLContext.push(context);
44             try {
45                 f.apply(Tuple0.INSTANCE);
46             } catch (Throwable t) {
47                 LOGGER.error("Could not execute async", t);
48                 printError(t);
49             } finally {
50                 SCLContext.pop();
51             }
52         });
53     }
54
55     @SuppressWarnings({ "rawtypes", "unchecked" })
56     public static List<Object> pmap(Function f, List<Object> l) {
57         return l.parallelStream().map(f::apply).collect(Collectors.toList());
58     }
59
60     @SuppressWarnings({ "rawtypes", "unchecked" })
61     public static Future<?> submitAsync(Function f) {
62         SCLContext context = SCLContext.createDerivedContext();
63         return scheduledExecutor.submit(() -> {
64             SCLContext.push(context);
65             try {
66                 return f.apply(Tuple0.INSTANCE);
67             } catch (Throwable t) {
68                 LOGGER.error("Could not execute async {}", f, t);
69                 printError(t);
70                 return null;
71             } finally {
72                 SCLContext.pop();
73             }
74         });
75     }
76
77     @SuppressWarnings({ "rawtypes" })
78     public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
79         return invokeAllAsyncWithTimeout(fs, -1, null);
80     }
81
82     @SuppressWarnings({ "unchecked", "rawtypes" })
83     private static Callable<Object> toCallable(SCLContext context, Function f) {
84         return () -> {
85             SCLContext.push(context);
86             try {
87                 return f.apply(Tuple0.INSTANCE);
88             } catch (Throwable t) {
89                 LOGGER.error("Could not invoke {}", f, t);
90                 printError(t);
91                 return null;
92             } finally {
93                 SCLContext.pop();
94             }
95         };
96     }
97
98     @SuppressWarnings({ "rawtypes" })
99     public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
100         SCLContext context = SCLContext.createDerivedContext();
101         List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
102         try {
103             if (timeout == -1) {
104                 return scheduledExecutor.invokeAll(invokables);
105             } else {
106                 return scheduledExecutor.invokeAll(invokables, timeout, unit);
107             }
108         } catch (Throwable t) {
109             LOGGER.error("Could not invoke all", t);
110             throw t;
111         }
112     }
113
114     @SuppressWarnings({ "rawtypes" })
115     public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
116         return invokeAnyAsyncWithTimeout(fs, -1, null);
117     }
118
119     @SuppressWarnings({ "rawtypes" })
120     public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
121         SCLContext context = SCLContext.createDerivedContext();
122         Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
123         try {
124             if (timeout == -1) {
125                 return scheduledExecutor.invokeAny(invokables);
126             } else {
127                 return scheduledExecutor.invokeAny(invokables, timeout, unit);
128             }
129         } catch (Throwable t) {
130             LOGGER.error("Could not invoke any", t);
131             throw t;
132         }
133     }
134
135     @SuppressWarnings({ "rawtypes", "unchecked" })
136     public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
137         SCLContext context = SCLContext.createDerivedContext();
138         return scheduledExecutor.scheduleAtFixedRate(() -> {
139             SCLContext.push(context);
140             try {
141                 f.apply(Tuple0.INSTANCE);
142             } catch (Throwable t) {
143                 LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
144                 printError(t);
145             } finally {
146                 SCLContext.pop();
147             }
148         }, initialDelay, period, unit);
149     }
150
151     @SuppressWarnings({ "rawtypes", "unchecked" })
152     public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
153         SCLContext context = SCLContext.createDerivedContext();
154         return scheduledExecutor.scheduleWithFixedDelay(() -> {
155             SCLContext.push(context);
156             try {
157                 f.apply(Tuple0.INSTANCE);
158             } catch (Throwable t) {
159                 LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
160                 printError(t);
161             } finally {
162                 SCLContext.pop();
163             }
164         }, initialDelay, period, unit);
165     }
166
167     @SuppressWarnings({ "rawtypes", "unchecked" })
168     public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
169         SCLContext context = SCLContext.createDerivedContext();
170         return scheduledExecutor.schedule(() -> {
171             SCLContext.push(context);
172             try {
173                 return f.apply(Tuple0.INSTANCE);
174             } catch (Throwable t) {
175                 LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
176                 printError(t);
177                 return null;
178             } finally {
179                 SCLContext.pop();
180             }
181         }, delay, unit);
182     }
183
184     private static void printError(Throwable t) {
185         StringWriter sw = new StringWriter();
186         t.printStackTrace(new PrintWriter(sw));
187         SCLReporting.printError(sw.toString());
188     }
189
190 }