]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java
Async utilities for SCL
[simantics/platform.git] / bundles / org.simantics.scl.runtime / src / org / simantics / scl / runtime / utils / AsyncUtils.java
index 3392d461c2ed05ae6d305f237740ca4d27f05b91..fcab99c20ae632f4b26320bdafecf66fbd46334b 100644 (file)
@@ -3,34 +3,189 @@ package org.simantics.scl.runtime.utils;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.simantics.scl.runtime.SCLContext;
 import org.simantics.scl.runtime.function.Function;
 import org.simantics.scl.runtime.reporting.SCLReporting;
 import org.simantics.scl.runtime.tuple.Tuple0;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AsyncUtils {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
+    private static final AtomicInteger threadCount = new AtomicInteger(0);
+
+    private static final ThreadFactory threadFactory = r -> {
+        Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+    };
+
+    private static final ScheduledExecutorService scheduledExecutor = Executors
+            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public static void runAsync(Function f) {
         SCLContext context = SCLContext.createDerivedContext();
-        new Thread() {
-            @Override
-            public void run() {
-                SCLContext.push(context);
-                try {
-                    f.apply(Tuple0.INSTANCE);
-                } catch(Exception e) {
-                    StringWriter sw = new StringWriter();
-                    e.printStackTrace(new PrintWriter(sw));
-                    SCLReporting.printError(sw.toString());
-                } finally {
-                    SCLContext.pop();
-                }
-            }
-        }.start();
-    }
-    
+        scheduledExecutor.submit(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Exception e) {
+                LOGGER.error("Could not execute async", e);
+                printError(e);
+            } finally {
+                SCLContext.pop();
+            }
+        });
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public static List<Object> pmap(Function f, List<Object> l) {
         return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> submitAsync(Function f) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.submit(() -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute async {}", f, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        });
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
+        return invokeAllAsyncWithTimeout(fs, -1, null);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static Callable<Object> toCallable(SCLContext context, Function f) {
+        return () -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not invoke {}", f, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        };
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
+        SCLContext context = SCLContext.createDerivedContext();
+        List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+        try {
+            if (timeout == -1) {
+                return scheduledExecutor.invokeAll(invokables);
+            } else {
+                return scheduledExecutor.invokeAll(invokables, timeout, unit);
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Could not invoke all", t);
+            throw t;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
+        return invokeAnyAsyncWithTimeout(fs, -1, null);
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        SCLContext context = SCLContext.createDerivedContext();
+        Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+        try {
+            if (timeout == -1) {
+                return scheduledExecutor.invokeAny(invokables);
+            } else {
+                return scheduledExecutor.invokeAny(invokables, timeout, unit);
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Could not invoke any", t);
+            throw t;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.scheduleAtFixedRate(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
+                printError(t);
+            } finally {
+                SCLContext.pop();
+            }
+        }, initialDelay, period, unit);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.scheduleWithFixedDelay(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
+                printError(t);
+            } finally {
+                SCLContext.pop();
+            }
+        }, initialDelay, period, unit);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.schedule(() -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        }, delay, unit);
+    }
+
+    private static void printError(Throwable t) {
+        StringWriter sw = new StringWriter();
+        t.printStackTrace(new PrintWriter(sw));
+        SCLReporting.printError(sw.toString());
+    }
+
 }