From 3bf1cf8f931ff10ea17f0c92b94cc49eb42d07d2 Mon Sep 17 00:00:00 2001 From: jsimomaa Date: Thu, 14 Jun 2018 22:31:48 +0300 Subject: [PATCH] Async utilities for SCL * scheduleAtFixedRate * scheduleWithFixedDelay * schedule gitlab #23 Change-Id: I72ab85f4458a17a83fa4de95bc668d8fadac1d50 --- .../org/simantics/scl/db/SCLFunctions.java | 32 +-- .../org.simantics.scl.runtime/scl/Async.scl | 15 ++ .../scl/runtime/utils/AsyncUtils.java | 189 ++++++++++++++++-- 3 files changed, 207 insertions(+), 29 deletions(-) diff --git a/bundles/org.simantics.scl.db/src/org/simantics/scl/db/SCLFunctions.java b/bundles/org.simantics.scl.db/src/org/simantics/scl/db/SCLFunctions.java index 618d6a91f..23c1ad508 100644 --- a/bundles/org.simantics.scl.db/src/org/simantics/scl/db/SCLFunctions.java +++ b/bundles/org.simantics.scl.db/src/org/simantics/scl/db/SCLFunctions.java @@ -39,17 +39,21 @@ import org.simantics.scl.runtime.function.Function1; import org.simantics.scl.runtime.tuple.Tuple; import org.simantics.scl.runtime.tuple.Tuple0; import org.simantics.utils.DataContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings({"rawtypes", "unchecked"}) public class SCLFunctions { + private static final Logger LOGGER = LoggerFactory.getLogger(SCLFunctions.class); + public static final String GRAPH = "graph"; public static T safeExec(final Function f) { try { return (T)f.apply(Tuple0.INSTANCE); } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("safeExec caught exception", t); return null; } } @@ -94,18 +98,22 @@ public class SCLFunctions { public static void asyncWrite(final Function f) throws DatabaseException { SCLContext context = SCLContext.createDerivedContext(); - Simantics.getSession().asyncRequest(new WriteRequest() { - @Override - public void perform(WriteGraph graph) throws DatabaseException { - SCLContext.push(context); - context.put(GRAPH, graph); - try { - f.apply(Tuple0.INSTANCE); - } finally { - SCLContext.pop(); + if (Simantics.peekSession() != null) { + Simantics.getSession().asyncRequest(new WriteRequest() { + @Override + public void perform(WriteGraph graph) throws DatabaseException { + SCLContext.push(context); + context.put(GRAPH, graph); + try { + f.apply(Tuple0.INSTANCE); + } finally { + SCLContext.pop(); + } } - } - }); + }); + } else { + LOGGER.warn("No session available for asynchronous write requests"); + } } public static T syncWrite(final Function f) throws DatabaseException { diff --git a/bundles/org.simantics.scl.runtime/scl/Async.scl b/bundles/org.simantics.scl.runtime/scl/Async.scl index f4b3a9298..1988eb531 100644 --- a/bundles/org.simantics.scl.runtime/scl/Async.scl +++ b/bundles/org.simantics.scl.runtime/scl/Async.scl @@ -1,3 +1,18 @@ +import "Future" + +importJava "java.util.concurrent.TimeUnit" where + data TimeUnit + NANOSECONDS :: TimeUnit + MICROSECONDS :: TimeUnit + MILLISECONDS :: TimeUnit + SECONDS :: TimeUnit + HOURS :: TimeUnit + DAYS :: TimeUnit + importJava "org.simantics.scl.runtime.utils.AsyncUtils" where runAsync :: ( a) -> () pmap :: (a -> b) -> [a] -> [b] + + scheduleAtFixedRate :: ( a) -> Long -> Long -> TimeUnit -> Future a + scheduleWithFixedDelay :: ( a) -> Long -> Long -> TimeUnit -> Future a + schedule :: ( a) -> Long -> TimeUnit -> Future a \ No newline at end of file diff --git a/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java b/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java index 3392d461c..fcab99c20 100644 --- a/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java +++ b/bundles/org.simantics.scl.runtime/src/org/simantics/scl/runtime/utils/AsyncUtils.java @@ -3,34 +3,189 @@ package org.simantics.scl.runtime.utils; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.simantics.scl.runtime.SCLContext; import org.simantics.scl.runtime.function.Function; import org.simantics.scl.runtime.reporting.SCLReporting; import org.simantics.scl.runtime.tuple.Tuple0; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AsyncUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class); + private static final AtomicInteger threadCount = new AtomicInteger(0); + + private static final ThreadFactory threadFactory = r -> { + Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet()); + t.setDaemon(true); + return t; + }; + + private static final ScheduledExecutorService scheduledExecutor = Executors + .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); + + @SuppressWarnings({ "rawtypes", "unchecked" }) public static void runAsync(Function f) { SCLContext context = SCLContext.createDerivedContext(); - new Thread() { - @Override - public void run() { - SCLContext.push(context); - try { - f.apply(Tuple0.INSTANCE); - } catch(Exception e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - SCLReporting.printError(sw.toString()); - } finally { - SCLContext.pop(); - } - } - }.start(); - } - + scheduledExecutor.submit(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Exception e) { + LOGGER.error("Could not execute async", e); + printError(e); + } finally { + SCLContext.pop(); + } + }); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) public static List pmap(Function f, List l) { return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray()); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future submitAsync(Function f) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.submit(() -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute async {}", f, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }); + } + + @SuppressWarnings({ "rawtypes" }) + public static List> invokeAllAsync(Collection fs) throws InterruptedException { + return invokeAllAsyncWithTimeout(fs, -1, null); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static Callable toCallable(SCLContext context, Function f) { + return () -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not invoke {}", f, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }; + } + + @SuppressWarnings({ "rawtypes" }) + public static List> invokeAllAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException { + SCLContext context = SCLContext.createDerivedContext(); + List> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); + try { + if (timeout == -1) { + return scheduledExecutor.invokeAll(invokables); + } else { + return scheduledExecutor.invokeAll(invokables, timeout, unit); + } + } catch (Throwable t) { + LOGGER.error("Could not invoke all", t); + throw t; + } + } + + @SuppressWarnings({ "rawtypes" }) + public static Object invokeAnyAsync(Collection fs) throws InterruptedException, ExecutionException, TimeoutException { + return invokeAnyAsyncWithTimeout(fs, -1, null); + } + + @SuppressWarnings({ "rawtypes" }) + public static Object invokeAnyAsyncWithTimeout(Collection fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + SCLContext context = SCLContext.createDerivedContext(); + Collection> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList()); + try { + if (timeout == -1) { + return scheduledExecutor.invokeAny(invokables); + } else { + return scheduledExecutor.invokeAny(invokables, timeout, unit); + } + } catch (Throwable t) { + LOGGER.error("Could not invoke any", t); + throw t; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.scheduleAtFixedRate(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t); + printError(t); + } finally { + SCLContext.pop(); + } + }, initialDelay, period, unit); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.scheduleWithFixedDelay(() -> { + SCLContext.push(context); + try { + f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t); + printError(t); + } finally { + SCLContext.pop(); + } + }, initialDelay, period, unit); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Future schedule(Function f, long delay, TimeUnit unit) { + SCLContext context = SCLContext.createDerivedContext(); + return scheduledExecutor.schedule(() -> { + SCLContext.push(context); + try { + return f.apply(Tuple0.INSTANCE); + } catch (Throwable t) { + LOGGER.error("Could not schedule {} with delay {}", f, delay, t); + printError(t); + return null; + } finally { + SCLContext.pop(); + } + }, delay, unit); + } + + private static void printError(Throwable t) { + StringWriter sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + SCLReporting.printError(sw.toString()); + } + } -- 2.43.2