-package org.simantics.modeling.scl;\r
-\r
-import java.io.IOException;\r
-import java.util.Map;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.LinkedBlockingQueue;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.ThreadFactory;\r
-import java.util.concurrent.ThreadPoolExecutor;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.mutable.Variant;\r
-import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;\r
-import org.simantics.modeling.SCLTypeUtils;\r
-import org.simantics.scl.compiler.commands.CommandSession;\r
-import org.simantics.scl.compiler.types.Type;\r
-import org.simantics.scl.runtime.SCLContext;\r
-import org.simantics.scl.runtime.function.Function;\r
-import org.simantics.scl.runtime.tuple.Tuple0;\r
-import org.simantics.simulator.variable.NodeManager;\r
-import org.simantics.simulator.variable.Realm;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import gnu.trove.map.hash.THashMap;\r
-\r
-public class SCLRealm implements Realm {\r
- private static final Logger LOGGER = LoggerFactory.getLogger(SCLRealm.class);\r
-\r
- public static final String SCL = "scl";\r
- \r
- THashMap<String,Type> contextTypes = new THashMap<String,Type>();\r
- \r
- CommandSession connection;\r
- String id;\r
- Thread executorThread;\r
- ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,\r
- new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- executorThread = new Thread(r);\r
- return executorThread;\r
- }\r
- });\r
- \r
- Semaphore beginSyncExec = new Semaphore(0);\r
- Semaphore endSyncExec = new Semaphore(0);\r
- \r
- SCLNodeManager nodeManager;\r
- \r
- Runnable scheduleSyncExec = new Runnable() {\r
- @Override\r
- public void run() {\r
- beginSyncExec.release();\r
- try {\r
- endSyncExec.acquire();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
- };\r
- \r
- SCLRealm(CommandSession connection, String id) {\r
- this.connection = connection;\r
- this.id = id;\r
- this.nodeManager = new SCLNodeManager(this);\r
- }\r
-\r
- public String getId() {\r
- return id;\r
- }\r
- \r
- public CommandSession getConnection() {\r
- return connection;\r
- }\r
- \r
- public Thread getThread() {\r
- return executorThread;\r
- }\r
- \r
- @SuppressWarnings({ "rawtypes", "unchecked" })\r
- public Object syncExec(Function fun) throws InterruptedException {\r
- executor.execute(scheduleSyncExec);\r
- \r
- SCLContext context = SCLContext.getCurrent();\r
- CommandSession oldConnection = (CommandSession)context.put(SCL, connection);\r
- \r
- try {\r
- beginSyncExec.acquire();\r
- Thread oldThread = executorThread;\r
- executorThread = Thread.currentThread();\r
- try {\r
- return fun.apply(Tuple0.INSTANCE);\r
- } finally {\r
- executorThread = oldThread;\r
- endSyncExec.release();\r
- }\r
- } finally {\r
- context.put(SCL, oldConnection);\r
- }\r
- }\r
- \r
- @SuppressWarnings("rawtypes")\r
- public void asyncExec(final Function fun) {\r
- executor.execute(new Runnable() {\r
- @SuppressWarnings("unchecked")\r
- @Override\r
- public void run() {\r
- SCLContext context = SCLContext.getCurrent();\r
- context.put(SCL, connection);\r
- fun.apply(Tuple0.INSTANCE);\r
- }\r
- });\r
- }\r
-\r
- @Override\r
- public void syncExec(Runnable runnable) throws InterruptedException {\r
- \r
- if(executorThread == Thread.currentThread()) {\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- LOGGER.error("Runnable failed in syncExec.", t);\r
- } finally {\r
- }\r
- return;\r
- }\r
-\r
- executor.execute(scheduleSyncExec);\r
- \r
- beginSyncExec.acquire();\r
- Thread oldThread = executorThread;\r
- executorThread = Thread.currentThread();\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- LOGGER.error("Runnable failed in syncExec.", t);\r
- } finally {\r
- executorThread = oldThread;\r
- endSyncExec.release();\r
- }\r
- }\r
-\r
- @Override\r
- public void asyncExec(Runnable runnable) {\r
-\r
- if(executorThread == Thread.currentThread()) {\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- LOGGER.error("Runnable failed in asyncExec.", t);\r
- } finally {\r
- }\r
- return;\r
- }\r
- \r
- executor.execute(runnable);\r
- }\r
- \r
- public void refreshVariables() {\r
- nodeManager.refreshVariables();\r
- }\r
- \r
- public void refreshVariablesSync() {\r
- nodeManager.refreshVariablesSync();\r
- }\r
-\r
- public void close() {\r
- SCLSessionManager.CONNECTIONS.remove(id);\r
- executor.shutdown();\r
- try {\r
- executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
- } catch (InterruptedException e) {\r
- }\r
- //connection.close();\r
- }\r
-\r
- public NodeManager<String> getNodeManager() {\r
- return nodeManager;\r
- }\r
- \r
- private SCLState getState() {\r
- SCLState state = new SCLState();\r
- for(String key : connection.getVariables()) {\r
- Object value = connection.getVariableValue(key);\r
- try {\r
- Binding b = Bindings.getBinding(value.getClass());\r
- state.values.put(key, new Variant(b, value));\r
- } catch (Exception e) {\r
- }\r
- }\r
- return state;\r
- }\r
- \r
- public void applyState(byte[] blob) {\r
- try {\r
- SCLState state = (SCLState)SCLState.BINDING.serializer().deserialize(blob);\r
- for(Map.Entry<String,Variant> entry : state.values.entrySet()) {\r
- String key = entry.getKey();\r
- Variant value = entry.getValue();\r
- Type type = SCLTypeUtils.getType(value.type());\r
- if (type.getClassId() != Type.VAR_ID) {\r
- // do not add the property if type cannot be inferred\r
- connection.setVariable(key, type, value.getValue());\r
- }\r
- }\r
- } catch (RuntimeSerializerConstructionException e) {\r
- } catch (IOException e) {\r
- }\r
- }\r
- \r
- public byte[] serialize() {\r
- SCLState state = getState();\r
- try {\r
- return SCLState.BINDING.serializer().serialize(state);\r
- } catch (RuntimeSerializerConstructionException e) {\r
- return null;\r
- } catch (IOException e) {\r
- return null;\r
- }\r
- }\r
- \r
-}\r
+package org.simantics.modeling.scl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.mutable.Variant;
+import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;
+import org.simantics.modeling.SCLTypeUtils;
+import org.simantics.scl.compiler.commands.CommandSession;
+import org.simantics.scl.compiler.types.Type;
+import org.simantics.scl.runtime.SCLContext;
+import org.simantics.scl.runtime.function.Function;
+import org.simantics.scl.runtime.tuple.Tuple0;
+import org.simantics.simulator.variable.Realm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.map.hash.THashMap;
+
+public class SCLRealm implements Realm {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SCLRealm.class);
+
+ public static final String SCL = "scl";
+
+ private THashMap<String,Type> contextTypes = new THashMap<String,Type>();
+
+ private CommandSession connection;
+ private String id;
+ private Thread executorThread;
+ private SCLRealmThreadFactory factory = new SCLRealmThreadFactory(this);
+ private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), factory);
+
+ private Semaphore beginSyncExec = new Semaphore(0);
+ private Semaphore endSyncExec = new Semaphore(0);
+
+ private SCLNodeManager nodeManager;
+
+ private Runnable scheduleSyncExec = new Runnable() {
+ @Override
+ public void run() {
+ beginSyncExec.release();
+ try {
+ endSyncExec.acquire();
+ } catch (InterruptedException e) {
+ }
+ }
+ };
+
+ SCLRealm(CommandSession connection, String id) {
+ this.connection = connection;
+ this.id = id;
+ this.nodeManager = new SCLNodeManager(this);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public CommandSession getConnection() {
+ return connection;
+ }
+
+ public Thread getThread() {
+ return executorThread;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Object syncExec(Function fun) throws InterruptedException {
+ executor.execute(scheduleSyncExec);
+
+ SCLContext context = SCLContext.getCurrent();
+ CommandSession oldConnection = (CommandSession)context.put(SCL, connection);
+
+ try {
+ beginSyncExec.acquire();
+ Thread oldThread = executorThread;
+ executorThread = Thread.currentThread();
+ try {
+ return fun.apply(Tuple0.INSTANCE);
+ } finally {
+ executorThread = oldThread;
+ endSyncExec.release();
+ }
+ } finally {
+ context.put(SCL, oldConnection);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void asyncExec(final Function fun) {
+ executor.execute(new Runnable() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ SCLContext context = SCLContext.getCurrent();
+ context.put(SCL, connection);
+ fun.apply(Tuple0.INSTANCE);
+ }
+ });
+ }
+
+ @Override
+ public void syncExec(Runnable runnable) throws InterruptedException {
+
+ if(executorThread == Thread.currentThread()) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ LOGGER.error("Runnable failed in syncExec.", t);
+ } finally {
+ }
+ return;
+ }
+
+ executor.execute(scheduleSyncExec);
+
+ beginSyncExec.acquire();
+ Thread oldThread = executorThread;
+ executorThread = Thread.currentThread();
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ LOGGER.error("Runnable failed in syncExec.", t);
+ } finally {
+ executorThread = oldThread;
+ endSyncExec.release();
+ }
+ }
+
+ @Override
+ public void asyncExec(Runnable runnable) {
+
+ if(executorThread == Thread.currentThread()) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ LOGGER.error("Runnable failed in asyncExec.", t);
+ } finally {
+ }
+ return;
+ }
+
+ executor.execute(runnable);
+ }
+
+ public void refreshVariables() {
+ nodeManager.refreshVariables();
+ }
+
+ public void refreshVariablesSync() {
+ nodeManager.refreshVariablesSync();
+ }
+
+ public void close() {
+ SCLSessionManager.CONNECTIONS.remove(id);
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ List<Runnable> runnables = executor.shutdownNow();
+ if (!runnables.isEmpty()) {
+ LOGGER.info("Some runnables left to execute in realm " + this + ": " + runnables);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.info("Could not shutdown executor " + executor + " in realm " + this, e);
+ }
+ //connection.close();
+
+ factory.clear();
+ factory = null;
+ // Should not happen
+ if (executorThread.isAlive())
+ executorThread.interrupt();
+ executorThread = null;
+ executor = null;
+
+ // clear nodeManager
+ nodeManager.clear();
+ }
+
+ public SCLNodeManager getNodeManager() {
+ return nodeManager;
+ }
+
+ private SCLState getState() {
+ SCLState state = new SCLState();
+ for(String key : connection.getVariables()) {
+ Object value = connection.getVariableValue(key);
+ try {
+ Binding b = Bindings.getBinding(value.getClass());
+ state.values.put(key, new Variant(b, value));
+ } catch (Exception e) {
+ }
+ }
+ return state;
+ }
+
+ public void applyState(byte[] blob) {
+ try {
+ SCLState state = (SCLState)SCLState.BINDING.serializer().deserialize(blob);
+ for(Map.Entry<String,Variant> entry : state.values.entrySet()) {
+ String key = entry.getKey();
+ Variant value = entry.getValue();
+ Type type = SCLTypeUtils.getType(value.type());
+ if (type.getClassId() != Type.VAR_ID) {
+ // do not add the property if type cannot be inferred
+ connection.setVariable(key, type, value.getValue());
+ }
+ }
+ } catch (RuntimeSerializerConstructionException e) {
+ } catch (IOException e) {
+ }
+ }
+
+ public byte[] serialize() {
+ SCLState state = getState();
+ try {
+ return SCLState.BINDING.serializer().serialize(state);
+ } catch (RuntimeSerializerConstructionException e) {
+ return null;
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private static class SCLRealmThreadFactory implements ThreadFactory {
+
+ private SCLRealm realm;
+
+ public SCLRealmThreadFactory(SCLRealm realm) {
+ this.realm = realm;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ realm.setThread(t);
+ return t;
+ }
+
+ void clear() {
+ realm = null;
+ }
+ }
+
+ private void setThread(Thread t) {
+ this.executorThread = t;
+ }
+}