gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.modeling/src/org/simantics/modeling/scl/SCLRealm.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.modeling / src / org / simantics / modeling / scl / SCLRealm.java
1 package org.simantics.modeling.scl;
2
3 import java.io.IOException;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.concurrent.LinkedBlockingQueue;
7 import java.util.concurrent.Semaphore;
8 import java.util.concurrent.ThreadFactory;
9 import java.util.concurrent.ThreadPoolExecutor;
10 import java.util.concurrent.TimeUnit;
11
12 import org.simantics.databoard.Bindings;
13 import org.simantics.databoard.binding.Binding;
14 import org.simantics.databoard.binding.mutable.Variant;
15 import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;
16 import org.simantics.modeling.SCLTypeUtils;
17 import org.simantics.scl.compiler.commands.CommandSession;
18 import org.simantics.scl.compiler.types.Type;
19 import org.simantics.scl.runtime.SCLContext;
20 import org.simantics.scl.runtime.function.Function;
21 import org.simantics.scl.runtime.tuple.Tuple0;
22 import org.simantics.simulator.variable.Realm;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import gnu.trove.map.hash.THashMap;
27
28 public class SCLRealm implements Realm {
29     private static final Logger LOGGER = LoggerFactory.getLogger(SCLRealm.class);
30
31     public static final String SCL = "scl";
32     
33     private THashMap<String,Type> contextTypes = new THashMap<String,Type>();
34     
35     private CommandSession connection;
36     private String id;
37     private Thread executorThread;
38     private SCLRealmThreadFactory factory = new SCLRealmThreadFactory(this);
39     private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
40             new LinkedBlockingQueue<Runnable>(), factory);
41     
42     private Semaphore beginSyncExec = new Semaphore(0);
43     private Semaphore endSyncExec = new Semaphore(0);
44     
45     private SCLNodeManager nodeManager;
46     
47     private Runnable scheduleSyncExec = new Runnable() {
48         @Override
49         public void run() {
50             beginSyncExec.release();
51             try {
52                 endSyncExec.acquire();
53             } catch (InterruptedException e) {
54             }
55         }
56     };
57     
58     SCLRealm(CommandSession connection, String id) {
59         this.connection = connection;
60         this.id = id;
61         this.nodeManager = new SCLNodeManager(this);
62     }
63
64     public String getId() {
65         return id;
66     }
67     
68     public CommandSession getConnection() {
69         return connection;
70     }
71     
72     public Thread getThread() {
73         return executorThread;
74     }
75     
76     @SuppressWarnings({ "rawtypes", "unchecked" })
77     public Object syncExec(Function fun) throws InterruptedException {
78         executor.execute(scheduleSyncExec);
79         
80         SCLContext context = SCLContext.getCurrent();
81         CommandSession oldConnection = (CommandSession)context.put(SCL, connection);
82         
83         try {
84             beginSyncExec.acquire();
85             Thread oldThread = executorThread;
86             executorThread = Thread.currentThread();
87             try {
88                 return fun.apply(Tuple0.INSTANCE);
89             } finally {
90                 executorThread = oldThread;
91                 endSyncExec.release();
92             }
93         } finally {
94             context.put(SCL, oldConnection);
95         }
96     }
97     
98     @SuppressWarnings("rawtypes")
99     public void asyncExec(final Function fun) {
100         executor.execute(new Runnable() {
101             @SuppressWarnings("unchecked")
102             @Override
103             public void run() {
104                 SCLContext context = SCLContext.getCurrent();
105                 context.put(SCL, connection);
106                 fun.apply(Tuple0.INSTANCE);
107             }
108         });
109     }
110
111     @Override
112     public void syncExec(Runnable runnable) throws InterruptedException {
113         
114         if(executorThread == Thread.currentThread()) {
115             try {
116                 runnable.run();
117             } catch (Throwable t) {
118                 LOGGER.error("Runnable failed in syncExec.", t);
119             } finally {
120             }
121             return;
122         }
123
124         executor.execute(scheduleSyncExec);
125         
126         beginSyncExec.acquire();
127         Thread oldThread = executorThread;
128         executorThread = Thread.currentThread();
129         try {
130             runnable.run();
131         } catch (Throwable t) {
132             LOGGER.error("Runnable failed in syncExec.", t);
133         } finally {
134             executorThread = oldThread;
135             endSyncExec.release();
136         }
137     }
138
139     @Override
140     public void asyncExec(Runnable runnable) {
141
142         if(executorThread == Thread.currentThread()) {
143             try {
144                 runnable.run();
145             } catch (Throwable t) {
146                 LOGGER.error("Runnable failed in asyncExec.", t);
147             } finally {
148             }
149             return;
150         }
151         
152         executor.execute(runnable);
153     }
154     
155     public void refreshVariables() {
156         nodeManager.refreshVariables();
157     }
158     
159     public void refreshVariablesSync() {
160         nodeManager.refreshVariablesSync();
161     }
162
163     public void close() {
164         SCLSessionManager.CONNECTIONS.remove(id);
165         executor.shutdown();
166         try {
167             if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
168                 List<Runnable> runnables = executor.shutdownNow();
169                 if (!runnables.isEmpty()) {
170                     LOGGER.info("Some runnables left to execute in realm " + this + ": " + runnables);
171                 }
172             }
173         } catch (InterruptedException e) {
174             LOGGER.info("Could not shutdown executor " + executor + " in realm " + this, e);
175         }
176         //connection.close();
177         
178         factory.clear();
179         factory = null;
180         // Should not happen
181         if (executorThread.isAlive())
182             executorThread.interrupt();
183         executorThread = null;
184         executor = null;
185         
186         // clear nodeManager
187         nodeManager.clear();
188     }
189
190     public SCLNodeManager getNodeManager() {
191         return nodeManager;
192     }
193     
194     private SCLState getState() {
195         SCLState state = new SCLState();
196         for(String key : connection.getVariables()) {
197                 Object value = connection.getVariableValue(key);
198                 try {
199                         Binding b = Bindings.getBinding(value.getClass());
200                         state.values.put(key, new Variant(b, value));
201                 } catch (Exception e) {
202                 }
203         }
204         return state;
205     }
206     
207     public void applyState(byte[] blob) {
208         try {
209                         SCLState state = (SCLState)SCLState.BINDING.serializer().deserialize(blob);
210                 for(Map.Entry<String,Variant> entry : state.values.entrySet()) {
211                         String key = entry.getKey();
212                         Variant value = entry.getValue();
213                         Type type = SCLTypeUtils.getType(value.type());
214                         if (type.getClassId() != Type.VAR_ID) {
215                             // do not add the property if type cannot be inferred
216                             connection.setVariable(key, type, value.getValue());
217                         }
218                 }
219                 } catch (RuntimeSerializerConstructionException e) {
220                 } catch (IOException e) {
221                 }
222     }
223     
224     public byte[] serialize() {
225         SCLState state = getState();
226         try {
227                         return SCLState.BINDING.serializer().serialize(state);
228                 } catch (RuntimeSerializerConstructionException e) {
229                         return null;
230                 } catch (IOException e) {
231                         return null;
232                 }
233     }
234     
235     private static class SCLRealmThreadFactory implements ThreadFactory {
236
237         private SCLRealm realm;
238
239         public SCLRealmThreadFactory(SCLRealm realm) {
240              this.realm = realm;
241          }
242         
243         @Override
244         public Thread newThread(Runnable r) {
245             Thread t = new Thread(r);
246             realm.setThread(t);
247             return t;
248         }
249         
250         void clear() {
251             realm = null;
252         }
253     }
254
255     private void setThread(Thread t) {
256         this.executorThread = t;
257     }
258 }