package org.simantics.modeling.scl;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.simantics.databoard.Bindings;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.binding.mutable.Variant;
import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;
import org.simantics.modeling.SCLTypeUtils;
import org.simantics.scl.compiler.commands.CommandSession;
import org.simantics.scl.compiler.types.Type;
import org.simantics.scl.runtime.SCLContext;
import org.simantics.scl.runtime.function.Function;
import org.simantics.scl.runtime.tuple.Tuple0;
import org.simantics.simulator.variable.Realm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.map.hash.THashMap;

/**
 * A {@link Realm} backed by an SCL {@link CommandSession}.
 *
 * <p>Work is funnelled through a {@link ThreadPoolExecutor} that has at most one
 * worker thread. Asynchronous work runs on that worker. Synchronous work runs on
 * the calling thread, but only after a small handshake task
 * ({@code scheduleSyncExec}) has reached the front of the executor queue and
 * parked the worker, so queued tasks do not run while the caller is executing.
 */
public class SCLRealm implements Realm {

    private static final Logger LOGGER = LoggerFactory.getLogger(SCLRealm.class);

    /** Key under which the command session is stored in the {@link SCLContext}. */
    public static final String SCL = "scl";

    private final THashMap<String, Type> contextTypes = new THashMap<String, Type>();

    private final CommandSession connection;
    private final String id;

    /**
     * The thread currently considered to own the realm: the pool worker created
     * by {@link SCLRealmThreadFactory}, or temporarily the caller of a
     * {@code syncExec} method while it runs its work.
     */
    private Thread executorThread;

    private SCLRealmThreadFactory factory = new SCLRealmThreadFactory(this);
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), factory);

    /** Released by the worker when it is ready to let a synchronous caller run. */
    private final Semaphore beginSyncExec = new Semaphore(0);
    /** Released by the synchronous caller when it has finished its work. */
    private final Semaphore endSyncExec = new Semaphore(0);

    private final SCLNodeManager nodeManager;

    /**
     * Handshake task executed on the worker thread: signals the waiting
     * synchronous caller and then parks until that caller is done.
     */
    private final Runnable scheduleSyncExec = new Runnable() {
        @Override
        public void run() {
            beginSyncExec.release();
            try {
                endSyncExec.acquire();
            } catch (InterruptedException e) {
                // Keep the interrupt visible instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates a realm for the given command session.
     *
     * @param connection the command session used by this realm
     * @param id the identifier of this realm
     */
    SCLRealm(CommandSession connection, String id) {
        this.connection = connection;
        this.id = id;
        this.nodeManager = new SCLNodeManager(this);
    }

    /**
     * @return the identifier given at construction time
     */
    public String getId() {
        return id;
    }

    /**
     * @return the command session given at construction time
     */
    public CommandSession getConnection() {
        return connection;
    }

    /**
     * @return the thread currently registered as the realm's executing thread,
     *         or {@code null} if none has been registered (or after {@link #close()})
     */
    public Thread getThread() {
        return executorThread;
    }

    /**
     * Applies {@code fun} to {@link Tuple0#INSTANCE} on the calling thread while
     * the realm's worker thread is parked, with this realm's command session
     * installed in the current {@link SCLContext} under {@link #SCL}. The previous
     * context value is restored afterwards.
     *
     * <p>If the calling thread is already the realm's executing thread, the
     * function is applied directly. Queuing another handshake in that situation
     * could never complete, because the single worker is either the caller itself
     * or is parked waiting for the caller.
     *
     * <p>NOTE(review): if the caller is interrupted while waiting for the worker,
     * the already queued handshake is not cancelled.
     *
     * @param fun the function to apply
     * @return the value returned by {@code fun}
     * @throws InterruptedException if interrupted while waiting for the worker thread
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public Object syncExec(Function fun) throws InterruptedException {
        if (executorThread == Thread.currentThread()) {
            // Re-entrant call: same guard as syncExec(Runnable).
            SCLContext context = SCLContext.getCurrent();
            CommandSession oldConnection = (CommandSession) context.put(SCL, connection);
            try {
                return fun.apply(Tuple0.INSTANCE);
            } finally {
                context.put(SCL, oldConnection);
            }
        }

        executor.execute(scheduleSyncExec);

        SCLContext context = SCLContext.getCurrent();
        CommandSession oldConnection = (CommandSession) context.put(SCL, connection);
        try {
            beginSyncExec.acquire();
            Thread oldThread = executorThread;
            executorThread = Thread.currentThread();
            try {
                return fun.apply(Tuple0.INSTANCE);
            } finally {
                executorThread = oldThread;
                endSyncExec.release();
            }
        } finally {
            context.put(SCL, oldConnection);
        }
    }

    /**
     * Queues {@code fun} for execution on the realm's executor. Before the
     * function is applied to {@link Tuple0#INSTANCE}, this realm's command session
     * is stored in the executing thread's {@link SCLContext} under {@link #SCL}.
     *
     * @param fun the function to apply
     */
    @SuppressWarnings("rawtypes")
    public void asyncExec(final Function fun) {
        executor.execute(new Runnable() {
            @SuppressWarnings("unchecked")
            @Override
            public void run() {
                SCLContext context = SCLContext.getCurrent();
                context.put(SCL, connection);
                fun.apply(Tuple0.INSTANCE);
            }
        });
    }

    /**
     * Runs {@code runnable} on the calling thread while the realm's worker thread
     * is parked. If the calling thread is already the realm's executing thread the
     * runnable is run directly. Anything thrown by the runnable is logged and not
     * propagated.
     *
     * @param runnable the work to run
     * @throws InterruptedException if interrupted while waiting for the worker thread
     */
    @Override
    public void syncExec(Runnable runnable) throws InterruptedException {
        if (executorThread == Thread.currentThread()) {
            try {
                runnable.run();
            } catch (Throwable t) {
                LOGGER.error("Runnable failed in syncExec.", t);
            }
            return;
        }

        executor.execute(scheduleSyncExec);

        beginSyncExec.acquire();
        Thread oldThread = executorThread;
        executorThread = Thread.currentThread();
        try {
            runnable.run();
        } catch (Throwable t) {
            LOGGER.error("Runnable failed in syncExec.", t);
        } finally {
            executorThread = oldThread;
            endSyncExec.release();
        }
    }

    /**
     * Runs {@code runnable} directly if the calling thread is the realm's
     * executing thread (logging anything it throws); otherwise queues it on the
     * realm's executor.
     *
     * @param runnable the work to run
     */
    @Override
    public void asyncExec(Runnable runnable) {
        if (executorThread == Thread.currentThread()) {
            try {
                runnable.run();
            } catch (Throwable t) {
                LOGGER.error("Runnable failed in asyncExec.", t);
            }
            return;
        }

        executor.execute(runnable);
    }

    /** Delegates to {@link SCLNodeManager#refreshVariables()}. */
    public void refreshVariables() {
        nodeManager.refreshVariables();
    }

    /** Delegates to {@link SCLNodeManager#refreshVariablesSync()}. */
    public void refreshVariablesSync() {
        nodeManager.refreshVariablesSync();
    }

    /**
     * Removes this realm from {@code SCLSessionManager.CONNECTIONS}, shuts the
     * executor down (waiting up to 500 ms before forcing it), releases the thread
     * factory and executor references, and clears the node manager.
     *
     * <p>If the calling thread is interrupted while waiting for termination, the
     * remaining clean-up still runs and the interrupt status is restored before
     * returning.
     */
    public void close() {
        SCLSessionManager.CONNECTIONS.remove(id);

        boolean interrupted = false;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                List<Runnable> runnables = executor.shutdownNow();
                if (!runnables.isEmpty()) {
                    LOGGER.info("Some runnables left to execute in realm {}: {}", this, runnables);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.info("Could not shutdown executor {} in realm {}", executor, this, e);
            interrupted = true;
        }

        try {
            // connection.close() is not called here; the session is left open.

            factory.clear();
            factory = null;

            // Should not happen. The thread is null when the pool never created
            // a worker, so it must be checked before use.
            Thread thread = executorThread;
            if (thread != null && thread.isAlive()) {
                thread.interrupt();
            }
            executorThread = null;
            executor = null;

            // clear nodeManager
            nodeManager.clear();
        } finally {
            if (interrupted) {
                // Restore the interrupt only after clean-up so it cannot disturb it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the node manager created for this realm
     */
    public SCLNodeManager getNodeManager() {
        return nodeManager;
    }

    /**
     * Collects the session's variables into a new {@link SCLState}. Variables
     * whose value is {@code null}, or for which a binding or variant cannot be
     * created, are left out.
     */
    private SCLState getState() {
        SCLState state = new SCLState();
        for (String key : connection.getVariables()) {
            Object value = connection.getVariableValue(key);
            if (value == null) {
                // No class to derive a binding from.
                continue;
            }
            try {
                Binding b = Bindings.getBinding(value.getClass());
                state.values.put(key, new Variant(b, value));
            } catch (Exception e) {
                // Best effort: skip this variable but leave a trace.
                LOGGER.debug("Could not store variable {} into SCL realm state", key, e);
            }
        }
        return state;
    }

    /**
     * Deserializes an {@link SCLState} from {@code blob} and sets each contained
     * value as a session variable. Entries whose SCL type resolves to a type
     * variable are skipped. Deserialization failures are logged and otherwise
     * ignored.
     *
     * @param blob the serialized state, as produced by {@link #serialize()}
     */
    public void applyState(byte[] blob) {
        try {
            SCLState state = (SCLState) SCLState.BINDING.serializer().deserialize(blob);
            for (Map.Entry<String, Variant> entry : state.values.entrySet()) {
                String key = entry.getKey();
                Variant value = entry.getValue();
                Type type = SCLTypeUtils.getType(value.type());
                if (type.getClassId() != Type.VAR_ID) {
                    // do not add the property if type cannot be inferred
                    connection.setVariable(key, type, value.getValue());
                }
            }
        } catch (RuntimeSerializerConstructionException e) {
            LOGGER.error("Could not apply state to SCL realm {}", id, e);
        } catch (IOException e) {
            LOGGER.error("Could not apply state to SCL realm {}", id, e);
        }
    }

    /**
     * Serializes the current session variables.
     *
     * @return the serialized state, or {@code null} if serialization failed
     *         (the failure is logged)
     */
    public byte[] serialize() {
        SCLState state = getState();
        try {
            return SCLState.BINDING.serializer().serialize(state);
        } catch (RuntimeSerializerConstructionException e) {
            LOGGER.error("Could not serialize state of SCL realm {}", id, e);
            return null;
        } catch (IOException e) {
            LOGGER.error("Could not serialize state of SCL realm {}", id, e);
            return null;
        }
    }

    /**
     * Thread factory that registers every thread it creates as the owning
     * realm's executing thread.
     */
    private static class SCLRealmThreadFactory implements ThreadFactory {

        private SCLRealm realm;

        public SCLRealmThreadFactory(SCLRealm realm) {
            this.realm = realm;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            realm.setThread(t);
            return t;
        }

        /** Drops the reference to the realm. */
        void clear() {
            realm = null;
        }
    }

    /** Records {@code t} as the realm's executing thread. */
    private void setThread(Thread t) {
        this.executorThread = t;
    }
}