1 package org.simantics.scl.runtime.utils;
3 import java.io.PrintWriter;
4 import java.io.StringWriter;
5 import java.util.Arrays;
6 import java.util.Collection;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.stream.Collectors;
19 import org.simantics.scl.runtime.SCLContext;
20 import org.simantics.scl.runtime.function.Function;
21 import org.simantics.scl.runtime.reporting.SCLReporting;
22 import org.simantics.scl.runtime.tuple.Tuple0;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public class AsyncUtils {
28 private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
29 private static final AtomicInteger threadCount = new AtomicInteger(0);
31 private static final ThreadFactory threadFactory = r -> {
32 Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
37 private static final ScheduledExecutorService scheduledExecutor = Executors
38 .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
40 @SuppressWarnings({ "rawtypes", "unchecked" })
41 public static void runAsync(Function f) {
42 SCLContext context = SCLContext.createDerivedContext();
43 scheduledExecutor.submit(() -> {
44 SCLContext.push(context);
46 f.apply(Tuple0.INSTANCE);
47 } catch (Exception e) {
48 LOGGER.error("Could not execute async", e);
56 @SuppressWarnings({ "rawtypes", "unchecked" })
57 public static List<Object> pmap(Function f, List<Object> l) {
58 return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
61 @SuppressWarnings({ "rawtypes", "unchecked" })
62 public static Future<?> submitAsync(Function f) {
63 SCLContext context = SCLContext.createDerivedContext();
64 return scheduledExecutor.submit(() -> {
65 SCLContext.push(context);
67 return f.apply(Tuple0.INSTANCE);
68 } catch (Throwable t) {
69 LOGGER.error("Could not execute async {}", f, t);
78 @SuppressWarnings({ "rawtypes" })
79 public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
80 return invokeAllAsyncWithTimeout(fs, -1, null);
83 @SuppressWarnings({ "unchecked", "rawtypes" })
84 private static Callable<Object> toCallable(SCLContext context, Function f) {
86 SCLContext.push(context);
88 return f.apply(Tuple0.INSTANCE);
89 } catch (Throwable t) {
90 LOGGER.error("Could not invoke {}", f, t);
99 @SuppressWarnings({ "rawtypes" })
100 public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
101 SCLContext context = SCLContext.createDerivedContext();
102 List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
105 return scheduledExecutor.invokeAll(invokables);
107 return scheduledExecutor.invokeAll(invokables, timeout, unit);
109 } catch (Throwable t) {
110 LOGGER.error("Could not invoke all", t);
115 @SuppressWarnings({ "rawtypes" })
116 public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
117 return invokeAnyAsyncWithTimeout(fs, -1, null);
120 @SuppressWarnings({ "rawtypes" })
121 public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
122 SCLContext context = SCLContext.createDerivedContext();
123 Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
126 return scheduledExecutor.invokeAny(invokables);
128 return scheduledExecutor.invokeAny(invokables, timeout, unit);
130 } catch (Throwable t) {
131 LOGGER.error("Could not invoke any", t);
136 @SuppressWarnings({ "rawtypes", "unchecked" })
137 public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
138 SCLContext context = SCLContext.createDerivedContext();
139 return scheduledExecutor.scheduleAtFixedRate(() -> {
140 SCLContext.push(context);
142 f.apply(Tuple0.INSTANCE);
143 } catch (Throwable t) {
144 LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
149 }, initialDelay, period, unit);
152 @SuppressWarnings({ "rawtypes", "unchecked" })
153 public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
154 SCLContext context = SCLContext.createDerivedContext();
155 return scheduledExecutor.scheduleWithFixedDelay(() -> {
156 SCLContext.push(context);
158 f.apply(Tuple0.INSTANCE);
159 } catch (Throwable t) {
160 LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
165 }, initialDelay, period, unit);
168 @SuppressWarnings({ "rawtypes", "unchecked" })
169 public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
170 SCLContext context = SCLContext.createDerivedContext();
171 return scheduledExecutor.schedule(() -> {
172 SCLContext.push(context);
174 return f.apply(Tuple0.INSTANCE);
175 } catch (Throwable t) {
176 LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
185 private static void printError(Throwable t) {
186 StringWriter sw = new StringWriter();
187 t.printStackTrace(new PrintWriter(sw));
188 SCLReporting.printError(sw.toString());