]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/StandardRealm.java
68d002458b3af80e448f19f2f4314dc6b4794af2
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / StandardRealm.java
1 package org.simantics.db.layer0;
2
3 import java.util.List;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.LinkedBlockingQueue;
6 import java.util.concurrent.Semaphore;
7 import java.util.concurrent.ThreadFactory;
8 import java.util.concurrent.ThreadPoolExecutor;
9 import java.util.concurrent.TimeUnit;
10 import java.util.function.Function;
11
12 import org.simantics.db.common.utils.Logger;
13 import org.simantics.scl.runtime.SCLContext;
14 import org.simantics.scl.runtime.tuple.Tuple0;
15 import org.simantics.simulator.variable.Realm;
16
17 abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
18
19     private String id;
20     private Thread executorThread;
21     private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);
22     private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
23             new LinkedBlockingQueue<Runnable>(), factory);
24     private Semaphore beginSyncExec = new Semaphore(0);
25     private Semaphore endSyncExec = new Semaphore(0);
26     
27     private Engine engine;
28     private StandardNodeManager<Node, Engine> nodeManager;
29     
30     private Runnable scheduleSyncExec = new Runnable() {
31         @Override
32         public void run() {
33             beginSyncExec.release();
34             try {
35                 endSyncExec.acquire();
36             } catch (InterruptedException e) {
37             }
38         }
39     };
40     
41     protected StandardRealm(Engine engine, String id) {
42         this.engine = engine;
43         this.id = id;
44         this.nodeManager = createManager();
45     }
46
47     abstract protected StandardNodeManager<Node, Engine> createManager();
48     
49     protected String getSCLContextKey() {
50         return getClass().getSimpleName();
51     }
52
53     public String getId() {
54         return id;
55     }
56     
57     public Engine getEngine() {
58         return engine;
59     }
60
61     public Thread getThread() {
62         return executorThread;
63     }
64     
65     @SuppressWarnings({ "rawtypes", "unchecked" })
66     public Object syncExec(Function fun) throws InterruptedException {
67         
68         executor.execute(scheduleSyncExec);
69         SCLContext context = SCLContext.getCurrent();
70         Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
71         
72         try {
73             beginSyncExec.acquire();
74             Thread oldThread = executorThread;
75             executorThread = Thread.currentThread();
76             try {
77                 return fun.apply(Tuple0.INSTANCE);
78             } finally {
79                 executorThread = oldThread;
80                 endSyncExec.release();
81             }
82         } finally {
83             context.put(getSCLContextKey(), oldConnection);
84         }
85     }
86     
87     @SuppressWarnings("rawtypes")
88     public void asyncExec(final Function fun) {
89         executor.execute(new Runnable() {
90             @SuppressWarnings("unchecked")
91             @Override
92             public void run() {
93                 SCLContext context = SCLContext.getCurrent();
94                 context.put(getSCLContextKey(), engine);
95                 fun.apply(Tuple0.INSTANCE);
96             }
97         });
98     }
99
100     @Override
101     public void syncExec(Runnable runnable) throws InterruptedException {
102         
103         if(executorThread == Thread.currentThread()) {
104             try {
105                 runnable.run();
106             } catch (Throwable t) {
107                 Logger.defaultLogError(t);
108             } finally {
109             }
110             return;
111         }
112
113         executor.execute(scheduleSyncExec);
114         
115         beginSyncExec.acquire();
116         Thread oldThread = executorThread;
117         executorThread = Thread.currentThread();
118         try {
119             runnable.run();
120         } catch (Throwable t) {
121                 Logger.defaultLogError(t);
122         } finally {
123             executorThread = oldThread;
124             endSyncExec.release();
125         }
126     }
127
128     @Override
129     public void asyncExec(Runnable runnable) {
130
131         if(executorThread == Thread.currentThread()) {
132             try {
133                 runnable.run();
134             } catch (Throwable t) {
135                 Logger.defaultLogError(t);
136             } finally {
137             }
138             return;
139         }
140         
141         executor.execute(runnable);
142     }
143     
144     public void close() {
145         executor.shutdown();
146         try {
147             if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
148                 List<Runnable> runnablesLeft = executor.shutdownNow();
149                 if (!runnablesLeft.isEmpty()) {
150                     getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
151                 }
152             }
153         } catch (InterruptedException e) {
154             getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
155         }
156         
157         factory.clear();
158         factory = null;
159         // Should never be true
160         if (!executorThread.isAlive())
161             executorThread.interrupt();
162         executorThread = null;
163         executor = null;
164         
165         // Clear nodeManager
166         nodeManager.clear();
167         nodeManager = null;
168     }
169
170     public StandardNodeManager<Node, Engine> getNodeManager() {
171         return nodeManager;
172     }
173
174     public abstract org.slf4j.Logger getLogger();
175
176     private void setExecutorThread(Thread t) {
177         executorThread = t;
178     }
179
180     private static class StandardRealmThreadFactory implements ThreadFactory {
181         
182         private StandardRealm<?, ?> realm;
183
184         public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
185             this.realm = realm;
186         }
187         
188         @Override
189         public Thread newThread(Runnable r) {
190             Thread t = new Thread(r);
191             realm.setExecutorThread(t);
192             return t;
193         }
194         
195         void clear() {
196             realm = null;
197         }
198     }
199 }