1 package org.simantics.db.layer0;
\r
3 import java.util.concurrent.ExecutorService;
\r
4 import java.util.concurrent.LinkedBlockingQueue;
\r
5 import java.util.concurrent.Semaphore;
\r
6 import java.util.concurrent.ThreadFactory;
\r
7 import java.util.concurrent.ThreadPoolExecutor;
\r
8 import java.util.concurrent.TimeUnit;
\r
9 import java.util.function.Function;
\r
11 import org.simantics.db.common.utils.Logger;
\r
12 import org.simantics.scl.runtime.SCLContext;
\r
13 import org.simantics.scl.runtime.tuple.Tuple0;
\r
14 import org.simantics.simulator.variable.Realm;
\r
16 abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
\r
// Thread currently treated as this realm's thread. It is assigned by the
// thread factory below each time the pool creates a worker, and it is
// temporarily replaced by the calling thread inside the syncExec methods.
// NOTE(review): the field is not volatile and no lock is visible around its
// reads and writes, although several threads touch it - confirm that
// visibility is guaranteed some other way.
19 Thread executorThread;
\r
// Pool with 0 core threads, at most 1 worker thread, a 60 second idle
// keep-alive and an unbounded LinkedBlockingQueue, so queued tasks are run by
// a single worker thread.
20 ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
\r
21 new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
\r
// Records every worker thread the pool creates in executorThread before
// handing it back to the pool.
23 public Thread newThread(Runnable r) {
\r
24 executorThread = new Thread(r);
\r
25 return executorThread;
\r
// Both semaphores start with zero permits.
// beginSyncExec is released by scheduleSyncExec and acquired by the syncExec
// methods: it tells the caller that the executor has reached the hand-off task.
29 Semaphore beginSyncExec = new Semaphore(0);
\r
// endSyncExec is released by the syncExec methods and acquired by
// scheduleSyncExec: it tells the executor that the caller has finished.
30 Semaphore endSyncExec = new Semaphore(0);
\r
// Session manager given to the constructor; close() calls removeRealm(id) on it.
33 StandardSessionManager<Node, Engine> sessionManager;
\r
// Node manager obtained from createManager() in the constructor and exposed
// through getNodeManager().
34 StandardNodeManager<Node, Engine> nodeManager;
\r
// Hand-off task that the syncExec methods submit to the executor. It releases
// beginSyncExec and then waits in endSyncExec.acquire(), so the executor stays
// occupied by this task until the syncExec caller releases endSyncExec.
// NOTE(review): the run() header, the try line and the body of the
// InterruptedException handler are not visible in this listing - confirm that
// the interrupt is not silently dropped.
36 Runnable scheduleSyncExec = new Runnable() {
\r
// Let the waiting syncExec caller proceed.
39 beginSyncExec.release();
\r
// Wait here until the caller signals that it is done.
41 endSyncExec.acquire();
\r
42 } catch (InterruptedException e) {
\r
/**
 * Stores the session manager and engine and creates the node manager.
 *
 * @param sessionManager manager stored in the sessionManager field
 * @param engine engine stored in the engine field
 * @param id realm identifier; its assignment is not visible in this listing
 *           (presumably stored in the id field used by close() - TODO confirm)
 */
47 protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {
\r
48 this.sessionManager = sessionManager;
\r
49 this.engine = engine;
\r
// NOTE(review): createManager() is abstract, so this calls subclass code
// before the subclass constructor body has run; implementations must not
// depend on subclass fields that are initialised later.
51 this.nodeManager = createManager();
\r
/**
 * Creates the node manager for this realm. The constructor calls this once and
 * stores the result in the nodeManager field.
 *
 * @return the node manager to use for this realm
 */
54 abstract protected StandardNodeManager<Node, Engine> createManager();
\r
/**
 * Returns the key under which the engine is stored in the SCLContext by the
 * syncExec(Function) and asyncExec(Function) methods.
 *
 * @return the simple name of the runtime class of this realm
 */
56 protected String getSCLContextKey() {
\r
57 return getClass().getSimpleName();
\r
// NOTE(review): the body is not visible in this listing; presumably it returns
// the id field that close() passes to removeRealm - TODO confirm.
60 public String getId() {
\r
// NOTE(review): the body is not visible in this listing; presumably it returns
// the engine field set by the constructor - TODO confirm.
64 public Engine getEngine() {
\r
/**
 * Returns the current value of executorThread. This is either the last worker
 * thread created by the pool's thread factory or, while a syncExec call is in
 * progress, the thread that is running that call.
 *
 * @return the thread currently recorded in executorThread (null until the
 *         field has been assigned for the first time)
 */
68 public Thread getThread() {
\r
69 return executorThread;
\r
/**
 * Applies fun to Tuple0.INSTANCE on the calling thread while the realm's
 * executor is parked in scheduleSyncExec, and returns the result.
 *
 * While fun runs, the engine is stored in the current SCLContext under
 * getSCLContextKey() and executorThread points at the calling thread; both
 * are put back to their previous values afterwards.
 *
 * NOTE(review): several lines of this method (presumably the try/finally
 * structure and any check for already being on the executor thread) are not
 * visible in this listing - confirm how exceptions and re-entrant calls are
 * handled.
 *
 * @param fun function applied to Tuple0.INSTANCE
 * @return whatever fun returns
 * @throws InterruptedException declared by the method; beginSyncExec.acquire()
 *         can throw it
 */
72 @SuppressWarnings({ "rawtypes", "unchecked" })
\r
73 public Object syncExec(Function fun) throws InterruptedException {
\r
// Queue the hand-off task so the executor runs nothing else during the call.
75 executor.execute(scheduleSyncExec);
\r
76 SCLContext context = SCLContext.getCurrent();
\r
// Install the engine and keep the value returned by put for restoring later.
77 Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
\r
// Wait until the executor has actually reached scheduleSyncExec.
80 beginSyncExec.acquire();
\r
// Temporarily report the calling thread as the realm thread.
81 Thread oldThread = executorThread;
\r
82 executorThread = Thread.currentThread();
\r
84 return fun.apply(Tuple0.INSTANCE);
\r
86 executorThread = oldThread;
\r
// Let scheduleSyncExec finish so the executor can continue with its queue.
87 endSyncExec.release();
\r
90 context.put(getSCLContextKey(), oldConnection);
\r
/**
 * Submits a task to the executor that stores the engine in the SCLContext
 * under getSCLContextKey() and then applies fun to Tuple0.INSTANCE. The
 * result of fun is not used by the visible code.
 *
 * NOTE(review): no restoration of the previous context value and no exception
 * handling are visible in this listing - confirm against the full source.
 *
 * @param fun function applied to Tuple0.INSTANCE inside the submitted task
 */
94 @SuppressWarnings("rawtypes")
\r
95 public void asyncExec(final Function fun) {
\r
96 executor.execute(new Runnable() {
\r
97 @SuppressWarnings("unchecked")
\r
// Evaluated inside the submitted task, not on the thread calling asyncExec.
100 SCLContext context = SCLContext.getCurrent();
\r
101 context.put(getSCLContextKey(), engine);
\r
102 fun.apply(Tuple0.INSTANCE);
\r
/**
 * Runnable variant of syncExec. It has two paths, selected by comparing
 * executorThread with the calling thread.
 *
 * Both visible catch blocks pass any Throwable to Logger.defaultLogError.
 *
 * NOTE(review): the lines that invoke runnable, the try/finally structure and
 * the end of the first branch are not visible in this listing; presumably
 * runnable.run() is called directly in both paths - TODO confirm.
 *
 * @param runnable work to perform
 * @throws InterruptedException declared by the method; beginSyncExec.acquire()
 *         can throw it
 */
108 public void syncExec(Runnable runnable) throws InterruptedException {
\r
// Path taken when the caller is already the thread recorded as the realm thread.
110 if(executorThread == Thread.currentThread()) {
\r
113 } catch (Throwable t) {
\r
114 Logger.defaultLogError(t);
\r
// Otherwise park the executor in scheduleSyncExec for the duration of the call.
120 executor.execute(scheduleSyncExec);
\r
// Wait until the executor has reached scheduleSyncExec.
122 beginSyncExec.acquire();
\r
// Temporarily report the calling thread as the realm thread.
123 Thread oldThread = executorThread;
\r
124 executorThread = Thread.currentThread();
\r
127 } catch (Throwable t) {
\r
128 Logger.defaultLogError(t);
\r
130 executorThread = oldThread;
\r
// Let scheduleSyncExec finish so the executor can continue with its queue.
131 endSyncExec.release();
\r
/**
 * Runnable variant of asyncExec. When the caller is the thread recorded in
 * executorThread the first branch is taken, and any Throwable caught there is
 * passed to Logger.defaultLogError; otherwise runnable is queued on the
 * executor.
 *
 * NOTE(review): the statement inside the first branch is not visible in this
 * listing; presumably it runs runnable directly - TODO confirm.
 *
 * @param runnable work to perform
 */
136 public void asyncExec(Runnable runnable) {
\r
138 if(executorThread == Thread.currentThread()) {
\r
141 } catch (Throwable t) {
\r
142 Logger.defaultLogError(t);
\r
// Not on the realm thread: hand the work to the executor.
148 executor.execute(runnable);
\r
/**
 * Removes this realm from the session manager by id, starts an orderly
 * shutdown of the executor and waits up to 500 milliseconds for it to
 * terminate.
 *
 * NOTE(review): the result of awaitTermination is not used in the visible
 * code, and the body of the InterruptedException handler is not visible in
 * this listing - confirm the interrupt is handled rather than dropped.
 */
151 public void close() {
\r
152 sessionManager.removeRealm(id);
\r
// shutdown() stops new submissions; tasks already queued are still executed.
153 executor.shutdown();
\r
155 executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
\r
156 } catch (InterruptedException e) {
\r
/**
 * Returns the node manager that the constructor obtained from createManager().
 *
 * @return the value of the nodeManager field
 */
160 public StandardNodeManager<Node, Engine> getNodeManager() {
\r
161 return nodeManager;
\r