+ SCLContext context = SCLContext.createDerivedContext();
+ scheduledExecutor.submit(() -> {
+ SCLContext.push(context);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } catch (Exception e) {
+ LOGGER.error("Could not execute async", e);
+ printError(e);
+ } finally {
+ SCLContext.pop();
+ }
+ });
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static List<Object> pmap(Function f, List<Object> l) {
+ return Arrays.asList(l.parallelStream().map(a -> f.apply(a)).toArray());
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> submitAsync(Function f) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.submit(() -> {
+ SCLContext.push(context);
+ try {
+ return f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not execute async {}", f, t);
+ printError(t);
+ return null;
+ } finally {
+ SCLContext.pop();
+ }
+ });
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static List<Future<Object>> invokeAllAsync(Collection<Function> fs) throws InterruptedException {
+ return invokeAllAsyncWithTimeout(fs, -1, null);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static Callable<Object> toCallable(SCLContext context, Function f) {
+ return () -> {
+ SCLContext.push(context);
+ try {
+ return f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke {}", f, t);
+ printError(t);
+ return null;
+ } finally {
+ SCLContext.pop();
+ }
+ };
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static List<Future<Object>> invokeAllAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException {
+ SCLContext context = SCLContext.createDerivedContext();
+ List<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+ try {
+ if (timeout == -1) {
+ return scheduledExecutor.invokeAll(invokables);
+ } else {
+ return scheduledExecutor.invokeAll(invokables, timeout, unit);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke all", t);
+ throw t;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static Object invokeAnyAsync(Collection<Function> fs) throws InterruptedException, ExecutionException, TimeoutException {
+ return invokeAnyAsyncWithTimeout(fs, -1, null);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ public static Object invokeAnyAsyncWithTimeout(Collection<Function> fs, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ SCLContext context = SCLContext.createDerivedContext();
+ Collection<Callable<Object>> invokables = fs.stream().map(f -> toCallable(context, f)).collect(Collectors.toList());
+ try {
+ if (timeout == -1) {
+ return scheduledExecutor.invokeAny(invokables);
+ } else {
+ return scheduledExecutor.invokeAny(invokables, timeout, unit);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Could not invoke any", t);
+ throw t;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Future<?> scheduleAtFixedRate(Function f, long initialDelay, long period, TimeUnit unit) {
+ SCLContext context = SCLContext.createDerivedContext();
+ return scheduledExecutor.scheduleAtFixedRate(() -> {
+ SCLContext.push(context);
+ try {
+ f.apply(Tuple0.INSTANCE);
+ } catch (Throwable t) {
+ LOGGER.error("Could not execute {} with fixed rate {} {}", f, initialDelay, period, t);
+ printError(t);
+ } finally {
+ SCLContext.pop();
+ }
+ }, initialDelay, period, unit);