package org.simantics.db.layer0;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.simantics.db.common.utils.Logger;
import org.simantics.scl.runtime.SCLContext;
import org.simantics.scl.runtime.tuple.Tuple0;
import org.simantics.simulator.variable.Realm;

/**
 * Base class for realms that funnel work through a private executor with at
 * most one worker thread.
 *
 * <p>Asynchronous work is queued on the executor. Synchronous work runs on the
 * <em>calling</em> thread: the caller queues a handshake task
 * ({@link #scheduleSyncExec}) that parks the worker thread, waits until that
 * task signals {@code beginSyncExec}, runs its own work, and finally signals
 * {@code endSyncExec} so the worker can resume. While synchronous work is
 * running, {@link #getThread()} reports the calling thread.
 *
 * <p>NOTE(review): the type-parameter list of this class was garbled in the
 * reviewed text ({@code StandardRealm> implements Realm}). It is reconstructed
 * here as {@code <Node, Engine extends StandardEngine<Node>>}, and the
 * parameterisation of {@code StandardNodeManager} is restored to match -
 * confirm both against version control.
 */
abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {

    private String id;

    /**
     * The thread currently regarded as "the realm thread": the most recently
     * created worker thread, or the caller of a running {@code syncExec}.
     * {@code null} until the executor has created its first worker.
     */
    private Thread executorThread;

    private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);

    /** Executor limited to a single worker thread, backed by an unbounded queue. */
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), factory);

    /** Released by the worker thread once it is parked for a synchronous caller. */
    private Semaphore beginSyncExec = new Semaphore(0);

    /** Released by the synchronous caller when its work is done, un-parking the worker. */
    private Semaphore endSyncExec = new Semaphore(0);

    private Engine engine;
    private StandardNodeManager<Node, Engine> nodeManager;

    /**
     * Handshake task executed on the worker thread: tells the waiting caller it
     * may proceed, then blocks until the caller has finished.
     */
    private Runnable scheduleSyncExec = new Runnable() {
        @Override
        public void run() {
            beginSyncExec.release();
            try {
                endSyncExec.acquire();
            } catch (InterruptedException e) {
                // Keep the interrupt visible instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates a realm and immediately asks the subclass for its node manager.
     *
     * @param engine the engine published in the SCL context while realm code runs
     * @param id     the identifier returned by {@link #getId()}
     */
    protected StandardRealm(Engine engine, String id) {
        this.engine = engine;
        this.id = id;
        // NOTE: createManager() is invoked from the constructor, so subclass
        // fields are not yet initialised when it runs.
        this.nodeManager = createManager();
    }

    /**
     * Creates the node manager for this realm. Called once from the constructor.
     *
     * @return the node manager later returned by {@link #getNodeManager()}
     */
    abstract protected StandardNodeManager<Node, Engine> createManager();

    /**
     * Returns the key under which the engine is stored in the {@link SCLContext}.
     *
     * @return by default the simple name of the concrete realm class
     */
    protected String getSCLContextKey() {
        return getClass().getSimpleName();
    }

    /**
     * @return the identifier given at construction
     */
    public String getId() {
        return id;
    }

    /**
     * @return the engine given at construction
     */
    public Engine getEngine() {
        return engine;
    }

    /**
     * @return the thread currently regarded as the realm thread, or {@code null}
     *         if no worker has been created yet or the realm has been closed
     */
    public Thread getThread() {
        return executorThread;
    }

    /**
     * Applies {@code fun} to {@link Tuple0#INSTANCE} on the calling thread while
     * the worker thread is parked, with the engine installed in the current
     * {@link SCLContext} for the duration of the call.
     *
     * <p>If the caller already is the realm thread, {@code fun} is applied
     * directly. Queuing the handshake in that case would wait on a worker that
     * cannot make progress.
     *
     * @param fun the function to apply
     * @return the value returned by {@code fun}
     * @throws InterruptedException if the caller is interrupted while waiting
     *         for the worker thread to park
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public Object syncExec(Function fun) throws InterruptedException {
        boolean reentrant = executorThread == Thread.currentThread();
        if (!reentrant)
            executor.execute(scheduleSyncExec);

        SCLContext context = SCLContext.getCurrent();
        Engine oldConnection = (Engine) context.put(getSCLContextKey(), engine);
        try {
            if (reentrant)
                return fun.apply(Tuple0.INSTANCE);

            // NOTE(review): if this acquire is interrupted, the handshake task
            // stays queued and will later block on endSyncExec with nobody left
            // to release it. Fixing that needs a redesign of the handshake.
            beginSyncExec.acquire();
            Thread oldThread = executorThread;
            executorThread = Thread.currentThread();
            try {
                return fun.apply(Tuple0.INSTANCE);
            } finally {
                executorThread = oldThread;
                endSyncExec.release();
            }
        } finally {
            // Restore whatever was stored under the key before this call.
            context.put(getSCLContextKey(), oldConnection);
        }
    }

    /**
     * Queues {@code fun} for execution on the worker thread. The engine is
     * installed in the worker's {@link SCLContext} before {@code fun} is applied
     * to {@link Tuple0#INSTANCE}; the result is discarded.
     *
     * @param fun the function to apply
     */
    @SuppressWarnings("rawtypes")
    public void asyncExec(final Function fun) {
        executor.execute(new Runnable() {
            @SuppressWarnings("unchecked")
            @Override
            public void run() {
                SCLContext context = SCLContext.getCurrent();
                context.put(getSCLContextKey(), engine);
                fun.apply(Tuple0.INSTANCE);
            }
        });
    }

    /**
     * Runs {@code runnable} on the calling thread while the worker thread is
     * parked. Anything thrown by {@code runnable} is logged, not propagated.
     * If the caller already is the realm thread the runnable is run directly.
     *
     * @param runnable the work to run
     * @throws InterruptedException if the caller is interrupted while waiting
     *         for the worker thread to park
     */
    @Override
    public void syncExec(Runnable runnable) throws InterruptedException {
        if (executorThread == Thread.currentThread()) {
            runLogged(runnable);
            return;
        }

        executor.execute(scheduleSyncExec);

        beginSyncExec.acquire();
        Thread oldThread = executorThread;
        executorThread = Thread.currentThread();
        try {
            runLogged(runnable);
        } finally {
            executorThread = oldThread;
            endSyncExec.release();
        }
    }

    /**
     * Runs {@code runnable} directly (logging anything it throws) when called
     * from the realm thread; otherwise queues it on the executor.
     *
     * @param runnable the work to run
     */
    @Override
    public void asyncExec(Runnable runnable) {
        if (executorThread == Thread.currentThread()) {
            runLogged(runnable);
            return;
        }
        executor.execute(runnable);
    }

    /** Runs {@code runnable} and logs, rather than propagates, anything it throws. */
    private static void runLogged(Runnable runnable) {
        try {
            runnable.run();
        } catch (Throwable t) {
            Logger.defaultLogError(t);
        }
    }

    /**
     * Shuts the executor down, waiting up to 500 ms for queued work before
     * cancelling it, then releases the thread factory, the realm thread
     * reference, the executor and the node manager.
     */
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                List<Runnable> runnablesLeft = executor.shutdownNow();
                if (!runnablesLeft.isEmpty()) {
                    getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
                }
            }
        } catch (InterruptedException e) {
            getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }

        factory.clear();
        factory = null;

        // The worker should be gone by now. executorThread is null when the
        // realm never executed anything, and it may be the calling thread when
        // close() is invoked from inside syncExec - interrupt neither of those.
        Thread thread = executorThread;
        if (thread != null && thread != Thread.currentThread() && thread.isAlive())
            thread.interrupt();
        executorThread = null;
        executor = null;

        // Clear nodeManager
        nodeManager.clear();
        nodeManager = null;
    }

    /**
     * @return the node manager created at construction, or {@code null} after
     *         {@link #close()}
     */
    public StandardNodeManager<Node, Engine> getNodeManager() {
        return nodeManager;
    }

    /**
     * @return the logger used for shutdown diagnostics in {@link #close()}
     */
    public abstract org.slf4j.Logger getLogger();

    /** Records {@code t} as the realm thread; called by the thread factory. */
    private void setExecutorThread(Thread t) {
        executorThread = t;
    }

    /**
     * Thread factory that reports every thread it creates back to the owning
     * realm so the realm can recognise calls made from its own worker.
     */
    private static class StandardRealmThreadFactory implements ThreadFactory {

        private StandardRealm<?, ?> realm;

        public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
            this.realm = realm;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            realm.setExecutorThread(t);
            return t;
        }

        /** Drops the reference to the realm; called from {@code StandardRealm.close()}. */
        void clear() {
            realm = null;
        }
    }
}