X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.scl.runtime%2Fsrc%2Forg%2Fsimantics%2Fscl%2Fruntime%2Futils%2FAsyncUtils.java;fp=bundles%2Forg.simantics.scl.runtime%2Fsrc%2Forg%2Fsimantics%2Fscl%2Fruntime%2Futils%2FAsyncUtils.java;h=fcab99c20ae632f4b26320bdafecf66fbd46334b;hp=3392d461c2ed05ae6d305f237740ca4d27f05b91;hb=3bf1cf8f931ff10ea17f0c92b94cc49eb42d07d2;hpb=951846b17c8c02759b7a319b0f2952cafacb8bea diff --git a/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java b/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java index 3392d461c..fcab99c20 100644 --- a/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java +++ b/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java @@ -3,34 +3,189 @@ package org.simantics.scl.runtime.utils; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.simantics.scl.runtime.SCLContext; import org.simantics.scl.runtime.function.Function; import org.simantics.scl.runtime.reporting.SCLReporting; import org.simantics.scl.runtime.tuple.Tuple0; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AsyncUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class); + private static final AtomicInteger threadCount = new AtomicInteger(0); + + private static final ThreadFactory threadFactory = r -> { + Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet()); + t.setDaemon(true); + return t; + }; + + private static final ScheduledExecutorService scheduledExecutor = Executors + .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); + + @SuppressWarnings({ "rawtypes", "unchecked" }) public static void runAsync(Function f) { SCLContext context = SCLContext.createDerivedContext(); - new Thread() { - @Override - public void run() { - SCLContext.push(context); - try { - f.apply(Tuple0.INSTANCE); - } catch(Exception e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - SCLReporting.printError(sw.toString()); - } finally { - SCLContext.pop(); - } - } - }.start(); - } - + scheduledExecutor.submit(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Exception e) { + LOGGER.error("Could not execute async", e); + printError(e); + } finally { + SCLContext.pop(); + } + }); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) public static List pmap(Function f, List l) { return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray()); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future submitAsync(Function f) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.submit(() -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute async {}", f, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }); + } + + @SuppressWarnings({ "rawtypes" }) + public static List> invokeAllAsync(Collection fs) throws InterruptedException { + return invokeAllAsyncWithTimeout(fs, -1, null); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static Callable toCallable(SCLContext context, Function f) { + return () -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not invoke {}", f, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }; + } + + @SuppressWarnings({ "rawtypes" }) + public static List> invokeAllAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException { + SCLContext context = SCLContext.createDerivedContext(); + List> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); + try { + if (timeout == -1) { + return scheduledExecutor.invokeAll(invokables); + } else { + return scheduledExecutor.invokeAll(invokables, timeout, unit); + } + } catch (Throwable t) { + LOGGER.error("Could not invoke all", t); + throw t; + } + } + + @SuppressWarnings({ "rawtypes" }) + public static Object invokeAnyAsync(Collection fs) throws InterruptedException, ExecutionException, TimeoutException { + return invokeAnyAsyncWithTimeout(fs, -1, null); + } + + @SuppressWarnings({ "rawtypes" }) + public static Object invokeAnyAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + SCLContext context = SCLContext.createDerivedContext(); + Collection> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); + try { + if (timeout == -1) { + return scheduledExecutor.invokeAny(invokables); + } else { + return scheduledExecutor.invokeAny(invokables, timeout, unit); + } + } catch (Throwable t) { + LOGGER.error("Could not invoke any", t); + throw t; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.scheduleAtFixedRate(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t); + printError(t); + } finally { + SCLContext.pop(); + } + }, initialDelay, period, unit); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.scheduleWithFixedDelay(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t); + printError(t); + } finally { + SCLContext.pop(); + } + }, initialDelay, period, unit); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future schedule(Function f, long delay, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.schedule(() -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not schedule {} with delay {}", f, delay, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }, delay, unit); + } + + private static void printError(Throwable t) { + StringWriter sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + SCLReporting.printError(sw.toString()); + } + }