-package org.simantics.db.layer0;\r
-\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.LinkedBlockingQueue;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.ThreadFactory;\r
-import java.util.concurrent.ThreadPoolExecutor;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.function.Function;\r
-\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.scl.runtime.SCLContext;\r
-import org.simantics.scl.runtime.tuple.Tuple0;\r
-import org.simantics.simulator.variable.Realm;\r
-\r
-abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {\r
- \r
- String id;\r
- Thread executorThread;\r
- ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,\r
- new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- executorThread = new Thread(r);\r
- return executorThread;\r
- }\r
- });\r
- \r
- Semaphore beginSyncExec = new Semaphore(0);\r
- Semaphore endSyncExec = new Semaphore(0);\r
- \r
- Engine engine;\r
- StandardSessionManager<Node, Engine> sessionManager;\r
- StandardNodeManager<Node, Engine> nodeManager;\r
- \r
- Runnable scheduleSyncExec = new Runnable() {\r
- @Override\r
- public void run() {\r
- beginSyncExec.release();\r
- try {\r
- endSyncExec.acquire();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
- };\r
- \r
- protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {\r
- this.sessionManager = sessionManager;\r
- this.engine = engine;\r
- this.id = id;\r
- this.nodeManager = createManager();\r
- }\r
- \r
- abstract protected StandardNodeManager<Node, Engine> createManager();\r
- \r
- protected String getSCLContextKey() {\r
- return getClass().getSimpleName();\r
- }\r
-\r
- public String getId() {\r
- return id;\r
- }\r
- \r
- public Engine getEngine() {\r
- return engine;\r
- }\r
-\r
- public Thread getThread() {\r
- return executorThread;\r
- }\r
- \r
- @SuppressWarnings({ "rawtypes", "unchecked" })\r
- public Object syncExec(Function fun) throws InterruptedException {\r
- \r
- executor.execute(scheduleSyncExec);\r
- SCLContext context = SCLContext.getCurrent();\r
- Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);\r
- \r
- try {\r
- beginSyncExec.acquire();\r
- Thread oldThread = executorThread;\r
- executorThread = Thread.currentThread();\r
- try {\r
- return fun.apply(Tuple0.INSTANCE);\r
- } finally {\r
- executorThread = oldThread;\r
- endSyncExec.release();\r
- }\r
- } finally {\r
- context.put(getSCLContextKey(), oldConnection);\r
- }\r
- }\r
- \r
- @SuppressWarnings("rawtypes")\r
- public void asyncExec(final Function fun) {\r
- executor.execute(new Runnable() {\r
- @SuppressWarnings("unchecked")\r
- @Override\r
- public void run() {\r
- SCLContext context = SCLContext.getCurrent();\r
- context.put(getSCLContextKey(), engine);\r
- fun.apply(Tuple0.INSTANCE);\r
- }\r
- });\r
- }\r
-\r
- @Override\r
- public void syncExec(Runnable runnable) throws InterruptedException {\r
- \r
- if(executorThread == Thread.currentThread()) {\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- } finally {\r
- }\r
- return;\r
- }\r
-\r
- executor.execute(scheduleSyncExec);\r
- \r
- beginSyncExec.acquire();\r
- Thread oldThread = executorThread;\r
- executorThread = Thread.currentThread();\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- } finally {\r
- executorThread = oldThread;\r
- endSyncExec.release();\r
- }\r
- }\r
-\r
- @Override\r
- public void asyncExec(Runnable runnable) {\r
-\r
- if(executorThread == Thread.currentThread()) {\r
- try {\r
- runnable.run();\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- } finally {\r
- }\r
- return;\r
- }\r
- \r
- executor.execute(runnable);\r
- }\r
- \r
- public void close() {\r
- sessionManager.removeRealm(id);\r
- executor.shutdown();\r
- try {\r
- executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
-\r
- public StandardNodeManager<Node, Engine> getNodeManager() {\r
- return nodeManager;\r
- }\r
- \r
-}\r
+package org.simantics.db.layer0;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.simantics.db.common.utils.Logger;
+import org.simantics.scl.runtime.SCLContext;
+import org.simantics.scl.runtime.tuple.Tuple0;
+import org.simantics.simulator.variable.Realm;
+
+abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
+
+ private String id;
+ private Thread executorThread;
+ private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);
+ private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), factory);
+ private Semaphore beginSyncExec = new Semaphore(0);
+ private Semaphore endSyncExec = new Semaphore(0);
+
+ private Engine engine;
+ private StandardNodeManager<Node, Engine> nodeManager;
+
+ private Runnable scheduleSyncExec = new Runnable() {
+ @Override
+ public void run() {
+ beginSyncExec.release();
+ try {
+ endSyncExec.acquire();
+ } catch (InterruptedException e) {
+ }
+ }
+ };
+
+ protected StandardRealm(Engine engine, String id) {
+ this.engine = engine;
+ this.id = id;
+ this.nodeManager = createManager();
+ }
+
+ abstract protected StandardNodeManager<Node, Engine> createManager();
+
+ protected String getSCLContextKey() {
+ return getClass().getSimpleName();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public Engine getEngine() {
+ return engine;
+ }
+
+ public Thread getThread() {
+ return executorThread;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Object syncExec(Function fun) throws InterruptedException {
+
+ executor.execute(scheduleSyncExec);
+ SCLContext context = SCLContext.getCurrent();
+ Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
+
+ try {
+ beginSyncExec.acquire();
+ Thread oldThread = executorThread;
+ executorThread = Thread.currentThread();
+ try {
+ return fun.apply(Tuple0.INSTANCE);
+ } finally {
+ executorThread = oldThread;
+ endSyncExec.release();
+ }
+ } finally {
+ context.put(getSCLContextKey(), oldConnection);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void asyncExec(final Function fun) {
+ executor.execute(new Runnable() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ SCLContext context = SCLContext.getCurrent();
+ context.put(getSCLContextKey(), engine);
+ fun.apply(Tuple0.INSTANCE);
+ }
+ });
+ }
+
+ @Override
+ public void syncExec(Runnable runnable) throws InterruptedException {
+
+ if(executorThread == Thread.currentThread()) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ } finally {
+ }
+ return;
+ }
+
+ executor.execute(scheduleSyncExec);
+
+ beginSyncExec.acquire();
+ Thread oldThread = executorThread;
+ executorThread = Thread.currentThread();
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ } finally {
+ executorThread = oldThread;
+ endSyncExec.release();
+ }
+ }
+
+ @Override
+ public void asyncExec(Runnable runnable) {
+
+ if(executorThread == Thread.currentThread()) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ } finally {
+ }
+ return;
+ }
+
+ executor.execute(runnable);
+ }
+
+ public void close() {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ List<Runnable> runnablesLeft = executor.shutdownNow();
+ if (!runnablesLeft.isEmpty()) {
+ getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
+ }
+ }
+ } catch (InterruptedException e) {
+ getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
+ }
+
+ factory.clear();
+ factory = null;
+ // Should never be true
+ if (!executorThread.isAlive())
+ executorThread.interrupt();
+ executorThread = null;
+ executor = null;
+
+ // Clear nodeManager
+ nodeManager.clear();
+ nodeManager = null;
+ }
+
+ public StandardNodeManager<Node, Engine> getNodeManager() {
+ return nodeManager;
+ }
+
+ public abstract org.slf4j.Logger getLogger();
+
+ private void setExecutorThread(Thread t) {
+ executorThread = t;
+ }
+
+ private static class StandardRealmThreadFactory implements ThreadFactory {
+
+ private StandardRealm<?, ?> realm;
+
+ public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
+ this.realm = realm;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ realm.setExecutorThread(t);
+ return t;
+ }
+
+ void clear() {
+ realm = null;
+ }
+ }
+}