Fixed race condition in DynamicExperimentThread.
[simantics/platform.git] / bundles / org.simantics.simulator.toolkit / src / org / simantics / simulator / toolkit / DynamicExperimentThread.java
1 package org.simantics.simulator.toolkit;
2
3 import java.math.BigDecimal;
4 import java.util.ArrayList;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.CopyOnWriteArrayList;
7 import java.util.concurrent.Semaphore;
8
9 import org.simantics.simulator.ExperimentState;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 /**
14  * @author Antti Villberg
15  * @since 1.34.0
16  */
17 abstract public class DynamicExperimentThread extends Thread {
18
19         private static final Logger LOGGER = LoggerFactory.getLogger(DynamicExperimentThread.class);
20
21         private CopyOnWriteArrayList<DynamicExperimentThreadListener> listeners = new CopyOnWriteArrayList<>();
22
23         private ExperimentState state = StandardExperimentStates.CREATED;
24
25         private long runStart = 0;
26
27         private double desiredRealtimeRatio = 1000.0;
28         private double obtainedRealtimeRatio = 1.0;
29
30         private long runTimeNs = 0;
31         private long endTimeNs = 0;
32         protected long simulationStepNs = 0;
33         protected double stepInSeconds = 1.0;
34
35         public DynamicExperimentThread() {
36         }
37
38         private void updateTimes() {
39                 long time = System.nanoTime();
40                 long elapsed = time-runStart; 
41
42                 obtainedRealtimeRatio = longToDoubleDivision(runTimeNs, elapsed);
43         }
44
45         private long rt;
46         private long rt_l;
47
48         protected double longToDoubleDivision(long l1, long l2) {
49                 rt = l1 / l2;
50                 rt_l = l1 % l2;
51                 double d = ((double)rt_l)/((double)l2);
52                 d += (double)rt;
53                 return d;
54         }
55
56         protected ArrayList<Runnable> tasks = new ArrayList<>();
57
58         abstract public void step(double stepLengthNanoSeconds);
59
60         public boolean inState(Class<? extends ExperimentState> state) {
61                 return state.isInstance(this.state);
62         }
63
64         public void initialize() throws Exception {
65         }
66
67         public void deinitialize() throws Exception {
68         }
69
70         long stepTime = 0;
71         long taskTime = 0;
72
73         @Override
74         public void run() {
75
76                 try {
77
78                         try {
79
80                                 initialize();
81
82                                 try {
83                                         runReally();
84                                 } catch (Exception e) {
85                                         LOGGER.error("Unhandled exception while running simulation thread", e);
86                                 } 
87
88                         } catch (Exception e) {
89                                 LOGGER.error("Unhandled exception while initializing simulation thread", e);
90                         } 
91
92                 } finally {
93
94                         try {
95                                 deinitialize();
96                         } catch (Exception e) {
97                                 LOGGER.error("Error while deinitializing simulation thread", e);
98                         }
99
100                 }
101
102         }
103
104         protected boolean inActiveState() {
105                 return !(
106                                 inState(StandardExperimentStates.Disposed.class)
107                                 || inState(StandardExperimentStates.Disposing.class)
108                                 //|| inState(StandardExperimentStates.Failure.class)
109                                 || inState(StandardExperimentStates.ToBeDisposed.class)
110                                 );
111         }
112
113         private void runReally() {
114
115                 while(inActiveState()) {
116
117                         if(inState(StandardExperimentStates.Running.class)) {
118
119                                 long asd = System.nanoTime();
120                                 step(simulationStepNs);
121                                 stepTime += System.nanoTime() - asd;
122                                 runTimeNs += simulationStepNs;
123                                 updateTimes();
124                                 long asd2 = System.nanoTime();
125                                 runTasks();
126                                 taskTime += System.nanoTime() - asd2;
127
128                                 //System.err.println(" st = " + 1e-9*stepTime + " tt = " + 1e-9*taskTime);
129
130                                 while(obtainedRealtimeRatio > desiredRealtimeRatio) {
131                                         int ran = runTasks();
132                                         if(ran == 0) {
133                                                 long elapsed = System.nanoTime()-runStart; 
134                                                 long deltaNs = BigDecimal.valueOf(runTimeNs).divide(BigDecimal.valueOf(desiredRealtimeRatio)).longValue() - elapsed;
135
136                                                 long deltaMs = deltaNs / 1000000;
137                                                 int deltaNsRem = (int)(deltaNs % 1000000);
138
139                                                 if(deltaNs > 0) {
140                                                         synchronized(tasks) {
141                                                                 try {
142                                                                         tasks.wait(deltaMs, deltaNsRem);
143                                                                 } catch (InterruptedException e) {
144                                                                         e.printStackTrace();
145                                                                 }
146                                                         }
147                                                 }
148                                         }
149                                         updateTimes();
150                                 }
151
152                         } else {
153
154                                 synchronized(tasks) {
155                                         while(!inState(StandardExperimentStates.Running.class) && inActiveState()) {
156                                                 int ran = runTasks();
157                                                 if(ran == 0) {
158                                                         try {
159                                                                 tasks.wait(Integer.MAX_VALUE);
160                                                         } catch (InterruptedException e) {
161                                                                 e.printStackTrace();
162                                                         }
163                                                 }
164                                         }
165
166                                 }
167
168                         }
169
170                         if(runTimeNs >= endTimeNs && inActiveState())
171                                 changeState(StandardExperimentStates.STOPPED);
172
173                 }
174
175         }
176
177         Thread executorThread = this;
178
179         Semaphore beginSyncExec = new Semaphore(0);
180         Semaphore endSyncExec = new Semaphore(0);
181
182         Runnable scheduleSyncExec = () -> {
183                 beginSyncExec.release();
184                 try {
185                         endSyncExec.acquire();
186                 } catch (InterruptedException e) {
187                 }
188         };
189
190         public int runTasks() {
191                 ArrayList<Runnable> todo = new ArrayList<>();
192                 synchronized(tasks) {
193                         todo.addAll(tasks);
194                         tasks.clear();
195                 }
196                 todo.forEach(Runnable::run);
197                 return todo.size();
198         }
199
200         public void queue(Runnable runnable) {
201                 synchronized(tasks) {
202                         tasks.add(runnable);
203                         tasks.notify();
204                 }
205         }
206
207         public <T> T syncExec(Callable<T> callable) throws InterruptedException {
208
209                 if(executorThread == Thread.currentThread()) {
210                         try {
211                                 return callable.call();
212                         } catch (Throwable t) {
213                                 LOGGER.error("syncExec in current thread failed", t);
214                                 return null;
215                         } finally {
216                         }
217                 }
218
219                 queue(scheduleSyncExec);
220
221                 beginSyncExec.acquire();
222                 Thread oldThread = executorThread;
223                 executorThread = Thread.currentThread();
224                 try {
225                         return callable.call();
226                 } catch (Throwable t) {
227                         LOGGER.error("syncExec failed", t);
228                         return null;
229                 } finally {
230                         executorThread = oldThread;
231                         endSyncExec.release();
232                 }
233
234         }
235
236         public void asyncExec(Runnable runnable) {
237
238                 if(executorThread == Thread.currentThread()) {
239                         try {
240                                 runnable.run();
241                         } catch (Throwable t) {
242                                 LOGGER.error("asyncExec failed", t);
243                         } finally {
244                         }
245                         return;
246                 }
247
248                 queue(runnable);
249
250         }
251
252         public void setSimulationStepNs(long ns) {
253                 simulationStepNs = ns;
254                 stepInSeconds = BigDecimal.valueOf(simulationStepNs).multiply(BigDecimal.valueOf(1e-9)).doubleValue();
255         }
256
257         public void runDuration(long ns) {
258                 runStart = System.nanoTime();
259                 runTimeNs = 0;
260                 endTimeNs = ns;
261                 synchronized(tasks) {
262                         changeState(StandardExperimentStates.RUNNING);
263                         tasks.notify();
264                 }
265         }
266
267         public ExperimentState getExperimentState() {
268                 return state;
269         }
270
271         public void changeState(ExperimentState state) {
272                 this.state = state;
273                 fireStateChanged(state);
274         }
275
276         public void addListener(DynamicExperimentThreadListener listener) {
277                 if(!listeners.contains(listener))
278                         listeners.add(listener);
279         }
280
281         public void removeListener(DynamicExperimentThreadListener listener) {
282                 listeners.remove(listener);
283         }
284
285         protected void fireAfterStep() {
286                 listeners.forEach(DynamicExperimentThreadListener::afterStep);
287         }
288
289         protected void fireBeforeStep() {
290                 listeners.forEach(DynamicExperimentThreadListener::beforeStep);
291         }
292
293         protected void fireStateChanged(ExperimentState newState) {
294                 listeners.forEach(l -> l.stateChanged(newState));
295         }
296
297 }