--- /dev/null
+package org.simantics.db.layer0;\r
+\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.LinkedBlockingQueue;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.ThreadPoolExecutor;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.function.Function;\r
+\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.scl.runtime.SCLContext;\r
+import org.simantics.scl.runtime.tuple.Tuple0;\r
+import org.simantics.simulator.variable.Realm;\r
+\r
+abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {\r
+ \r
+ String id;\r
+ Thread executorThread;\r
+ ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,\r
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ executorThread = new Thread(r);\r
+ return executorThread;\r
+ }\r
+ });\r
+ \r
+ Semaphore beginSyncExec = new Semaphore(0);\r
+ Semaphore endSyncExec = new Semaphore(0);\r
+ \r
+ Engine engine;\r
+ StandardSessionManager<Node, Engine> sessionManager;\r
+ StandardNodeManager<Node, Engine> nodeManager;\r
+ \r
+ Runnable scheduleSyncExec = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ beginSyncExec.release();\r
+ try {\r
+ endSyncExec.acquire();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+ };\r
+ \r
+ protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {\r
+ this.sessionManager = sessionManager;\r
+ this.engine = engine;\r
+ this.id = id;\r
+ this.nodeManager = createManager();\r
+ }\r
+ \r
+ abstract protected StandardNodeManager<Node, Engine> createManager();\r
+ \r
+ protected String getSCLContextKey() {\r
+ return getClass().getSimpleName();\r
+ }\r
+\r
+ public String getId() {\r
+ return id;\r
+ }\r
+ \r
+ public Engine getEngine() {\r
+ return engine;\r
+ }\r
+\r
+ public Thread getThread() {\r
+ return executorThread;\r
+ }\r
+ \r
+ @SuppressWarnings({ "rawtypes", "unchecked" })\r
+ public Object syncExec(Function fun) throws InterruptedException {\r
+ \r
+ executor.execute(scheduleSyncExec);\r
+ SCLContext context = SCLContext.getCurrent();\r
+ Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);\r
+ \r
+ try {\r
+ beginSyncExec.acquire();\r
+ Thread oldThread = executorThread;\r
+ executorThread = Thread.currentThread();\r
+ try {\r
+ return fun.apply(Tuple0.INSTANCE);\r
+ } finally {\r
+ executorThread = oldThread;\r
+ endSyncExec.release();\r
+ }\r
+ } finally {\r
+ context.put(getSCLContextKey(), oldConnection);\r
+ }\r
+ }\r
+ \r
+ @SuppressWarnings("rawtypes")\r
+ public void asyncExec(final Function fun) {\r
+ executor.execute(new Runnable() {\r
+ @SuppressWarnings("unchecked")\r
+ @Override\r
+ public void run() {\r
+ SCLContext context = SCLContext.getCurrent();\r
+ context.put(getSCLContextKey(), engine);\r
+ fun.apply(Tuple0.INSTANCE);\r
+ }\r
+ });\r
+ }\r
+\r
+ @Override\r
+ public void syncExec(Runnable runnable) throws InterruptedException {\r
+ \r
+ if(executorThread == Thread.currentThread()) {\r
+ try {\r
+ runnable.run();\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ } finally {\r
+ }\r
+ return;\r
+ }\r
+\r
+ executor.execute(scheduleSyncExec);\r
+ \r
+ beginSyncExec.acquire();\r
+ Thread oldThread = executorThread;\r
+ executorThread = Thread.currentThread();\r
+ try {\r
+ runnable.run();\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ } finally {\r
+ executorThread = oldThread;\r
+ endSyncExec.release();\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void asyncExec(Runnable runnable) {\r
+\r
+ if(executorThread == Thread.currentThread()) {\r
+ try {\r
+ runnable.run();\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ } finally {\r
+ }\r
+ return;\r
+ }\r
+ \r
+ executor.execute(runnable);\r
+ }\r
+ \r
+ public void close() {\r
+ sessionManager.removeRealm(id);\r
+ executor.shutdown();\r
+ try {\r
+ executor.awaitTermination(500L, TimeUnit.MILLISECONDS);\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ public StandardNodeManager<Node, Engine> getNodeManager() {\r
+ return nodeManager;\r
+ }\r
+ \r
+}\r