1 package org.simantics.simulator.toolkit;
3 import java.math.BigDecimal;
4 import java.util.ArrayList;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.CopyOnWriteArrayList;
7 import java.util.concurrent.Semaphore;
9 import org.simantics.simulator.ExperimentState;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
14 * @author Antti Villberg
17 abstract public class DynamicExperimentThread extends Thread {
19 private static final Logger LOGGER = LoggerFactory.getLogger(DynamicExperimentThread.class);
21 private CopyOnWriteArrayList<DynamicExperimentThreadListener> listeners = new CopyOnWriteArrayList<>();
23 private ExperimentState state = StandardExperimentStates.CREATED;
25 private long runStart = 0;
27 private double desiredRealtimeRatio = 1000.0;
28 private double obtainedRealtimeRatio = 1.0;
30 private long runTimeNs = 0;
31 private long endTimeNs = 0;
32 protected long simulationStepNs = 0;
33 protected double stepInSeconds = 1.0;
35 public DynamicExperimentThread() {
38 private void updateTimes() {
39 long time = System.nanoTime();
40 long elapsed = time-runStart;
42 obtainedRealtimeRatio = longToDoubleDivision(runTimeNs, elapsed);
48 protected double longToDoubleDivision(long l1, long l2) {
51 double d = ((double)rt_l)/((double)l2);
56 protected ArrayList<Runnable> tasks = new ArrayList<>();
58 abstract public void step(double stepLengthNanoSeconds);
60 public boolean inState(Class<? extends ExperimentState> state) {
61 return state.isInstance(this.state);
64 public void initialize() throws Exception {
67 public void deinitialize() throws Exception {
84 } catch (Exception e) {
85 LOGGER.error("Unhandled exception while running simulation thread", e);
88 } catch (Exception e) {
89 LOGGER.error("Unhandled exception while initializing simulation thread", e);
96 } catch (Exception e) {
97 LOGGER.error("Error while deinitializing simulation thread", e);
104 protected boolean inActiveState() {
106 inState(StandardExperimentStates.Disposed.class)
107 || inState(StandardExperimentStates.Disposing.class)
108 //|| inState(StandardExperimentStates.Failure.class)
109 || inState(StandardExperimentStates.ToBeDisposed.class)
113 private void runReally() {
115 while(inActiveState()) {
117 if(inState(StandardExperimentStates.Running.class)) {
119 long asd = System.nanoTime();
120 step(simulationStepNs);
121 stepTime += System.nanoTime() - asd;
122 runTimeNs += simulationStepNs;
124 long asd2 = System.nanoTime();
126 taskTime += System.nanoTime() - asd2;
128 //System.err.println(" st = " + 1e-9*stepTime + " tt = " + 1e-9*taskTime);
130 while(obtainedRealtimeRatio > desiredRealtimeRatio) {
131 int ran = runTasks();
133 long elapsed = System.nanoTime()-runStart;
134 long deltaNs = BigDecimal.valueOf(runTimeNs).divide(BigDecimal.valueOf(desiredRealtimeRatio)).longValue() - elapsed;
136 long deltaMs = deltaNs / 1000000;
137 int deltaNsRem = (int)(deltaNs % 1000000);
140 synchronized(tasks) {
141 if (inState(StandardExperimentStates.Running.class)) {
143 tasks.wait(deltaMs, deltaNsRem);
144 } catch (InterruptedException e) {
145 LOGGER.warn("Dynamic experiment thread '" + this.getName() + "' (" + this.getClass().getName() + ") interrupted", e);
156 synchronized(tasks) {
157 while(!inState(StandardExperimentStates.Running.class) && inActiveState()) {
158 int ran = runTasks();
161 tasks.wait(Integer.MAX_VALUE);
162 } catch (InterruptedException e) {
163 LOGGER.warn("Dynamic experiment thread '" + this.getName() + "' (" + this.getClass().getName() + ") interrupted", e);
172 if(runTimeNs >= endTimeNs && inActiveState())
173 changeState(StandardExperimentStates.STOPPED);
179 Thread executorThread = this;
181 Semaphore beginSyncExec = new Semaphore(0);
182 Semaphore endSyncExec = new Semaphore(0);
184 Runnable scheduleSyncExec = () -> {
185 beginSyncExec.release();
187 endSyncExec.acquire();
188 } catch (InterruptedException e) {
192 public int runTasks() {
193 ArrayList<Runnable> todo = new ArrayList<>();
194 synchronized(tasks) {
198 todo.forEach(Runnable::run);
202 public void queue(Runnable runnable) {
203 synchronized(tasks) {
209 public <T> T syncExec(Callable<T> callable) throws InterruptedException {
211 if(executorThread == Thread.currentThread()) {
213 return callable.call();
214 } catch (Throwable t) {
215 LOGGER.error("syncExec in current thread failed", t);
221 queue(scheduleSyncExec);
223 beginSyncExec.acquire();
224 Thread oldThread = executorThread;
225 executorThread = Thread.currentThread();
227 return callable.call();
228 } catch (Throwable t) {
229 LOGGER.error("syncExec failed", t);
232 executorThread = oldThread;
233 endSyncExec.release();
238 public void asyncExec(Runnable runnable) {
240 if(executorThread == Thread.currentThread()) {
243 } catch (Throwable t) {
244 LOGGER.error("asyncExec failed", t);
254 public void setSimulationStepNs(long ns) {
255 simulationStepNs = ns;
256 stepInSeconds = BigDecimal.valueOf(simulationStepNs).multiply(BigDecimal.valueOf(1e-9)).doubleValue();
259 public void runDuration(long ns) {
260 runStart = System.nanoTime();
263 synchronized(tasks) {
264 changeState(StandardExperimentStates.RUNNING);
269 public ExperimentState getExperimentState() {
273 public void changeState(ExperimentState state) {
275 fireStateChanged(state);
278 public void addListener(DynamicExperimentThreadListener listener) {
279 if(!listeners.contains(listener))
280 listeners.add(listener);
283 public void removeListener(DynamicExperimentThreadListener listener) {
284 listeners.remove(listener);
287 protected void fireAfterStep() {
288 listeners.forEach(DynamicExperimentThreadListener::afterStep);
291 protected void fireBeforeStep() {
292 listeners.forEach(DynamicExperimentThreadListener::beforeStep);
295 protected void fireStateChanged(ExperimentState newState) {
296 listeners.forEach(l -> l.stateChanged(newState));