1 package org.simantics.r.scl;
\r
3 import java.util.concurrent.ExecutorService;
\r
4 import java.util.concurrent.Executors;
\r
5 import java.util.concurrent.Semaphore;
\r
6 import java.util.concurrent.ThreadFactory;
\r
7 import java.util.concurrent.TimeUnit;
\r
9 import org.rosuda.REngine.Rserve.RConnection;
\r
10 import org.rosuda.REngine.Rserve.RserveException;
\r
11 import org.simantics.r.scl.variable.RNodeManager;
\r
12 import org.simantics.r.scl.variable.RVariableNode;
\r
13 import org.simantics.scl.runtime.SCLContext;
\r
14 import org.simantics.scl.runtime.function.Function;
\r
15 import org.simantics.scl.runtime.tuple.Tuple0;
\r
16 import org.simantics.simulator.variable.NodeManager;
\r
17 import org.simantics.simulator.variable.Realm;
\r
19 public class RSession implements Realm {
\r
20 public static final String R = "r";
\r
22 RConnection connection;
\r
24 Thread executorThread;
\r
25 ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
\r
27 public Thread newThread(Runnable r) {
\r
28 executorThread = new Thread(r);
\r
29 return executorThread;
\r
33 Semaphore beginSyncExec = new Semaphore(0);
\r
34 Semaphore endSyncExec = new Semaphore(0);
\r
36 RNodeManager nodeManager;
\r
38 Runnable scheduleSyncExec = new Runnable() {
\r
41 beginSyncExec.release();
\r
43 endSyncExec.acquire();
\r
44 } catch (InterruptedException e) {
\r
49 RSession(RConnection connection, String id) {
\r
50 this.connection = connection;
\r
52 this.nodeManager = new RNodeManager(this);
\r
55 public String getId() {
\r
59 public RConnection getConnection() {
\r
63 public Thread getThread() {
\r
64 return executorThread;
\r
67 @SuppressWarnings({ "rawtypes", "unchecked" })
\r
68 public Object syncExec(Function fun) throws RserveException, InterruptedException {
\r
69 executor.execute(scheduleSyncExec);
\r
71 SCLContext context = SCLContext.getCurrent();
\r
72 RConnection oldConnection = (RConnection)context.put(R, connection);
\r
75 beginSyncExec.acquire();
\r
76 Thread oldThread = executorThread;
\r
77 executorThread = Thread.currentThread();
\r
79 return fun.apply(Tuple0.INSTANCE);
\r
81 executorThread = oldThread;
\r
82 endSyncExec.release();
\r
85 context.put(R, oldConnection);
\r
89 @SuppressWarnings("rawtypes")
\r
90 public void asyncExec(final Function fun) {
\r
91 executor.execute(new Runnable() {
\r
92 @SuppressWarnings("unchecked")
\r
95 SCLContext context = SCLContext.getCurrent();
\r
96 context.put(R, connection);
\r
97 fun.apply(Tuple0.INSTANCE);
\r
103 public void syncExec(Runnable runnable) throws InterruptedException {
\r
104 executor.execute(scheduleSyncExec);
\r
106 beginSyncExec.acquire();
\r
107 Thread oldThread = executorThread;
\r
108 executorThread = Thread.currentThread();
\r
112 executorThread = oldThread;
\r
113 endSyncExec.release();
\r
118 public void asyncExec(Runnable runnable) {
\r
119 executor.execute(runnable);
\r
122 public void refreshVariables() {
\r
123 nodeManager.refreshVariables();
\r
126 public void refreshVariablesSync() {
\r
127 nodeManager.refreshVariablesSync();
\r
130 public void close() {
\r
131 RSessionManager.CONNECTIONS.remove(id);
\r
132 executor.shutdown();
\r
134 executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
\r
135 } catch (InterruptedException e) {
\r
137 connection.close();
\r
140 public NodeManager<RVariableNode> getNodeManager() {
\r
141 return nodeManager;
\r