package org.simantics.simulator.toolkit; import java.math.BigDecimal; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import org.simantics.simulator.ExperimentState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Antti Villberg * @since 1.34.0 */ abstract public class DynamicExperimentThread extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicExperimentThread.class); private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); private ExperimentState state = StandardExperimentStates.CREATED; private long runStart = 0; private double desiredRealtimeRatio = 1000.0; private double obtainedRealtimeRatio = 1.0; private long runTimeNs = 0; private long endTimeNs = 0; protected long simulationStepNs = 0; protected double stepInSeconds = 1.0; public DynamicExperimentThread() { } private void updateTimes() { long time = System.nanoTime(); long elapsed = time-runStart; obtainedRealtimeRatio = longToDoubleDivision(runTimeNs, elapsed); } private long rt; private long rt_l; protected double longToDoubleDivision(long l1, long l2) { rt = l1 / l2; rt_l = l1 % l2; double d = ((double)rt_l)/((double)l2); d += (double)rt; return d; } protected ArrayList tasks = new ArrayList<>(); abstract public void step(double stepLengthNanoSeconds); public boolean inState(Class state) { return state.isInstance(this.state); } public void initialize() throws Exception { } public void deinitialize() throws Exception { } long stepTime = 0; long taskTime = 0; @Override public void run() { try { try { initialize(); try { runReally(); } catch (Exception e) { LOGGER.error("Unhandled exception while running simulation thread", e); } } catch (Exception e) { LOGGER.error("Unhandled exception while initializing simulation thread", e); } } finally { try { deinitialize(); } catch (Exception e) { LOGGER.error("Error while deinitializing simulation thread", e); } } } protected boolean inActiveState() { return !( inState(StandardExperimentStates.Disposed.class) || inState(StandardExperimentStates.Disposing.class) //|| inState(StandardExperimentStates.Failure.class) || inState(StandardExperimentStates.ToBeDisposed.class) ); } private void runReally() { while(inActiveState()) { if(inState(StandardExperimentStates.Running.class)) { long asd = System.nanoTime(); step(simulationStepNs); stepTime += System.nanoTime() - asd; runTimeNs += simulationStepNs; updateTimes(); long asd2 = System.nanoTime(); runTasks(); taskTime += System.nanoTime() - asd2; //System.err.println(" st = " + 1e-9*stepTime + " tt = " + 1e-9*taskTime); while(obtainedRealtimeRatio > desiredRealtimeRatio) { int ran = runTasks(); if(ran == 0) { long elapsed = System.nanoTime()-runStart; long deltaNs = BigDecimal.valueOf(runTimeNs).divide(BigDecimal.valueOf(desiredRealtimeRatio)).longValue() - elapsed; long deltaMs = deltaNs / 1000000; int deltaNsRem = (int)(deltaNs % 1000000); if(deltaNs > 0) { synchronized(tasks) { try { tasks.wait(deltaMs, deltaNsRem); } catch (InterruptedException e) { e.printStackTrace(); } } } } updateTimes(); } } else { synchronized(tasks) { while(!inState(StandardExperimentStates.Running.class) && inActiveState()) { int ran = runTasks(); if(ran == 0) { try { tasks.wait(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } } } } if(runTimeNs >= endTimeNs && inActiveState()) changeState(StandardExperimentStates.STOPPED); } } Thread executorThread = this; Semaphore beginSyncExec = new Semaphore(0); Semaphore endSyncExec = new Semaphore(0); Runnable scheduleSyncExec = () -> { beginSyncExec.release(); try { endSyncExec.acquire(); } catch (InterruptedException e) { } }; public int runTasks() { ArrayList todo = new ArrayList<>(); synchronized(tasks) { todo.addAll(tasks); tasks.clear(); } todo.forEach(Runnable::run); return todo.size(); } public void queue(Runnable runnable) { synchronized(tasks) { tasks.add(runnable); tasks.notify(); } } public T syncExec(Callable callable) throws InterruptedException { if(executorThread == Thread.currentThread()) { try { return callable.call(); } catch (Throwable t) { LOGGER.error("syncExec in current thread failed", t); return null; } finally { } } queue(scheduleSyncExec); beginSyncExec.acquire(); Thread oldThread = executorThread; executorThread = Thread.currentThread(); try { return callable.call(); } catch (Throwable t) { LOGGER.error("syncExec failed", t); return null; } finally { executorThread = oldThread; endSyncExec.release(); } } public void asyncExec(Runnable runnable) { if(executorThread == Thread.currentThread()) { try { runnable.run(); } catch (Throwable t) { LOGGER.error("asyncExec failed", t); } finally { } return; } queue(runnable); } public void setSimulationStepNs(long ns) { simulationStepNs = ns; stepInSeconds = BigDecimal.valueOf(simulationStepNs).multiply(BigDecimal.valueOf(1e-9)).doubleValue(); } public void runDuration(long ns) { runStart = System.nanoTime(); runTimeNs = 0; endTimeNs = ns; synchronized(tasks) { changeState(StandardExperimentStates.RUNNING); tasks.notify(); } } public ExperimentState getExperimentState() { return state; } public void changeState(ExperimentState state) { this.state = state; fireStateChanged(state); } public void addListener(DynamicExperimentThreadListener listener) { if(!listeners.contains(listener)) listeners.add(listener); } public void removeListener(DynamicExperimentThreadListener listener) { listeners.remove(listener); } protected void fireAfterStep() { listeners.forEach(DynamicExperimentThreadListener::afterStep); } protected void fireBeforeStep() { listeners.forEach(DynamicExperimentThreadListener::beforeStep); } protected void fireStateChanged(ExperimentState newState) { listeners.forEach(l -> l.stateChanged(newState)); } }