]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Async utilities for SCL
authorjsimomaa <jani.simomaa@gmail.com>
Thu, 14 Jun 2018 19:31:48 +0000 (22:31 +0300)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Sun, 5 Aug 2018 06:41:33 +0000 (09:41 +0300)
* scheduleAtFixedRate
* scheduleWithFixedDelay
* schedule

gitlab #23

Change-Id: I72ab85f4458a17a83fa4de95bc668d8fadac1d50
(cherry picked from commit 3bf1cf8f931ff10ea17f0c92b94cc49eb42d07d2)

bundles/org.simantics.scl.db/src/org/simantics/scl/db/SCLFunctions.java
bundles/org.simantics.scl.runtime/scl/Async.scl
bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java

index aae7a89a26da69df9dc3e40360729ec673b754ed..8f535eb5a904bbf507db3423ce3cc4aa4ecd50e4 100644 (file)
@@ -38,17 +38,21 @@ import org.simantics.scl.runtime.function.Function1;
 import org.simantics.scl.runtime.tuple.Tuple;
 import org.simantics.scl.runtime.tuple.Tuple0;
 import org.simantics.utils.DataContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class SCLFunctions {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(SCLFunctions.class);
+
     public static final String GRAPH = "graph";
 
     public static <T> T safeExec(final Function f) {
         try {
             return (T)f.apply(Tuple0.INSTANCE);
         } catch (Throwable t) {
-            t.printStackTrace();
+            LOGGER.error("safeExec caught exception", t);
             return null;
         }
     }
@@ -93,18 +97,22 @@ public class SCLFunctions {
     
     public static void asyncWrite(final Function f) throws DatabaseException {
         SCLContext context = SCLContext.createDerivedContext();
-        Simantics.getSession().asyncRequest(new WriteRequest() {
-            @Override
-            public void perform(WriteGraph graph) throws DatabaseException {
-                SCLContext.push(context);
-                context.put(GRAPH, graph);
-                try {
-                    f.apply(Tuple0.INSTANCE);
-                } finally {
-                    SCLContext.pop();
+        if (Simantics.peekSession() != null) {
+            Simantics.getSession().asyncRequest(new WriteRequest() {
+                @Override
+                public void perform(WriteGraph graph) throws DatabaseException {
+                    SCLContext.push(context);
+                    context.put(GRAPH, graph);
+                    try {
+                        f.apply(Tuple0.INSTANCE);
+                    } finally {
+                        SCLContext.pop();
+                    }
                 }
-            }
-        });
+            });
+        } else {
+            LOGGER.warn("No session available for asynchronous write requests");
+        }
     }
     
     public static <T> T syncWrite(final Function f) throws DatabaseException {
index f4b3a92983eb38e8484f8d6e34b0e5c7b22f2d29..1988eb5319eebb2c29ed9fa27e2589c766b56f6b 100644 (file)
@@ -1,3 +1,18 @@
+import "Future"
+
+importJava "java.util.concurrent.TimeUnit" where
+    data TimeUnit
+    NANOSECONDS :: TimeUnit
+    MICROSECONDS :: TimeUnit
+    MILLISECONDS :: TimeUnit
+    SECONDS :: TimeUnit
+    HOURS :: TimeUnit
+    DAYS :: TimeUnit
+
 importJava "org.simantics.scl.runtime.utils.AsyncUtils" where
     runAsync :: (<Proc> a) -> <Proc> ()
     pmap :: (a -> <Proc> b) -> [a] -> <Proc> [b]
+
+    scheduleAtFixedRate :: (<Proc> a) -> Long -> Long -> TimeUnit -> <Proc> Future a
+    scheduleWithFixedDelay :: (<Proc> a) -> Long -> Long -> TimeUnit -> <Proc> Future a
+    schedule :: (<Proc> a) -> Long -> TimeUnit -> <Proc> Future a
\ No newline at end of file
index 3392d461c2ed05ae6d305f237740ca4d27f05b91..fcab99c20ae632f4b26320bdafecf66fbd46334b 100644 (file)
@@ -3,34 +3,189 @@ package org.simantics.scl.runtime.utils;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.simantics.scl.runtime.SCLContext;
 import org.simantics.scl.runtime.function.Function;
 import org.simantics.scl.runtime.reporting.SCLReporting;
 import org.simantics.scl.runtime.tuple.Tuple0;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AsyncUtils {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
+    private static final AtomicInteger threadCount = new AtomicInteger(0);
+
+    private static final ThreadFactory threadFactory = r -> {
+        Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+    };
+
+    private static final ScheduledExecutorService scheduledExecutor = Executors
+            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public static void runAsync(Function f) {
         SCLContext context = SCLContext.createDerivedContext();
-        new Thread() {
-            @Override
-            public void run() {
-                SCLContext.push(context);
-                try {
-                    f.apply(Tuple0.INSTANCE);
-                } catch(Exception e) {
-                    StringWriter sw = new StringWriter();
-                    e.printStackTrace(new PrintWriter(sw));
-                    SCLReporting.printError(sw.toString());
-                } finally {
-                    SCLContext.pop();
-                }
-            }
-        }.start();
-    }
-    
+        scheduledExecutor.submit(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Exception e) {
+                LOGGER.error("Could not execute async", e);
+                printError(e);
+            } finally {
+                SCLContext.pop();
+            }
+        });
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     public static List<Object> pmap(Function f, List<Object> l) {
         return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> submitAsync(Function f) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.submit(() -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute async {}", f, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        });
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
+        return invokeAllAsyncWithTimeout(fs, -1, null);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static Callable<Object> toCallable(SCLContext context, Function f) {
+        return () -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not invoke {}", f, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        };
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
+        SCLContext context = SCLContext.createDerivedContext();
+        List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+        try {
+            if (timeout == -1) {
+                return scheduledExecutor.invokeAll(invokables);
+            } else {
+                return scheduledExecutor.invokeAll(invokables, timeout, unit);
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Could not invoke all", t);
+            throw t;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
+        return invokeAnyAsyncWithTimeout(fs, -1, null);
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        SCLContext context = SCLContext.createDerivedContext();
+        Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+        try {
+            if (timeout == -1) {
+                return scheduledExecutor.invokeAny(invokables);
+            } else {
+                return scheduledExecutor.invokeAny(invokables, timeout, unit);
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Could not invoke any", t);
+            throw t;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.scheduleAtFixedRate(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
+                printError(t);
+            } finally {
+                SCLContext.pop();
+            }
+        }, initialDelay, period, unit);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.scheduleWithFixedDelay(() -> {
+            SCLContext.push(context);
+            try {
+                f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
+                printError(t);
+            } finally {
+                SCLContext.pop();
+            }
+        }, initialDelay, period, unit);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
+        SCLContext context = SCLContext.createDerivedContext();
+        return scheduledExecutor.schedule(() -> {
+            SCLContext.push(context);
+            try {
+                return f.apply(Tuple0.INSTANCE);
+            } catch (Throwable t) {
+                LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
+                printError(t);
+                return null;
+            } finally {
+                SCLContext.pop();
+            }
+        }, delay, unit);
+    }
+
+    private static void printError(Throwable t) {
+        StringWriter sw = new StringWriter();
+        t.printStackTrace(new PrintWriter(sw));
+        SCLReporting.printError(sw.toString());
+    }
+
 }