]> gerrit.simantics Code Review - simantics/r.git/blob - org.simantics.r.scl/src/org/simantics/r/scl/RSession.java
(refs #6833) Test RExp inheritance in SCL
[simantics/r.git] / org.simantics.r.scl / src / org / simantics / r / scl / RSession.java
1 /*******************************************************************************\r
2  * Copyright (c) 2014, 2016 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.r.scl;\r
13 \r
14 import java.util.concurrent.ExecutorService;\r
15 import java.util.concurrent.Executors;\r
16 import java.util.concurrent.Semaphore;\r
17 import java.util.concurrent.ThreadFactory;\r
18 import java.util.concurrent.TimeUnit;\r
19 \r
20 import org.rosuda.REngine.Rserve.RConnection;\r
21 import org.rosuda.REngine.Rserve.RserveException;\r
22 import org.simantics.r.scl.variable.RNodeManager;\r
23 import org.simantics.scl.runtime.SCLContext;\r
24 import org.simantics.scl.runtime.function.Function;\r
25 import org.simantics.scl.runtime.tuple.Tuple0;\r
26 import org.simantics.simulator.variable.NodeManager;\r
27 import org.simantics.simulator.variable.Realm;\r
28 \r
29 public class RSession implements Realm {\r
30     public static final String R = "r";\r
31     \r
32     RConnection connection;\r
33     String id;\r
34     Thread executorThread;\r
35     ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {\r
36         @Override\r
37         public Thread newThread(Runnable r) {\r
38             executorThread = new Thread(r);\r
39             return executorThread;\r
40         }\r
41     });\r
42     \r
43     Semaphore beginSyncExec = new Semaphore(0);\r
44     Semaphore endSyncExec = new Semaphore(0);\r
45     \r
46     RNodeManager nodeManager;\r
47     \r
48     Runnable scheduleSyncExec = new Runnable() {\r
49         @Override\r
50         public void run() {\r
51             beginSyncExec.release();\r
52             try {\r
53                 endSyncExec.acquire();\r
54             } catch (InterruptedException e) {\r
55             }\r
56         }\r
57     };\r
58     \r
59     RSession(RConnection connection, String id) {\r
60         this.connection = connection;\r
61         this.id = id;\r
62         this.nodeManager = new RNodeManager(this);\r
63     }\r
64 \r
65     public String getId() {\r
66         return id;\r
67     }\r
68     \r
69     public RConnection getConnection() {\r
70         return connection;\r
71     }\r
72     \r
73     public Thread getThread() {\r
74         return executorThread;\r
75     }\r
76     \r
77     @SuppressWarnings({ "rawtypes", "unchecked" })\r
78     public Object syncExec(Function fun) throws RserveException, InterruptedException {\r
79         executor.execute(scheduleSyncExec);\r
80         \r
81         SCLContext context = SCLContext.getCurrent();\r
82         RConnection oldConnection = (RConnection)context.put(R, connection);\r
83         \r
84         try {\r
85             beginSyncExec.acquire();\r
86             Thread oldThread = executorThread;\r
87             executorThread = Thread.currentThread();\r
88             try {\r
89                 return fun.apply(Tuple0.INSTANCE);\r
90             } finally {\r
91                 executorThread = oldThread;\r
92                 endSyncExec.release();\r
93             }\r
94         } finally {\r
95             context.put(R, oldConnection);\r
96         }\r
97     }\r
98     \r
99     @SuppressWarnings("rawtypes")\r
100     public void asyncExec(final Function fun) {\r
101         executor.execute(new Runnable() {\r
102             @SuppressWarnings("unchecked")\r
103             @Override\r
104             public void run() {\r
105                 SCLContext context = SCLContext.getCurrent();\r
106                 context.put(R, connection);\r
107                 fun.apply(Tuple0.INSTANCE);\r
108             }\r
109         });\r
110     }\r
111 \r
112     @Override\r
113     public void syncExec(Runnable runnable) throws InterruptedException {\r
114         executor.execute(scheduleSyncExec);\r
115         \r
116         beginSyncExec.acquire();\r
117         Thread oldThread = executorThread;\r
118         executorThread = Thread.currentThread();\r
119         try {\r
120             runnable.run();\r
121         } finally {\r
122             executorThread = oldThread;\r
123             endSyncExec.release();\r
124         }\r
125     }\r
126 \r
127     @Override\r
128     public void asyncExec(Runnable runnable) {\r
129         executor.execute(runnable);\r
130     }\r
131     \r
132     public void refreshVariables() {\r
133         nodeManager.refreshVariables();\r
134     }\r
135     \r
136     public void refreshVariablesSync() {\r
137         nodeManager.refreshVariablesSync();\r
138     }\r
139 \r
140     public void close() {\r
141         RSessionManager.CONNECTIONS.remove(id);\r
142         executor.shutdown();\r
143         try {\r
144             executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
145         } catch (InterruptedException e) {\r
146         }\r
147         connection.close();\r
148     }\r
149 \r
150     public NodeManager getNodeManager() {\r
151         return nodeManager;\r
152     }\r
153 }\r