gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.simulator.toolkit/src/org/simantics/simulator/toolkit/StandardRealm.java
Merge "Refactoring of simulator toolkit"
[simantics/platform.git] / bundles / org.simantics.simulator.toolkit / src / org / simantics / simulator / toolkit / StandardRealm.java
1 package org.simantics.simulator.toolkit;
2
3 import java.util.List;
4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.Semaphore;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit;
9 import java.util.function.Function;
10
11 import org.simantics.scl.runtime.SCLContext;
12 import org.simantics.scl.runtime.tuple.Tuple0;
13 import org.simantics.simulator.variable.Realm;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 abstract public class StandardRealm<Node, Engine extends StandardNodeManagerSupport<Node>> implements Realm {
18
19     private static final Logger LOGGER = LoggerFactory.getLogger(StandardRealm.class);
20
21     private String id;
22     protected Thread executorThread;
23     private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);
24     private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
25             new LinkedBlockingQueue<>(), factory);
26     private Semaphore beginSyncExec = new Semaphore(0);
27     private Semaphore endSyncExec = new Semaphore(0);
28
29     private Engine engine;
30     protected StandardNodeManager<Node, Engine> nodeManager;
31
32     private Runnable scheduleSyncExec = new Runnable() {
33         @Override
34         public void run() {
35             beginSyncExec.release();
36             try {
37                 endSyncExec.acquire();
38             } catch (InterruptedException e) {
39             }
40         }
41     };
42
43     protected StandardRealm(Engine engine, String id) {
44         this.engine = engine;
45         this.id = id;
46         this.nodeManager = createManager();
47     }
48
49     protected StandardNodeManager<Node, Engine> createManager(Node root) {
50         return new StandardNodeManager<Node, Engine>(this, root);
51     }
52
53     protected StandardNodeManager<Node, Engine> createManager() {
54         return createManager(createRootNode());
55     }
56
57     /*
58      * For compatibility reasons. Existing implementations implement createManager() directly and in that case this is not needed.
59      * New implementations should not implement createManager() but rather implement this.
60      */
61     protected Node createRootNode() {
62         throw new UnsupportedOperationException();
63     }
64
65     protected String getSCLContextKey() {
66         return getClass().getSimpleName();
67     }
68
69     public String getId() {
70         return id;
71     }
72
73     public Engine getEngine() {
74         return engine;
75     }
76
77     public Thread getThread() {
78         return executorThread;
79     }
80
81     @SuppressWarnings({ "rawtypes", "unchecked" })
82     public Object syncExec(Function fun) throws InterruptedException {
83         executor.execute(scheduleSyncExec);
84         SCLContext context = SCLContext.getCurrent();
85         Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
86
87         try {
88             beginSyncExec.acquire();
89             Thread oldThread = executorThread;
90             executorThread = Thread.currentThread();
91             try {
92                 return fun.apply(Tuple0.INSTANCE);
93             } finally {
94                 executorThread = oldThread;
95                 endSyncExec.release();
96             }
97         } finally {
98             context.put(getSCLContextKey(), oldConnection);
99         }
100     }
101
102     @SuppressWarnings("rawtypes")
103     public void asyncExec(final Function fun) {
104         executor.execute(new Runnable() {
105             @SuppressWarnings("unchecked")
106             @Override
107             public void run() {
108                 SCLContext context = SCLContext.getCurrent();
109                 context.put(getSCLContextKey(), engine);
110                 fun.apply(Tuple0.INSTANCE);
111             }
112         });
113     }
114
115     @Override
116     public void syncExec(Runnable runnable) throws InterruptedException {
117         if(executorThread == Thread.currentThread()) {
118             try {
119                 runnable.run();
120             } catch (Throwable t) {
121                 LOGGER.error("Error executing runnable in realm", t);
122             } finally {
123             }
124             return;
125         }
126
127         executor.execute(scheduleSyncExec);
128
129         beginSyncExec.acquire();
130         Thread oldThread = executorThread;
131         executorThread = Thread.currentThread();
132         try {
133             runnable.run();
134         } catch (Throwable t) {
135             LOGGER.error("Error executing runnable in realm", t);
136         } finally {
137             executorThread = oldThread;
138             endSyncExec.release();
139         }
140     }
141
142     @Override
143     public void asyncExec(Runnable runnable) {
144         if(executorThread == Thread.currentThread()) {
145             try {
146                 runnable.run();
147             } catch (Throwable t) {
148                 LOGGER.error("Error executing runnable in realm", t);
149             } finally {
150             }
151             return;
152         }
153
154         executor.execute(runnable);
155     }
156
157     public void close() {
158         executor.shutdown();
159         try {
160             if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
161                 List<Runnable> runnablesLeft = executor.shutdownNow();
162                 if (!runnablesLeft.isEmpty()) {
163                     getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
164                 }
165             }
166         } catch (InterruptedException e) {
167             getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
168         }
169
170         factory.clear();
171         factory = null;
172         // Should never be true
173         if (!executorThread.isAlive())
174             executorThread.interrupt();
175         executorThread = null;
176         executor = null;
177
178         // Clear nodeManager
179         nodeManager.clear();
180         nodeManager = null;
181     }
182
183     public StandardNodeManager<Node, Engine> getNodeManager() {
184         return nodeManager;
185     }
186
187     public abstract org.slf4j.Logger getLogger();
188
189     private void setExecutorThread(Thread t) {
190         executorThread = t;
191     }
192
193     private static class StandardRealmThreadFactory implements ThreadFactory {
194
195         private StandardRealm<?, ?> realm;
196
197         public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
198             this.realm = realm;
199         }
200
201         @Override
202         public Thread newThread(Runnable r) {
203             Thread t = new Thread(r);
204             realm.setExecutorThread(t);
205             return t;
206         }
207
208         void clear() {
209             realm = null;
210         }
211     }
212
213 }