]> gerrit.simantics Code Review - simantics/r.git/blob - org.simantics.r.scl/src/org/simantics/r/scl/RSession.java
62812a29090d7427468730bf9f0ee4b436fb24fd
[simantics/r.git] / org.simantics.r.scl / src / org / simantics / r / scl / RSession.java
1 /*******************************************************************************\r
2  * Copyright (c) 2014, 2016 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.r.scl;\r
13 \r
14 import java.util.concurrent.ExecutorService;\r
15 import java.util.concurrent.Executors;\r
16 import java.util.concurrent.Semaphore;\r
17 import java.util.concurrent.ThreadFactory;\r
18 import java.util.concurrent.TimeUnit;\r
19 \r
20 import org.rosuda.REngine.Rserve.RConnection;\r
21 import org.rosuda.REngine.Rserve.RserveException;\r
22 import org.simantics.r.scl.variable.RNodeManager;\r
23 import org.simantics.r.scl.variable.RVariableNode;\r
24 import org.simantics.scl.runtime.SCLContext;\r
25 import org.simantics.scl.runtime.function.Function;\r
26 import org.simantics.scl.runtime.tuple.Tuple0;\r
27 import org.simantics.simulator.variable.NodeManager;\r
28 import org.simantics.simulator.variable.Realm;\r
29 \r
30 public class RSession implements Realm {\r
31     public static final String R = "r";\r
32     \r
33     RConnection connection;\r
34     String id;\r
35     Thread executorThread;\r
36     ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {\r
37         @Override\r
38         public Thread newThread(Runnable r) {\r
39             executorThread = new Thread(r);\r
40             return executorThread;\r
41         }\r
42     });\r
43     \r
44     Semaphore beginSyncExec = new Semaphore(0);\r
45     Semaphore endSyncExec = new Semaphore(0);\r
46     \r
47     RNodeManager nodeManager;\r
48     \r
49     Runnable scheduleSyncExec = new Runnable() {\r
50         @Override\r
51         public void run() {\r
52             beginSyncExec.release();\r
53             try {\r
54                 endSyncExec.acquire();\r
55             } catch (InterruptedException e) {\r
56             }\r
57         }\r
58     };\r
59     \r
60     RSession(RConnection connection, String id) {\r
61         this.connection = connection;\r
62         this.id = id;\r
63         this.nodeManager = new RNodeManager(this);\r
64     }\r
65 \r
66     public String getId() {\r
67         return id;\r
68     }\r
69     \r
70     public RConnection getConnection() {\r
71         return connection;\r
72     }\r
73     \r
74     public Thread getThread() {\r
75         return executorThread;\r
76     }\r
77     \r
78     @SuppressWarnings({ "rawtypes", "unchecked" })\r
79     public Object syncExec(Function fun) throws RserveException, InterruptedException {\r
80         executor.execute(scheduleSyncExec);\r
81         \r
82         SCLContext context = SCLContext.getCurrent();\r
83         RConnection oldConnection = (RConnection)context.put(R, connection);\r
84         \r
85         try {\r
86             beginSyncExec.acquire();\r
87             Thread oldThread = executorThread;\r
88             executorThread = Thread.currentThread();\r
89             try {\r
90                 return fun.apply(Tuple0.INSTANCE);\r
91             } finally {\r
92                 executorThread = oldThread;\r
93                 endSyncExec.release();\r
94             }\r
95         } finally {\r
96             context.put(R, oldConnection);\r
97         }\r
98     }\r
99     \r
100     @SuppressWarnings("rawtypes")\r
101     public void asyncExec(final Function fun) {\r
102         executor.execute(new Runnable() {\r
103             @SuppressWarnings("unchecked")\r
104             @Override\r
105             public void run() {\r
106                 SCLContext context = SCLContext.getCurrent();\r
107                 context.put(R, connection);\r
108                 fun.apply(Tuple0.INSTANCE);\r
109             }\r
110         });\r
111     }\r
112 \r
113     @Override\r
114     public void syncExec(Runnable runnable) throws InterruptedException {\r
115         executor.execute(scheduleSyncExec);\r
116         \r
117         beginSyncExec.acquire();\r
118         Thread oldThread = executorThread;\r
119         executorThread = Thread.currentThread();\r
120         try {\r
121             runnable.run();\r
122         } finally {\r
123             executorThread = oldThread;\r
124             endSyncExec.release();\r
125         }\r
126     }\r
127 \r
128     @Override\r
129     public void asyncExec(Runnable runnable) {\r
130         executor.execute(runnable);\r
131     }\r
132     \r
133     public void refreshVariables() {\r
134         nodeManager.refreshVariables();\r
135     }\r
136     \r
137     public void refreshVariablesSync() {\r
138         nodeManager.refreshVariablesSync();\r
139     }\r
140 \r
141     public void close() {\r
142         RSessionManager.CONNECTIONS.remove(id);\r
143         executor.shutdown();\r
144         try {\r
145             executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
146         } catch (InterruptedException e) {\r
147         }\r
148         connection.close();\r
149     }\r
150 \r
151     public NodeManager<RVariableNode> getNodeManager() {\r
152         return nodeManager;\r
153     }\r
154 }\r