import org.simantics.scl.runtime.tuple.Tuple;
import org.simantics.scl.runtime.tuple.Tuple0;
import org.simantics.utils.DataContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({"rawtypes", "unchecked"})
public class SCLFunctions {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SCLFunctions.class);
+
public static final String GRAPH = "graph";
public static <T> T safeExec(final Function f) {
try {
return (T)f.apply(Tuple0.INSTANCE);
} catch (Throwable t) {
- t.printStackTrace();
+ LOGGER.error("safeExec caught exception", t);
return null;
}
}
public static void asyncWrite(final Function f) throws DatabaseException {
SCLContext context = SCLContext.createDerivedContext();
- Simantics.getSession().asyncRequest(new WriteRequest() {
- @Override
- public void perform(WriteGraph graph) throws DatabaseException {
- SCLContext.push(context);
- context.put(GRAPH, graph);
- try {
- f.apply(Tuple0.INSTANCE);
- } finally {
- SCLContext.pop();
+ if (Simantics.peekSession() != null) {
+ Simantics.getSession().asyncRequest(new WriteRequest() {
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ SCLContext.push(context);
+ context.put(GRAPH, graph);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } finally {
+ SCLContext.pop();
+ }
}
- }
- });
+ });
+ } else {
+ LOGGER.warn("No session available for asynchronous write requests");
+ }
}
public static <T> T syncWrite(final Function f) throws DatabaseException {
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.simantics.scl.runtime.SCLContext;
import org.simantics.scl.runtime.function.Function;
import org.simantics.scl.runtime.reporting.SCLReporting;
import org.simantics.scl.runtime.tuple.Tuple0;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AsyncUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUtils.class);
+ private static final AtomicInteger threadCount = new AtomicInteger(0);
+
+ private static final ThreadFactory threadFactory = r -> {
+ Thread t = new Thread(r, "scl-async-utils-thread-" + threadCount.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ };
+
+ private static final ScheduledExecutorService scheduledExecutor = Executors
+ .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static void runAsync(Function f) {
SCLContext context = SCLContext.createDerivedContext();
- new Thread() {
- @Override
- public void run() {
- SCLContext.push(context);
- try {
- f.apply(Tuple0.INSTANCE);
- } catch(Exception e) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- SCLReporting.printError(sw.toString());
- } finally {
- SCLContext.pop();
- }
- }
- }.start();
- }
-
+ scheduledExecutor.submit(() -> {
+ SCLContext.push(context);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } catch (Exception e) {
+ LOGGER.error("Could not execute async", e);
+ printError(e);
+ } finally {
+ SCLContext.pop();
+ }
+ });
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static List<Object> pmap(Function f, List<Object> l) {
return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> submitAsync(Function f) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.submit(() -> {
+ SCLContext.push(context);
+ try {
+ return f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not execute async {}", f, t);
+ printError(t);
+ return null;
+ } finally {
+ SCLContext.pop();
+ }
+ });
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
+ return invokeAllAsyncWithTimeout(fs, -1, null);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static Callable<Object> toCallable(SCLContext context, Function f) {
+ return () -> {
+ SCLContext.push(context);
+ try {
+ return f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke {}", f, t);
+ printError(t);
+ return null;
+ } finally {
+ SCLContext.pop();
+ }
+ };
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
+ SCLContext context = SCLContext.createDerivedContext();
+ List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+ try {
+ if (timeout == -1) {
+ return scheduledExecutor.invokeAll(invokables);
+ } else {
+ return scheduledExecutor.invokeAll(invokables, timeout, unit);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke all", t);
+ throw t;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
+ return invokeAnyAsyncWithTimeout(fs, -1, null);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ SCLContext context = SCLContext.createDerivedContext();
+ Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+ try {
+ if (timeout == -1) {
+ return scheduledExecutor.invokeAny(invokables);
+ } else {
+ return scheduledExecutor.invokeAny(invokables, timeout, unit);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke any", t);
+ throw t;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.scheduleAtFixedRate(() -> {
+ SCLContext.push(context);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
+ printError(t);
+ } finally {
+ SCLContext.pop();
+ }
+ }, initialDelay, period, unit);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> scheduleWithFixedDelay(Function f, long initialDelay, long period, TimeUnit unit) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.scheduleWithFixedDelay(() -> {
+ SCLContext.push(context);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not execute {} with fixed delay {} {}", f, initialDelay, period, t);
+ printError(t);
+ } finally {
+ SCLContext.pop();
+ }
+ }, initialDelay, period, unit);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> schedule(Function f, long delay, TimeUnit unit) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.schedule(() -> {
+ SCLContext.push(context);
+ try {
+ return f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not schedule {} with delay {}", f, delay, t);
+ printError(t);
+ return null;
+ } finally {
+ SCLContext.pop();
+ }
+ }, delay, unit);
+ }
+
+ private static void printError(Throwable t) {
+ StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ SCLReporting.printError(sw.toString());
+ }
+
}