1 package org.simantics.scl.runtime.utils;
3 import java.io.PrintWriter;
4 import java.io.StringWriter;
5 import java.util.Collection;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.stream.Collectors;
18 import org.simantics.scl.runtime.SCLContext;
19 import org.simantics.scl.runtime.function.Function;
20 import org.simantics.scl.runtime.reporting.SCLReporting;
21 import org.simantics.scl.runtime.tuple.Tuple0;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
/**
 * Static helpers for running SCL {@link Function} values asynchronously.
 * The executor-based methods visible in this excerpt share one scheduled thread pool.
 */
25 public class AsyncUtils {
27 private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
// Source of the numeric suffix used in pool thread names.
28 private static final AtomicInteger threadCount = new AtomicInteger(0);
// Names each new thread "scl-async-utils-thread-<n>", with n taken from threadCount.
// NOTE(review): the remainder of this lambda is not visible in this excerpt.
30 private static final ThreadFactory threadFactory = r -> {
31 Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
// Shared pool; its core size is Runtime.availableProcessors() and its threads come from threadFactory.
36 private static final ScheduledExecutorService scheduledExecutor = Executors
37 .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
/**
 * Submits {@code f} to the shared executor, where it is applied to {@link Tuple0#INSTANCE}.
 * The Future returned by the executor is discarded, so callers get no handle on the task.
 *
 * @param f the SCL function to run
 */
39 @SuppressWarnings({ "rawtypes", "unchecked" })
40 public static void runAsync(Function f) {
// The derived context is created by the caller's thread, then pushed inside the submitted task.
41 SCLContext context = SCLContext.createDerivedContext();
42 scheduledExecutor.submit(() -> {
43 SCLContext.push(context);
45 f.apply(Tuple0.INSTANCE);
46 } catch (Throwable t) {
// Any Throwable raised by f is logged here.
// NOTE(review): whatever follows the log call is not visible in this excerpt.
47 LOGGER.error("Could not execute async", t);
/**
 * Applies {@code f} to every element of {@code l} through a parallel stream and
 * collects the results into a list.
 * This method uses {@code parallelStream()} rather than the class's scheduled executor,
 * and pushes no SCLContext.
 *
 * @param f function applied to each element
 * @param l input elements
 * @return list of the values produced by {@code f}
 */
55 @SuppressWarnings({ "rawtypes", "unchecked" })
56 public static List<Object> pmap(Function f, List<Object> l) {
57 return l.parallelStream().map(f::apply).collect(Collectors.toList());
/**
 * Submits {@code f} to the shared executor and returns the resulting Future.
 * The task applies {@code f} to {@link Tuple0#INSTANCE} and returns that value.
 *
 * @param f the SCL function to run
 * @return the Future returned by the executor for the submitted task
 */
60 @SuppressWarnings({ "rawtypes", "unchecked" })
61 public static Future<?> submitAsync(Function f) {
// The derived context is created by the caller's thread, then pushed inside the submitted task.
62 SCLContext context = SCLContext.createDerivedContext();
63 return scheduledExecutor.submit(() -> {
64 SCLContext.push(context);
66 return f.apply(Tuple0.INSTANCE);
67 } catch (Throwable t) {
// Any Throwable raised by f is logged together with f itself.
// NOTE(review): whether it is rethrown afterwards is not visible in this excerpt.
68 LOGGER.error("Could not execute async {}", f, t);
/**
 * Delegates to {@code invokeAllAsyncWithTimeout(fs, -1, null)}.
 *
 * @param fs the SCL functions to invoke
 * @return whatever {@code invokeAllAsyncWithTimeout} returns for these arguments
 * @throws InterruptedException declared by the delegate
 */
77 @SuppressWarnings({ "rawtypes" })
78 public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
// NOTE(review): -1 with a null unit presumably means "no timeout" — confirm against the delegate.
79 return invokeAllAsyncWithTimeout(fs, -1, null);
/**
 * Adapts an SCL function to a {@code Callable<Object>}.
 * The visible body pushes {@code context}, applies {@code f} to {@link Tuple0#INSTANCE}
 * and logs any Throwable.
 * NOTE(review): the line that opens the returned callable is not visible in this excerpt.
 *
 * @param context the SCLContext pushed before {@code f} is applied
 * @param f the SCL function to wrap
 * @return a callable wrapping {@code f}
 */
82 @SuppressWarnings({ "unchecked", "rawtypes" })
83 private static Callable<Object> toCallable(SCLContext context, Function f) {
85 SCLContext.push(context);
87 return f.apply(Tuple0.INSTANCE);
88 } catch (Throwable t) {
// NOTE(review): what follows the log call (rethrow, cleanup) is not visible in this excerpt.
89 LOGGER.error("Could not invoke {}", f, t);
/**
 * Wraps every function in {@code fs} as a callable and hands the whole list to
 * {@code scheduledExecutor.invokeAll}, either without or with {@code timeout}/{@code unit}.
 * NOTE(review): the condition selecting between the two calls is not visible in this excerpt.
 *
 * @param fs the SCL functions to invoke
 * @param timeout timeout value passed to the timed {@code invokeAll} variant
 * @param unit unit of {@code timeout}
 * @return the futures returned by {@code invokeAll}
 * @throws InterruptedException declared in the signature; {@code invokeAll} can throw it
 */
98 @SuppressWarnings({ "rawtypes" })
99 public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
// A single derived context is shared by all callables built below.
100 SCLContext context = SCLContext.createDerivedContext();
101 List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
104 return scheduledExecutor.invokeAll(invokables);
106 return scheduledExecutor.invokeAll(invokables, timeout, unit);
108 } catch (Throwable t) {
// NOTE(review): what follows the log call is not visible in this excerpt.
109 LOGGER.error("Could not invoke all", t);
/**
 * Delegates to {@code invokeAnyAsyncWithTimeout(fs, -1, null)}.
 *
 * @param fs the SCL functions to invoke
 * @return whatever {@code invokeAnyAsyncWithTimeout} returns for these arguments
 * @throws InterruptedException declared by the delegate
 * @throws ExecutionException declared by the delegate
 * @throws TimeoutException declared by the delegate
 */
114 @SuppressWarnings({ "rawtypes" })
115 public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
// NOTE(review): -1 with a null unit presumably means "no timeout" — confirm against the delegate.
116 return invokeAnyAsyncWithTimeout(fs, -1, null);
/**
 * Wraps every function in {@code fs} as a callable and hands the collection to
 * {@code scheduledExecutor.invokeAny}, either without or with {@code timeout}/{@code unit}.
 * NOTE(review): the condition selecting between the two calls is not visible in this excerpt.
 *
 * @param fs the SCL functions to invoke
 * @param timeout timeout value passed to the timed {@code invokeAny} variant
 * @param unit unit of {@code timeout}
 * @return the value returned by {@code invokeAny}
 * @throws InterruptedException declared in the signature
 * @throws ExecutionException declared in the signature
 * @throws TimeoutException declared in the signature
 */
119 @SuppressWarnings({ "rawtypes" })
120 public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// A single derived context is shared by all callables built below.
121 SCLContext context = SCLContext.createDerivedContext();
122 Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
125 return scheduledExecutor.invokeAny(invokables);
127 return scheduledExecutor.invokeAny(invokables, timeout, unit);
129 } catch (Throwable t) {
// NOTE(review): what follows the log call is not visible in this excerpt.
130 LOGGER.error("Could not invoke any", t);
/**
 * Registers {@code f} with {@code scheduledExecutor.scheduleAtFixedRate}.
 * Each run pushes the derived context and applies {@code f} to {@link Tuple0#INSTANCE}.
 *
 * @param f the SCL function to run
 * @param initialDelay passed through as the executor's initial delay
 * @param period passed through as the executor's period
 * @param unit time unit of {@code initialDelay} and {@code period}
 * @return the future returned by the executor
 */
135 @SuppressWarnings({ "rawtypes", "unchecked" })
136 public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
// The context is derived once, at scheduling time; the same instance is pushed on every run.
137 SCLContext context = SCLContext.createDerivedContext();
138 return scheduledExecutor.scheduleAtFixedRate(() -> {
139 SCLContext.push(context);
141 f.apply(Tuple0.INSTANCE);
142 } catch (Throwable t) {
// NOTE(review): whether the Throwable is rethrown after logging is not visible in this excerpt.
143 LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
148 }, initialDelay, period, unit);
/**
 * Registers {@code f} with {@code scheduledExecutor.scheduleWithFixedDelay}.
 * Each run pushes the derived context and applies {@code f} to {@link Tuple0#INSTANCE}.
 *
 * @param f the SCL function to run
 * @param initialDelay passed through as the executor's initial delay
 * @param period passed through as the executor's delay argument, despite its name
 * @param unit time unit of {@code initialDelay} and {@code period}
 * @return the future returned by the executor
 */
151 @SuppressWarnings({ "rawtypes", "unchecked" })
152 public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
// The context is derived once, at scheduling time; the same instance is pushed on every run.
153 SCLContext context = SCLContext.createDerivedContext();
154 return scheduledExecutor.scheduleWithFixedDelay(() -> {
155 SCLContext.push(context);
157 f.apply(Tuple0.INSTANCE);
158 } catch (Throwable t) {
// NOTE(review): whether the Throwable is rethrown after logging is not visible in this excerpt.
159 LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
164 }, initialDelay, period, unit);
/**
 * Registers {@code f} with {@code scheduledExecutor.schedule}.
 * The task pushes the derived context, applies {@code f} to {@link Tuple0#INSTANCE}
 * and returns that value.
 * NOTE(review): the closing arguments of the {@code schedule(...)} call are not visible
 * in this excerpt; presumably {@code delay} and {@code unit} are passed there — confirm.
 *
 * @param f the SCL function to run
 * @param delay delay value; it is also included in the error log message
 * @param unit time unit, presumably of {@code delay}
 * @return the future returned by the executor
 */
167 @SuppressWarnings({ "rawtypes", "unchecked" })
168 public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
// The derived context is created by the caller's thread, then pushed inside the scheduled task.
169 SCLContext context = SCLContext.createDerivedContext();
170 return scheduledExecutor.schedule(() -> {
171 SCLContext.push(context);
173 return f.apply(Tuple0.INSTANCE);
174 } catch (Throwable t) {
// NOTE(review): what follows the log call is not visible in this excerpt.
175 LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
/**
 * Renders the stack trace of {@code t} to a string and passes it to
 * {@code SCLReporting.printError}.
 *
 * @param t the throwable whose stack trace is reported
 */
184 private static void printError(Throwable t) {
// The stack trace is written into an in-memory buffer so it can be handed over as one string.
185 StringWriter sw = new StringWriter();
186 t.printStackTrace(new PrintWriter(sw));
187 SCLReporting.printError(sw.toString());