]> gerrit.simantics Code Review - simantics/r.git/blob - org.simantics.r.scl/src/org/simantics/r/scl/RSession.java
Share project "org.simantics.r.feature" into "https://www.simantics.org/svn/simantics"
[simantics/r.git] / org.simantics.r.scl / src / org / simantics / r / scl / RSession.java
1 package org.simantics.r.scl;\r
2 \r
3 import java.util.concurrent.ExecutorService;\r
4 import java.util.concurrent.Executors;\r
5 import java.util.concurrent.Semaphore;\r
6 import java.util.concurrent.ThreadFactory;\r
7 import java.util.concurrent.TimeUnit;\r
8 \r
9 import org.rosuda.REngine.Rserve.RConnection;\r
10 import org.rosuda.REngine.Rserve.RserveException;\r
11 import org.simantics.r.scl.variable.RNodeManager;\r
12 import org.simantics.r.scl.variable.RVariableNode;\r
13 import org.simantics.scl.runtime.SCLContext;\r
14 import org.simantics.scl.runtime.function.Function;\r
15 import org.simantics.scl.runtime.tuple.Tuple0;\r
16 import org.simantics.simulator.variable.NodeManager;\r
17 import org.simantics.simulator.variable.Realm;\r
18 \r
19 public class RSession implements Realm {\r
20     public static final String R = "r";\r
21     \r
22     RConnection connection;\r
23     String id;\r
24     Thread executorThread;\r
25     ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {\r
26         @Override\r
27         public Thread newThread(Runnable r) {\r
28             executorThread = new Thread(r);\r
29             return executorThread;\r
30         }\r
31     });\r
32     \r
33     Semaphore beginSyncExec = new Semaphore(0);\r
34     Semaphore endSyncExec = new Semaphore(0);\r
35     \r
36     RNodeManager nodeManager;\r
37     \r
38     Runnable scheduleSyncExec = new Runnable() {\r
39         @Override\r
40         public void run() {\r
41             beginSyncExec.release();\r
42             try {\r
43                 endSyncExec.acquire();\r
44             } catch (InterruptedException e) {\r
45             }\r
46         }\r
47     };\r
48     \r
49     RSession(RConnection connection, String id) {\r
50         this.connection = connection;\r
51         this.id = id;\r
52         this.nodeManager = new RNodeManager(this);\r
53     }\r
54 \r
55     public String getId() {\r
56         return id;\r
57     }\r
58     \r
59     public RConnection getConnection() {\r
60         return connection;\r
61     }\r
62     \r
63     public Thread getThread() {\r
64         return executorThread;\r
65     }\r
66     \r
67     @SuppressWarnings({ "rawtypes", "unchecked" })\r
68     public Object syncExec(Function fun) throws RserveException, InterruptedException {\r
69         executor.execute(scheduleSyncExec);\r
70         \r
71         SCLContext context = SCLContext.getCurrent();\r
72         RConnection oldConnection = (RConnection)context.put(R, connection);\r
73         \r
74         try {\r
75             beginSyncExec.acquire();\r
76             Thread oldThread = executorThread;\r
77             executorThread = Thread.currentThread();\r
78             try {\r
79                 return fun.apply(Tuple0.INSTANCE);\r
80             } finally {\r
81                 executorThread = oldThread;\r
82                 endSyncExec.release();\r
83             }\r
84         } finally {\r
85             context.put(R, oldConnection);\r
86         }\r
87     }\r
88     \r
89     @SuppressWarnings("rawtypes")\r
90     public void asyncExec(final Function fun) {\r
91         executor.execute(new Runnable() {\r
92             @SuppressWarnings("unchecked")\r
93             @Override\r
94             public void run() {\r
95                 SCLContext context = SCLContext.getCurrent();\r
96                 context.put(R, connection);\r
97                 fun.apply(Tuple0.INSTANCE);\r
98             }\r
99         });\r
100     }\r
101 \r
102     @Override\r
103     public void syncExec(Runnable runnable) throws InterruptedException {\r
104         executor.execute(scheduleSyncExec);\r
105         \r
106         beginSyncExec.acquire();\r
107         Thread oldThread = executorThread;\r
108         executorThread = Thread.currentThread();\r
109         try {\r
110             runnable.run();\r
111         } finally {\r
112             executorThread = oldThread;\r
113             endSyncExec.release();\r
114         }\r
115     }\r
116 \r
117     @Override\r
118     public void asyncExec(Runnable runnable) {\r
119         executor.execute(runnable);\r
120     }\r
121     \r
122     public void refreshVariables() {\r
123         nodeManager.refreshVariables();\r
124     }\r
125     \r
126     public void refreshVariablesSync() {\r
127         nodeManager.refreshVariablesSync();\r
128     }\r
129 \r
130     public void close() {\r
131         RSessionManager.CONNECTIONS.remove(id);\r
132         executor.shutdown();\r
133         try {\r
134             executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
135         } catch (InterruptedException e) {\r
136         }\r
137         connection.close();\r
138     }\r
139 \r
140     public NodeManager<RVariableNode> getNodeManager() {\r
141         return nodeManager;\r
142     }\r
143 }\r