2232497a22bb5fbc037dd4fd935b0dfb89bfbb63
[simantics/platform.git] / bundles / org.simantics.db.layer0 / src / org / simantics / db / layer0 / StandardRealm.java
1 package org.simantics.db.layer0;
2
3 import java.util.concurrent.ExecutorService;
4 import java.util.concurrent.LinkedBlockingQueue;
5 import java.util.concurrent.Semaphore;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.ThreadPoolExecutor;
8 import java.util.concurrent.TimeUnit;
9 import java.util.function.Function;
10
11 import org.simantics.db.common.utils.Logger;
12 import org.simantics.scl.runtime.SCLContext;
13 import org.simantics.scl.runtime.tuple.Tuple0;
14 import org.simantics.simulator.variable.Realm;
15
16 abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
17         
18     String id;
19     Thread executorThread;
20     ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
21             new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
22         @Override
23         public Thread newThread(Runnable r) {
24             executorThread = new Thread(r);
25             return executorThread;
26         }
27     });
28     
29     Semaphore beginSyncExec = new Semaphore(0);
30     Semaphore endSyncExec = new Semaphore(0);
31     
32     Engine engine;
33     StandardSessionManager<Node, Engine> sessionManager;
34     StandardNodeManager<Node, Engine> nodeManager;
35     
36     Runnable scheduleSyncExec = new Runnable() {
37         @Override
38         public void run() {
39             beginSyncExec.release();
40             try {
41                 endSyncExec.acquire();
42             } catch (InterruptedException e) {
43             }
44         }
45     };
46     
47     protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {
48         this.sessionManager = sessionManager;
49         this.engine = engine;
50         this.id = id;
51         this.nodeManager = createManager();
52     }
53     
54     abstract protected StandardNodeManager<Node, Engine> createManager();
55     
56     protected String getSCLContextKey() {
57         return getClass().getSimpleName();
58     }
59
60     public String getId() {
61         return id;
62     }
63     
64     public Engine getEngine() {
65         return engine;
66     }
67
68     public Thread getThread() {
69         return executorThread;
70     }
71     
72     @SuppressWarnings({ "rawtypes", "unchecked" })
73     public Object syncExec(Function fun) throws InterruptedException {
74         
75         executor.execute(scheduleSyncExec);
76         SCLContext context = SCLContext.getCurrent();
77         Engine oldConnection = (Engine)context.put(getSCLContextKey(), engine);
78         
79         try {
80             beginSyncExec.acquire();
81             Thread oldThread = executorThread;
82             executorThread = Thread.currentThread();
83             try {
84                 return fun.apply(Tuple0.INSTANCE);
85             } finally {
86                 executorThread = oldThread;
87                 endSyncExec.release();
88             }
89         } finally {
90             context.put(getSCLContextKey(), oldConnection);
91         }
92     }
93     
94     @SuppressWarnings("rawtypes")
95     public void asyncExec(final Function fun) {
96         executor.execute(new Runnable() {
97             @SuppressWarnings("unchecked")
98             @Override
99             public void run() {
100                 SCLContext context = SCLContext.getCurrent();
101                 context.put(getSCLContextKey(), engine);
102                 fun.apply(Tuple0.INSTANCE);
103             }
104         });
105     }
106
107     @Override
108     public void syncExec(Runnable runnable) throws InterruptedException {
109         
110         if(executorThread == Thread.currentThread()) {
111             try {
112                 runnable.run();
113             } catch (Throwable t) {
114                 Logger.defaultLogError(t);
115             } finally {
116             }
117             return;
118         }
119
120         executor.execute(scheduleSyncExec);
121         
122         beginSyncExec.acquire();
123         Thread oldThread = executorThread;
124         executorThread = Thread.currentThread();
125         try {
126             runnable.run();
127         } catch (Throwable t) {
128                 Logger.defaultLogError(t);
129         } finally {
130             executorThread = oldThread;
131             endSyncExec.release();
132         }
133     }
134
135     @Override
136     public void asyncExec(Runnable runnable) {
137
138         if(executorThread == Thread.currentThread()) {
139             try {
140                 runnable.run();
141             } catch (Throwable t) {
142                 Logger.defaultLogError(t);
143             } finally {
144             }
145             return;
146         }
147         
148         executor.execute(runnable);
149     }
150     
151     public void close() {
152         sessionManager.removeRealm(id);
153         executor.shutdown();
154         try {
155             executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
156         } catch (InterruptedException e) {
157         }
158     }
159
160     public StandardNodeManager<Node, Engine> getNodeManager() {
161         return nodeManager;
162     }
163     
164 }