package org.simantics.db.layer0; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.simantics.db.common.utils.Logger; import org.simantics.scl.runtime.SCLContext; import org.simantics.scl.runtime.tuple.Tuple0; import org.simantics.simulator.variable.Realm; abstract public class StandardRealm> implements Realm { String id; Thread executorThread; ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { executorThread = new Thread(r); return executorThread; } }); Semaphore beginSyncExec = new Semaphore(0); Semaphore endSyncExec = new Semaphore(0); Engine engine; StandardSessionManager sessionManager; StandardNodeManager nodeManager; Runnable scheduleSyncExec = new Runnable() { @Override public void run() { beginSyncExec.release(); try { endSyncExec.acquire(); } catch (InterruptedException e) { } } }; protected StandardRealm(StandardSessionManager sessionManager, Engine engine, String id) { this.sessionManager = sessionManager; this.engine = engine; this.id = id; this.nodeManager = createManager(); } abstract protected StandardNodeManager createManager(); protected String getSCLContextKey() { return getClass().getSimpleName(); } public String getId() { return id; } public Engine getEngine() { return engine; } public Thread getThread() { return executorThread; } @SuppressWarnings({ "rawtypes", "unchecked" }) public Object syncExec(Function fun) throws InterruptedException { executor.execute(scheduleSyncExec); SCLContext context = SCLContext.getCurrent(); Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine); try { beginSyncExec.acquire(); Thread oldThread = executorThread; executorThread = Thread.currentThread(); try { return fun.apply(Tuple0.INSTANCE); } finally { executorThread = oldThread; endSyncExec.release(); } } finally { context.put(getSCLContextKey(), oldConnection); } } @SuppressWarnings("rawtypes") public void asyncExec(final Function fun) { executor.execute(new Runnable() { @SuppressWarnings("unchecked") @Override public void run() { SCLContext context = SCLContext.getCurrent(); context.put(getSCLContextKey(), engine); fun.apply(Tuple0.INSTANCE); } }); } @Override public void syncExec(Runnable runnable) throws InterruptedException { if(executorThread == Thread.currentThread()) { try { runnable.run(); } catch (Throwable t) { Logger.defaultLogError(t); } finally { } return; } executor.execute(scheduleSyncExec); beginSyncExec.acquire(); Thread oldThread = executorThread; executorThread = Thread.currentThread(); try { runnable.run(); } catch (Throwable t) { Logger.defaultLogError(t); } finally { executorThread = oldThread; endSyncExec.release(); } } @Override public void asyncExec(Runnable runnable) { if(executorThread == Thread.currentThread()) { try { runnable.run(); } catch (Throwable t) { Logger.defaultLogError(t); } finally { } return; } executor.execute(runnable); } public void close() { sessionManager.removeRealm(id); executor.shutdown(); try { executor.awaitTermination(500L, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } public StandardNodeManager getNodeManager() { return nodeManager; } }