package org.simantics.db.layer0;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.simantics.simulator.variable.Realm;
abstract public class StandardRealm<Node, Engine extends StandardEngine<Node>> implements Realm {
-
- String id;
- Thread executorThread;
- ExecutorService executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- executorThread = new Thread(r);
- return executorThread;
- }
- });
-
- Semaphore beginSyncExec = new Semaphore(0);
- Semaphore endSyncExec = new Semaphore(0);
+
+ private String id;
+ private Thread executorThread;
+ private StandardRealmThreadFactory factory = new StandardRealmThreadFactory(this);
+ private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), factory);
+ private Semaphore beginSyncExec = new Semaphore(0);
+ private Semaphore endSyncExec = new Semaphore(0);
- Engine engine;
- StandardSessionManager<Node, Engine> sessionManager;
- StandardNodeManager<Node, Engine> nodeManager;
+ private Engine engine;
+ private StandardNodeManager<Node, Engine> nodeManager;
- Runnable scheduleSyncExec = new Runnable() {
+ private Runnable scheduleSyncExec = new Runnable() {
@Override
public void run() {
beginSyncExec.release();
}
};
- protected StandardRealm(StandardSessionManager<Node, Engine> sessionManager, Engine engine, String id) {
- this.sessionManager = sessionManager;
+ protected StandardRealm(Engine engine, String id) {
this.engine = engine;
this.id = id;
this.nodeManager = createManager();
}
-
+
abstract protected StandardNodeManager<Node, Engine> createManager();
protected String getSCLContextKey() {
}
public void close() {
- sessionManager.removeRealm(id);
executor.shutdown();
try {
- executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
+ if (!executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ List<Runnable> runnablesLeft = executor.shutdownNow();
+ if (!runnablesLeft.isEmpty()) {
+ getLogger().info("Runnables left for realm " + this + " after executor shutdown! " + runnablesLeft);
+ }
+ }
} catch (InterruptedException e) {
+ getLogger().info("Could not shutdown executor " + executor + " for realm " + this, e);
}
+
+ factory.clear();
+ factory = null;
+ // Should never be true
+ if (!executorThread.isAlive())
+ executorThread.interrupt();
+ executorThread = null;
+ executor = null;
+
+ // Clear nodeManager
+ nodeManager.clear();
+ nodeManager = null;
}
public StandardNodeManager<Node, Engine> getNodeManager() {
return nodeManager;
}
-
+
+ public abstract org.slf4j.Logger getLogger();
+
+ private void setExecutorThread(Thread t) {
+ executorThread = t;
+ }
+
+ private static class StandardRealmThreadFactory implements ThreadFactory {
+
+ private StandardRealm<?, ?> realm;
+
+ public StandardRealmThreadFactory(StandardRealm<?, ?> realm) {
+ this.realm = realm;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ realm.setExecutorThread(t);
+ return t;
+ }
+
+ void clear() {
+ realm = null;
+ }
+ }
}