1 package org.simantics.db.layer0;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.LinkedBlockingQueue;
6 import java.util.concurrent.Semaphore;
7 import java.util.concurrent.ThreadFactory;
8 import java.util.concurrent.ThreadPoolExecutor;
9 import java.util.concurrent.TimeUnit;
10 import java.util.function.Function;
12 import org.simantics.db.common.utils.Logger;
13 import org.simantics.scl.runtime.SCLContext;
14 import org.simantics.scl.runtime.tuple.Tuple0;
15 import org.simantics.simulator.variable.Realm;
17 abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
20 private Thread executorThread;
21 private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);
22 private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
23 new LinkedBlockingQueue<Runnable>(), factory);
24 private Semaphore beginSyncExec = new Semaphore(0);
25 private Semaphore endSyncExec = new Semaphore(0);
27 private Engine engine;
28 private StandardNodeManager<Node, Engine> nodeManager;
30 private Runnable scheduleSyncExec = new Runnable() {
33 beginSyncExec.release();
35 endSyncExec.acquire();
36 } catch (InterruptedException e) {
41 protected StandardRealm(Engine engine, String id) {
44 this.nodeManager = createManager();
47 abstract protected StandardNodeManager<Node, Engine> createManager();
49 protected String getSCLContextKey() {
50 return getClass().getSimpleName();
53 public String getId() {
57 public Engine getEngine() {
61 public Thread getThread() {
62 return executorThread;
65 @SuppressWarnings({ "rawtypes", "unchecked" })
66 public Object syncExec(Function fun) throws InterruptedException {
68 executor.execute(scheduleSyncExec);
69 SCLContext context = SCLContext.getCurrent();
70 Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
73 beginSyncExec.acquire();
74 Thread oldThread = executorThread;
75 executorThread = Thread.currentThread();
77 return fun.apply(Tuple0.INSTANCE);
79 executorThread = oldThread;
80 endSyncExec.release();
83 context.put(getSCLContextKey(), oldConnection);
87 @SuppressWarnings("rawtypes")
88 public void asyncExec(final Function fun) {
89 executor.execute(new Runnable() {
90 @SuppressWarnings("unchecked")
93 SCLContext context = SCLContext.getCurrent();
94 context.put(getSCLContextKey(), engine);
95 fun.apply(Tuple0.INSTANCE);
101 public void syncExec(Runnable runnable) throws InterruptedException {
103 if(executorThread == Thread.currentThread()) {
106 } catch (Throwable t) {
107 Logger.defaultLogError(t);
113 executor.execute(scheduleSyncExec);
115 beginSyncExec.acquire();
116 Thread oldThread = executorThread;
117 executorThread = Thread.currentThread();
120 } catch (Throwable t) {
121 Logger.defaultLogError(t);
123 executorThread = oldThread;
124 endSyncExec.release();
129 public void asyncExec(Runnable runnable) {
131 if(executorThread == Thread.currentThread()) {
134 } catch (Throwable t) {
135 Logger.defaultLogError(t);
141 executor.execute(runnable);
144 public void close() {
147 if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
148 List<Runnable> runnablesLeft = executor.shutdownNow();
149 if (!runnablesLeft.isEmpty()) {
150 getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
153 } catch (InterruptedException e) {
154 getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
159 // Should never be true
160 if (!executorThread.isAlive())
161 executorThread.interrupt();
162 executorThread = null;
170 public StandardNodeManager<Node, Engine> getNodeManager() {
174 public abstract org.slf4j.Logger getLogger();
176 private void setExecutorThread(Thread t) {
180 private static class StandardRealmThreadFactory implements ThreadFactory {
182 private StandardRealm<?, ?> realm;
184 public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
189 public Thread newThread(Runnable r) {
190 Thread t = new Thread(r);
191 realm.setExecutorThread(t);