package org.simantics.scl.runtime.utils; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.simantics.scl.runtime.SCLContext; import org.simantics.scl.runtime.function.Function; import org.simantics.scl.runtime.reporting.SCLReporting; import org.simantics.scl.runtime.tuple.Tuple0; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AsyncUtils { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class); private static final AtomicInteger threadCount = new AtomicInteger(0); private static final ThreadFactory threadFactory = r -> { Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet()); t.setDaemon(true); return t; }; private static final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); @SuppressWarnings({ "rawtypes", "unchecked" }) public static void runAsync(Function f) { SCLContext context = SCLContext.createDerivedContext(); scheduledExecutor.submit(() -> { SCLContext.push(context); try { f.apply(Tuple0.INSTANCE); } catch (Exception e) { LOGGER.error("Could not execute async", e); printError(e); } finally { SCLContext.pop(); } }); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static List pmap(Function f, List l) { return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray()); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static Future submitAsync(Function f) { SCLContext context = SCLContext.createDerivedContext(); return scheduledExecutor.submit(() -> { SCLContext.push(context); try { return f.apply(Tuple0.INSTANCE); } catch (Throwable t) { LOGGER.error("Could not execute async {}", f, t); printError(t); return null; } finally { SCLContext.pop(); } }); } @SuppressWarnings({ "rawtypes" }) public static List> invokeAllAsync(Collection fs) throws InterruptedException { return invokeAllAsyncWithTimeout(fs, -1, null); } @SuppressWarnings({ "unchecked", "rawtypes" }) private static Callable toCallable(SCLContext context, Function f) { return () -> { SCLContext.push(context); try { return f.apply(Tuple0.INSTANCE); } catch (Throwable t) { LOGGER.error("Could not invoke {}", f, t); printError(t); return null; } finally { SCLContext.pop(); } }; } @SuppressWarnings({ "rawtypes" }) public static List> invokeAllAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException { SCLContext context = SCLContext.createDerivedContext(); List> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); try { if (timeout == -1) { return scheduledExecutor.invokeAll(invokables); } else { return scheduledExecutor.invokeAll(invokables, timeout, unit); } } catch (Throwable t) { LOGGER.error("Could not invoke all", t); throw t; } } @SuppressWarnings({ "rawtypes" }) public static Object invokeAnyAsync(Collection fs) throws InterruptedException, ExecutionException, TimeoutException { return invokeAnyAsyncWithTimeout(fs, -1, null); } @SuppressWarnings({ "rawtypes" }) public static Object invokeAnyAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { SCLContext context = SCLContext.createDerivedContext(); Collection> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); try { if (timeout == -1) { return scheduledExecutor.invokeAny(invokables); } else { return scheduledExecutor.invokeAny(invokables, timeout, unit); } } catch (Throwable t) { LOGGER.error("Could not invoke any", t); throw t; } } @SuppressWarnings({ "rawtypes", "unchecked" }) public static Future scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) { SCLContext context = SCLContext.createDerivedContext(); return scheduledExecutor.scheduleAtFixedRate(() -> { SCLContext.push(context); try { f.apply(Tuple0.INSTANCE); } catch (Throwable t) { LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t); printError(t); } finally { SCLContext.pop(); } }, initialDelay, period, unit); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static Future scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) { SCLContext context = SCLContext.createDerivedContext(); return scheduledExecutor.scheduleWithFixedDelay(() -> { SCLContext.push(context); try { f.apply(Tuple0.INSTANCE); } catch (Throwable t) { LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t); printError(t); } finally { SCLContext.pop(); } }, initialDelay, period, unit); } @SuppressWarnings({ "rawtypes", "unchecked" }) public static Future schedule(Function f, long delay, TimeUnit unit) { SCLContext context = SCLContext.createDerivedContext(); return scheduledExecutor.schedule(() -> { SCLContext.push(context); try { return f.apply(Tuple0.INSTANCE); } catch (Throwable t) { LOGGER.error("Could not schedule {} with delay {}", f, delay, t); printError(t); return null; } finally { SCLContext.pop(); } }, delay, unit); } private static void printError(Throwable t) { StringWriter sw = new StringWriter(); t.printStackTrace(new PrintWriter(sw)); SCLReporting.printError(sw.toString()); } }