]> gerrit.simantics Code Review - simantics/platform.git/blob
9ee57f51055aec994a49d33f08726fec5a21c9ca
[simantics/platform.git] /
1 package org.simantics.simulator.toolkit;
2
3 import java.math.BigDecimal;
4 import java.util.ArrayList;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.CopyOnWriteArrayList;
7 import java.util.concurrent.Semaphore;
8
9 import org.simantics.simulator.ExperimentState;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 /**
14  * @author Antti Villberg
15  * @since 1.34.0
16  */
17 abstract public class DynamicExperimentThread extends Thread {
18
19         private static final Logger LOGGER = LoggerFactory.getLogger(DynamicExperimentThread.class);
20
21         private CopyOnWriteArrayList<DynamicExperimentThreadListener> listeners = new CopyOnWriteArrayList<>();
22
23         private ExperimentState state = StandardExperimentStates.CREATED;
24
25         private long runStart = 0;
26
27         private double desiredRealtimeRatio = 1000.0;
28         private double obtainedRealtimeRatio = 1.0;
29
30         private long runTimeNs = 0;
31         private long endTimeNs = 0;
32         protected long simulationStepNs = 0;
33         protected double stepInSeconds = 1.0;
34
35         public DynamicExperimentThread() {
36         }
37
38         private void updateTimes() {
39                 long time = System.nanoTime();
40                 long elapsed = time-runStart; 
41
42                 obtainedRealtimeRatio = longToDoubleDivision(runTimeNs, elapsed);
43         }
44
45         private long rt;
46         private long rt_l;
47
48         protected double longToDoubleDivision(long l1, long l2) {
49                 rt = l1 / l2;
50                 rt_l = l1 % l2;
51                 double d = ((double)rt_l)/((double)l2);
52                 d += (double)rt;
53                 return d;
54         }
55
56         protected ArrayList<Runnable> tasks = new ArrayList<>();
57
58         abstract public void step(double stepLengthNanoSeconds);
59
60         public boolean inState(Class<? extends ExperimentState> state) {
61                 return state.isInstance(this.state);
62         }
63
64         public void initialize() throws Exception {
65         }
66
67         public void deinitialize() throws Exception {
68         }
69
70         long stepTime = 0;
71         long taskTime = 0;
72
73         @Override
74         public void run() {
75
76                 try {
77
78                         try {
79
80                                 initialize();
81
82                                 try {
83                                         runReally();
84                                 } catch (Exception e) {
85                                         LOGGER.error("Unhandled exception while running simulation thread", e);
86                                 } 
87
88                         } catch (Exception e) {
89                                 LOGGER.error("Unhandled exception while initializing simulation thread", e);
90                         } 
91
92                 } finally {
93
94                         try {
95                                 deinitialize();
96                         } catch (Exception e) {
97                                 LOGGER.error("Error while deinitializing simulation thread", e);
98                         }
99
100                 }
101
102         }
103
104         protected boolean inActiveState() {
105                 return !(
106                                 inState(StandardExperimentStates.Disposed.class)
107                                 || inState(StandardExperimentStates.Disposing.class)
108                                 //|| inState(StandardExperimentStates.Failure.class)
109                                 || inState(StandardExperimentStates.ToBeDisposed.class)
110                                 );
111         }
112
113         private void runReally() {
114
115                 while(inActiveState()) {
116
117                         if(inState(StandardExperimentStates.Running.class)) {
118
119                                 long asd = System.nanoTime();
120                                 step(simulationStepNs);
121                                 stepTime += System.nanoTime() - asd;
122                                 runTimeNs += simulationStepNs;
123                                 updateTimes();
124                                 long asd2 = System.nanoTime();
125                                 runTasks();
126                                 taskTime += System.nanoTime() - asd2;
127
128                                 //System.err.println(" st = " + 1e-9*stepTime + " tt = " + 1e-9*taskTime);
129
130                                 while(obtainedRealtimeRatio > desiredRealtimeRatio) {
131                                         int ran = runTasks();
132                                         if(ran == 0) {
133                                                 long elapsed = System.nanoTime()-runStart; 
134                                                 long deltaNs = BigDecimal.valueOf(runTimeNs).divide(BigDecimal.valueOf(desiredRealtimeRatio)).longValue() - elapsed;
135
136                                                 long deltaMs = deltaNs / 1000000;
137                                                 int deltaNsRem = (int)(deltaNs % 1000000);
138
139                                                 if(deltaNs > 0) {
140                                                         synchronized(tasks) {
141                                                                 if (inState(StandardExperimentStates.Running.class)) {
142                                                                         try {
143                                                                                 tasks.wait(deltaMs, deltaNsRem);
144                                                                         } catch (InterruptedException e) {
145                                                                                 LOGGER.warn("Dynamic experiment thread '" + this.getName() + "' (" + this.getClass().getName() + ") interrupted", e);
146                                                                         }
147                                                                 }
148                                                         }
149                                                 }
150                                         }
151                                         updateTimes();
152                                 }
153
154                         } else {
155
156                                 synchronized(tasks) {
157                                         while(!inState(StandardExperimentStates.Running.class) && inActiveState()) {
158                                                 int ran = runTasks();
159                                                 if(ran == 0) {
160                                                         try {
161                                                                 tasks.wait(Integer.MAX_VALUE);
162                                                         } catch (InterruptedException e) {
163                                                                 LOGGER.warn("Dynamic experiment thread '" + this.getName() + "' (" + this.getClass().getName() + ") interrupted", e);
164                                                         }
165                                                 }
166                                         }
167
168                                 }
169
170                         }
171
172                         if(runTimeNs >= endTimeNs && inActiveState())
173                                 changeState(StandardExperimentStates.STOPPED);
174
175                 }
176
177         }
178
179         Thread executorThread = this;
180
181         Semaphore beginSyncExec = new Semaphore(0);
182         Semaphore endSyncExec = new Semaphore(0);
183
184         Runnable scheduleSyncExec = () -> {
185                 beginSyncExec.release();
186                 try {
187                         endSyncExec.acquire();
188                 } catch (InterruptedException e) {
189                 }
190         };
191
192         public int runTasks() {
193                 ArrayList<Runnable> todo = new ArrayList<>();
194                 synchronized(tasks) {
195                         todo.addAll(tasks);
196                         tasks.clear();
197                 }
198                 todo.forEach(Runnable::run);
199                 return todo.size();
200         }
201
202         public void queue(Runnable runnable) {
203                 synchronized(tasks) {
204                         tasks.add(runnable);
205                         tasks.notify();
206                 }
207         }
208
209         public <T> T syncExec(Callable<T> callable) throws InterruptedException {
210
211                 if(executorThread == Thread.currentThread()) {
212                         try {
213                                 return callable.call();
214                         } catch (Throwable t) {
215                                 LOGGER.error("syncExec in current thread failed", t);
216                                 return null;
217                         } finally {
218                         }
219                 }
220
221                 queue(scheduleSyncExec);
222
223                 beginSyncExec.acquire();
224                 Thread oldThread = executorThread;
225                 executorThread = Thread.currentThread();
226                 try {
227                         return callable.call();
228                 } catch (Throwable t) {
229                         LOGGER.error("syncExec failed", t);
230                         return null;
231                 } finally {
232                         executorThread = oldThread;
233                         endSyncExec.release();
234                 }
235
236         }
237
238         public void asyncExec(Runnable runnable) {
239
240                 if(executorThread == Thread.currentThread()) {
241                         try {
242                                 runnable.run();
243                         } catch (Throwable t) {
244                                 LOGGER.error("asyncExec failed", t);
245                         } finally {
246                         }
247                         return;
248                 }
249
250                 queue(runnable);
251
252         }
253
254         public void setSimulationStepNs(long ns) {
255                 simulationStepNs = ns;
256                 stepInSeconds = BigDecimal.valueOf(simulationStepNs).multiply(BigDecimal.valueOf(1e-9)).doubleValue();
257         }
258
259         public void runDuration(long ns) {
260                 runStart = System.nanoTime();
261                 runTimeNs = 0;
262                 endTimeNs = ns;
263                 synchronized(tasks) {
264                         changeState(StandardExperimentStates.RUNNING);
265                         tasks.notify();
266                 }
267         }
268
269         public ExperimentState getExperimentState() {
270                 return state;
271         }
272
273         public void changeState(ExperimentState state) {
274                 this.state = state;
275                 fireStateChanged(state);
276         }
277
278         public void addListener(DynamicExperimentThreadListener listener) {
279                 if(!listeners.contains(listener))
280                         listeners.add(listener);
281         }
282
283         public void removeListener(DynamicExperimentThreadListener listener) {
284                 listeners.remove(listener);
285         }
286
287         protected void fireAfterStep() {
288                 listeners.forEach(DynamicExperimentThreadListener::afterStep);
289         }
290
291         protected void fireBeforeStep() {
292                 listeners.forEach(DynamicExperimentThreadListener::beforeStep);
293         }
294
295         protected void fireStateChanged(ExperimentState newState) {
296                 listeners.forEach(l -> l.stateChanged(newState));
297         }
298
299 }