1 package org.simantics.db.layer0;
3 import java.util.concurrent.ExecutorService;
4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.Semaphore;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit;
9 import java.util.function.Function;
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.scl.runtime.SCLContext;
13 import org.simantics.scl.runtime.tuple.Tuple0;
14 import org.simantics.simulator.variable.Realm;
16 abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
// Thread currently regarded as the realm's own thread. It is assigned by the
// thread factory below whenever a worker is created, and temporarily replaced
// by the calling thread inside the syncExec methods.
// NOTE(review): this field is written and read from several threads and no
// synchronization is visible in this extract — confirm whether it should be volatile.
19 Thread executorThread;
// Executor limited to at most one worker thread (core size 0, maximum 1) with a
// 60-second idle keep-alive and an unbounded FIFO queue, so queued tasks are
// executed one at a time in submission order.
20 ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
21 new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
// Remembers each worker thread it creates in executorThread before handing it to the pool.
23 public Thread newThread(Runnable r) {
24 executorThread = new Thread(r);
25 return executorThread;
// Handshake semaphores, both starting with zero permits: the worker releases
// beginSyncExec when it reaches the handshake task, and then waits on endSyncExec
// until the syncExec caller has finished.
29 Semaphore beginSyncExec = new Semaphore(0);
30 Semaphore endSyncExec = new Semaphore(0);
// Session manager this realm is registered with (used by close() to deregister).
33 StandardSessionManager<Node, Engine> sessionManager;
// Node manager produced by createManager() during construction.
34 StandardNodeManager<Node, Engine> nodeManager;
36 Runnable scheduleSyncExec = new Runnable() {
39 beginSyncExec.release();
41 endSyncExec.acquire();
42 } catch (InterruptedException e) {
/**
 * Creates a realm attached to the given session manager and builds its node
 * manager through {@link #createManager()}.
 *
 * NOTE(review): the handling of {@code engine} and {@code id} is not visible in
 * this extract; presumably they are stored in fields of the same name, which
 * other methods of this class read — confirm against the full file.
 */
47 protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {
48 this.sessionManager = sessionManager;
// createManager() is abstract, so the subclass implementation runs here,
// before the subclass's own constructor body has executed.
51 this.nodeManager = createManager();
54 abstract protected StandardNodeManager<Node, Engine> createManager();
56 protected String getSCLContextKey() {
57 return getClass().getSimpleName();
// Accessor for this realm's identifier.
// NOTE(review): the method body is not visible in this extract; presumably it
// returns the id handed to the constructor — confirm against the full file.
60 public String getId() {
// Accessor for the engine associated with this realm.
// NOTE(review): the method body is not visible in this extract; presumably it
// returns the engine handed to the constructor — confirm against the full file.
64 public Engine getEngine() {
68 public Thread getThread() {
69 return executorThread;
72 @SuppressWarnings({ "rawtypes", "unchecked" })
73 public Object syncExec(Function fun) throws InterruptedException {
75 executor.execute(scheduleSyncExec);
76 SCLContext context = SCLContext.getCurrent();
77 Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
80 beginSyncExec.acquire();
81 Thread oldThread = executorThread;
82 executorThread = Thread.currentThread();
84 return fun.apply(Tuple0.INSTANCE);
86 executorThread = oldThread;
87 endSyncExec.release();
90 context.put(getSCLContextKey(), oldConnection);
94 @SuppressWarnings("rawtypes")
95 public void asyncExec(final Function fun) {
96 executor.execute(new Runnable() {
97 @SuppressWarnings("unchecked")
100 SCLContext context = SCLContext.getCurrent();
101 context.put(getSCLContextKey(), engine);
102 fun.apply(Tuple0.INSTANCE);
108 public void syncExec(Runnable runnable) throws InterruptedException {
110 if(executorThread == Thread.currentThread()) {
113 } catch (Throwable t) {
114 Logger.defaultLogError(t);
120 executor.execute(scheduleSyncExec);
122 beginSyncExec.acquire();
123 Thread oldThread = executorThread;
124 executorThread = Thread.currentThread();
127 } catch (Throwable t) {
128 Logger.defaultLogError(t);
130 executorThread = oldThread;
131 endSyncExec.release();
136 public void asyncExec(Runnable runnable) {
138 if(executorThread == Thread.currentThread()) {
141 } catch (Throwable t) {
142 Logger.defaultLogError(t);
148 executor.execute(runnable);
151 public void close() {
152 sessionManager.removeRealm(id);
155 executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
156 } catch (InterruptedException e) {
160 public StandardNodeManager<Node, Engine> getNodeManager() {