Async utilities for SCL
[simantics/platform.git] / bundles / org.simantics.scl.runtime / src / org / simantics / scl / runtime / utils / AsyncUtils.java
1 package org.simantics.scl.runtime.utils;
2
3 import java.io.PrintWriter;
4 import java.io.StringWriter;
5 import java.util.Arrays;
6 import java.util.Collection;
7 import java.util.List;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.stream.Collectors;
18
19 import org.simantics.scl.runtime.SCLContext;
20 import org.simantics.scl.runtime.function.Function;
21 import org.simantics.scl.runtime.reporting.SCLReporting;
22 import org.simantics.scl.runtime.tuple.Tuple0;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class AsyncUtils {
27
28     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
29     private static final AtomicInteger threadCount = new AtomicInteger(0);
30
31     private static final ThreadFactory threadFactory = r -> {
32         Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
33         t.setDaemon(true);
34         return t;
35     };
36
37     private static final ScheduledExecutorService scheduledExecutor = Executors
38             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
39
40     @SuppressWarnings({ "rawtypes", "unchecked" })
41     public static void runAsync(Function f) {
42         SCLContext context = SCLContext.createDerivedContext();
43         scheduledExecutor.submit(() -> {
44             SCLContext.push(context);
45             try {
46                 f.apply(Tuple0.INSTANCE);
47             } catch (Exception e) {
48                 LOGGER.error("Could not execute async", e);
49                 printError(e);
50             } finally {
51                 SCLContext.pop();
52             }
53         });
54     }
55
56     @SuppressWarnings({ "rawtypes", "unchecked" })
57     public static List<Object> pmap(Function f, List<Object> l) {
58         return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
59     }
60
61     @SuppressWarnings({ "rawtypes", "unchecked" })
62     public static Future<?> submitAsync(Function f) {
63         SCLContext context = SCLContext.createDerivedContext();
64         return scheduledExecutor.submit(() -> {
65             SCLContext.push(context);
66             try {
67                 return f.apply(Tuple0.INSTANCE);
68             } catch (Throwable t) {
69                 LOGGER.error("Could not execute async {}", f, t);
70                 printError(t);
71                 return null;
72             } finally {
73                 SCLContext.pop();
74             }
75         });
76     }
77
78     @SuppressWarnings({ "rawtypes" })
79     public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
80         return invokeAllAsyncWithTimeout(fs, -1, null);
81     }
82
83     @SuppressWarnings({ "unchecked", "rawtypes" })
84     private static Callable<Object> toCallable(SCLContext context, Function f) {
85         return () -> {
86             SCLContext.push(context);
87             try {
88                 return f.apply(Tuple0.INSTANCE);
89             } catch (Throwable t) {
90                 LOGGER.error("Could not invoke {}", f, t);
91                 printError(t);
92                 return null;
93             } finally {
94                 SCLContext.pop();
95             }
96         };
97     }
98
99     @SuppressWarnings({ "rawtypes" })
100     public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
101         SCLContext context = SCLContext.createDerivedContext();
102         List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
103         try {
104             if (timeout == -1) {
105                 return scheduledExecutor.invokeAll(invokables);
106             } else {
107                 return scheduledExecutor.invokeAll(invokables, timeout, unit);
108             }
109         } catch (Throwable t) {
110             LOGGER.error("Could not invoke all", t);
111             throw t;
112         }
113     }
114
115     @SuppressWarnings({ "rawtypes" })
116     public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
117         return invokeAnyAsyncWithTimeout(fs, -1, null);
118     }
119
120     @SuppressWarnings({ "rawtypes" })
121     public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
122         SCLContext context = SCLContext.createDerivedContext();
123         Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
124         try {
125             if (timeout == -1) {
126                 return scheduledExecutor.invokeAny(invokables);
127             } else {
128                 return scheduledExecutor.invokeAny(invokables, timeout, unit);
129             }
130         } catch (Throwable t) {
131             LOGGER.error("Could not invoke any", t);
132             throw t;
133         }
134     }
135
136     @SuppressWarnings({ "rawtypes", "unchecked" })
137     public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
138         SCLContext context = SCLContext.createDerivedContext();
139         return scheduledExecutor.scheduleAtFixedRate(() -> {
140             SCLContext.push(context);
141             try {
142                 f.apply(Tuple0.INSTANCE);
143             } catch (Throwable t) {
144                 LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
145                 printError(t);
146             } finally {
147                 SCLContext.pop();
148             }
149         }, initialDelay, period, unit);
150     }
151
152     @SuppressWarnings({ "rawtypes", "unchecked" })
153     public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
154         SCLContext context = SCLContext.createDerivedContext();
155         return scheduledExecutor.scheduleWithFixedDelay(() -> {
156             SCLContext.push(context);
157             try {
158                 f.apply(Tuple0.INSTANCE);
159             } catch (Throwable t) {
160                 LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
161                 printError(t);
162             } finally {
163                 SCLContext.pop();
164             }
165         }, initialDelay, period, unit);
166     }
167
168     @SuppressWarnings({ "rawtypes", "unchecked" })
169     public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
170         SCLContext context = SCLContext.createDerivedContext();
171         return scheduledExecutor.schedule(() -> {
172             SCLContext.push(context);
173             try {
174                 return f.apply(Tuple0.INSTANCE);
175             } catch (Throwable t) {
176                 LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
177                 printError(t);
178                 return null;
179             } finally {
180                 SCLContext.pop();
181             }
182         }, delay, unit);
183     }
184
185     private static void printError(Throwable t) {
186         StringWriter sw = new StringWriter();
187         t.printStackTrace(new PrintWriter(sw));
188         SCLReporting.printError(sw.toString());
189     }
190
191 }