/*******************************************************************************
 * Copyright (c) 2014, 2016 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.r.scl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;
import org.simantics.r.scl.variable.RNodeManager;
import org.simantics.scl.runtime.SCLContext;
import org.simantics.scl.runtime.function.Function;
import org.simantics.scl.runtime.tuple.Tuple0;
import org.simantics.simulator.variable.NodeManager;
import org.simantics.simulator.variable.Realm;

/**
 * A {@link Realm} backed by a single {@link RConnection}.
 *
 * <p>All asynchronous work is funnelled through a single-threaded executor.
 * Synchronous work runs on the <em>calling</em> thread, but only while the
 * executor thread is parked inside {@link #scheduleSyncExec}, so synchronous
 * and asynchronous work never overlap. While a synchronous call is running,
 * {@link #getThread()} reports the calling thread.
 *
 * <p>NOTE(review): calling either {@code syncExec} overload from code that is
 * already running on the executor thread queues a hand-off task behind the
 * currently running one and then waits for it, which looks like it can never
 * complete - confirm callers never do this.
 */
public class RSession implements Realm {

    /** Key under which the session's connection is stored in the {@link SCLContext}. */
    public static final String R = "r";

    RConnection connection;
    String id;

    /**
     * The thread currently considered to own the session: normally the
     * executor's worker thread, temporarily the caller during a synchronous
     * execution. Volatile because it is written and read from several threads.
     */
    volatile Thread executorThread;

    /** Single-threaded executor; its factory records the worker thread in {@link #executorThread}. */
    ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            executorThread = new Thread(r);
            return executorThread;
        }
    });

    /** Released by the executor thread once it has parked and the caller may proceed. */
    Semaphore beginSyncExec = new Semaphore(0);

    /** Released by the caller once its synchronous work is finished. */
    Semaphore endSyncExec = new Semaphore(0);

    RNodeManager nodeManager;

    /**
     * Hand-off task run on the executor thread: signals the waiting caller and
     * then parks the executor thread until the caller reports completion.
     */
    Runnable scheduleSyncExec = new Runnable() {
        @Override
        public void run() {
            beginSyncExec.release();
            try {
                endSyncExec.acquire();
            } catch (InterruptedException e) {
                // Do not lose the interruption; leave it visible on this thread.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates a session for the given connection.
     *
     * @param connection the R connection used by this session
     * @param id         the identifier of this session
     */
    RSession(RConnection connection, String id) {
        this.connection = connection;
        this.id = id;
        this.nodeManager = new RNodeManager(this);
    }

    /**
     * @return the identifier given at construction
     */
    public String getId() {
        return id;
    }

    /**
     * @return the R connection given at construction
     */
    public RConnection getConnection() {
        return connection;
    }

    /**
     * @return the thread currently owning the session (see {@link #executorThread});
     *         may be {@code null} if the executor has not created its worker thread yet
     */
    public Thread getThread() {
        return executorThread;
    }

    /**
     * Runs {@code fun} on the calling thread while the executor thread is
     * parked, with this session's connection stored in the current
     * {@link SCLContext} under {@link #R}. The value previously stored under
     * that key is put back afterwards.
     *
     * @param fun the function to apply to {@link Tuple0#INSTANCE}
     * @return the value returned by {@code fun}
     * @throws RserveException      declared for callers; not thrown directly by this method
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the executor thread; {@code fun} is not run
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public Object syncExec(Function fun) throws RserveException, InterruptedException {
        SCLContext context = SCLContext.getCurrent();
        RConnection oldConnection = (RConnection)context.put(R, connection);
        try {
            // Queue the hand-off only once the context is prepared, so that a
            // failure above cannot leave the executor thread parked forever.
            awaitExecutorParked();
            Thread oldThread = executorThread;
            executorThread = Thread.currentThread();
            try {
                return fun.apply(Tuple0.INSTANCE);
            } finally {
                executorThread = oldThread;
                endSyncExec.release();
            }
        } finally {
            context.put(R, oldConnection);
        }
    }

    /**
     * Queues {@code fun} for execution on the executor thread. Before the
     * function is applied, this session's connection is stored in that
     * thread's current {@link SCLContext} under {@link #R}.
     *
     * @param fun the function to apply to {@link Tuple0#INSTANCE}
     */
    @SuppressWarnings("rawtypes")
    public void asyncExec(final Function fun) {
        executor.execute(new Runnable() {
            @SuppressWarnings("unchecked")
            @Override
            public void run() {
                SCLContext context = SCLContext.getCurrent();
                context.put(R, connection);
                fun.apply(Tuple0.INSTANCE);
            }
        });
    }

    /**
     * Runs {@code runnable} on the calling thread while the executor thread is
     * parked. Unlike {@link #syncExec(Function)}, the {@link SCLContext} is
     * not touched.
     *
     * @param runnable the work to run
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the executor thread; {@code runnable} is not run
     */
    @Override
    public void syncExec(Runnable runnable) throws InterruptedException {
        awaitExecutorParked();
        Thread oldThread = executorThread;
        executorThread = Thread.currentThread();
        try {
            runnable.run();
        } finally {
            executorThread = oldThread;
            endSyncExec.release();
        }
    }

    /**
     * Queues {@code runnable} for execution on the executor thread.
     *
     * @param runnable the work to run
     */
    @Override
    public void asyncExec(Runnable runnable) {
        executor.execute(runnable);
    }

    /** Delegates to {@link RNodeManager#refreshVariables()}. */
    public void refreshVariables() {
        nodeManager.refreshVariables();
    }

    /** Delegates to {@link RNodeManager#refreshVariablesSync()}. */
    public void refreshVariablesSync() {
        nodeManager.refreshVariablesSync();
    }

    /**
     * Unregisters the session from {@link RSessionManager#CONNECTIONS}, shuts
     * the executor down (waiting at most 500 ms for queued work) and closes
     * the connection. The connection is closed even if the wait is
     * interrupted; the interrupt status is restored afterwards.
     */
    public void close() {
        RSessionManager.CONNECTIONS.remove(id);
        executor.shutdown();
        boolean interrupted = false;
        try {
            executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            connection.close();
        } finally {
            if (interrupted) {
                // Restore the flag only after the connection has been closed.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the node manager created for this session
     */
    public NodeManager getNodeManager() {
        return nodeManager;
    }

    /**
     * Queues the hand-off task and blocks until the executor thread has
     * parked in it. On normal return the caller owns the session and MUST
     * release {@link #endSyncExec} when done.
     *
     * <p>If the wait is interrupted, the hand-off task is still queued and
     * would otherwise park the executor thread forever and leave a stray
     * {@link #beginSyncExec} permit behind. The handshake is therefore
     * completed uninterruptibly and answered immediately before the
     * interruption is propagated.
     *
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    private void awaitExecutorParked() throws InterruptedException {
        executor.execute(scheduleSyncExec);
        try {
            beginSyncExec.acquire();
        } catch (InterruptedException e) {
            beginSyncExec.acquireUninterruptibly();
            endSyncExec.release();
            throw e;
        }
    }
}