+package org.simantics.jdbc.variable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.function.Function;
+
+import org.simantics.Simantics;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.error.BindingException;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.layer0.variable.NodeSupport;
+import org.simantics.jdbc.SimanticsJDBC;
+import org.simantics.simulator.toolkit.StandardRealm;
+import org.simantics.simulator.toolkit.db.StandardSessionManager;
+import org.simantics.simulator.variable.exceptions.NodeManagerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.impossibl.postgres.api.jdbc.PGConnection;
+import com.impossibl.postgres.api.jdbc.PGNotificationListener;
+import com.impossibl.postgres.jdbc.PGDataSource;
+
+public class JDBCSessionManager extends StandardSessionManager<JDBCNode, JDBCNodeManagerSupport> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JDBCSessionManager.class);
+ private static JDBCSessionManager INSTANCE = new JDBCSessionManager();
+
+ private static final String VALUE_SIMANTICS_JDBC_HOST = "localhost";
+ private static final int VALUE_SIMANTICS_JDBC_PORT = 5432;
+ private static final String VALUE_SIMANTICS_JDBC_USER = "simantics";
+ private static final String VALUE_SIMANTICS_JDBC_PASSWORD = "simantics";
+ private static final String VALUE_SIMANTICS_JDBC_DATABASE = "simantics";
+
+ private String channelName;
+ private PGNotificationListener listener;
+
+ private PGDataSource dataSource;
+ private Connection connection;
+
+ private static Properties readProperties(InputStream s) throws IOException {
+ try (InputStream is = s) {
+ Properties props = new Properties();
+ props.load(is);
+ return props;
+ }
+ }
+
+ private static Properties safeReadProperties(URL url) {
+ try {
+ return readProperties(url.openStream());
+ } catch (IOException e) {
+ LOGGER.error("Could not read props from " + url, e);
+ return null;
+ }
+ }
+
+ private static Properties safeReadProperties(URI uri) {
+ try {
+ return safeReadProperties(uri.toURL());
+ } catch (MalformedURLException e) {
+ LOGGER.error("Could not read props from " + uri, e);
+ return null;
+ }
+ }
+
+ private static Properties safeReadProperties(String path) {
+ return safeReadProperties(Paths.get(path).toUri());
+ }
+
+ private static Properties readProperties() {
+ String propFile = System.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PROPERTYFILE, null);
+ if (propFile != null) {
+ Properties p = safeReadProperties(propFile);
+ if (p != null)
+ return p;
+ }
+ // Read default settings from built-in file and override them with values in System properties
+ Properties p = safeReadProperties(JDBCSessionManager.class.getClassLoader().getResource("jdbc.properties"));
+ if (p != null) {
+ p.putAll(System.getProperties());
+ } else {
+ p = System.getProperties();
+ }
+ return p;
+ }
+
+ public JDBCSessionManager() {
+ this.channelName = "test";
+
+ Properties props = readProperties();
+ String host = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_HOST, VALUE_SIMANTICS_JDBC_HOST);
+ String port = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PORT, "" + VALUE_SIMANTICS_JDBC_PORT);
+ String database = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_DATABASE, VALUE_SIMANTICS_JDBC_DATABASE);
+ String user = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_USER, VALUE_SIMANTICS_JDBC_USER);
+ String password = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PASSWORD, VALUE_SIMANTICS_JDBC_PASSWORD);
+
+ try {
+ int portNumber = Integer.valueOf(port);
+
+ dataSource = new PGDataSource();
+ dataSource.setHost(host);
+ dataSource.setPort(portNumber);
+ dataSource.setDatabase(database);
+ dataSource.setUser(user);
+ dataSource.setPassword(password);
+
+ this.connection = dataSource.getConnection();
+ this.listener = new PGNotificationListener() {
+ @Override
+ public void notification(int processId, String channelName, String payload) {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("Received notification from processId={} channelName={} and payload={}", processId, channelName, payload);
+ Simantics.getSession().asyncRequest(new ReadRequest() {
+ @Override
+ public void run(ReadGraph graph) throws DatabaseException {
+ for (String realmId : INSTANCE.getRealms()) {
+ try {
+ JDBCRealm jdbcRealm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, realmId);
+ jdbcRealm.getNodeManager().refreshVariable(new JDBCNode(payload));
+ } catch (DatabaseException e) {
+ LOGGER.error("Could not refresh variable in realm {} with payload {}", realmId, payload, e);
+ }
+ }
+ }
+ });
+ }
+ };
+ createTable();
+ init();
+ } catch (SQLException e) {
+ LOGGER.error("Could not initialize JDBCSessionManager!", e);
+ }
+ }
+
+ private void createTable() throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.execute("CREATE TABLE IF NOT EXISTS simantics_table (key VARCHAR UNIQUE, value JSON)");
+ statement.close();
+ }
+
+ protected void init() throws SQLException {
+ Statement statement = connection.createStatement();
+ statement.execute("LISTEN " + this.channelName);
+ statement.close();
+ ((PGConnection) connection).addNotificationListener(this.listener);
+ }
+
+ protected void destroy() throws SQLException {
+ try (PGConnection connection = (PGConnection) dataSource.getConnection()) {
+ Statement statement = connection.createStatement();
+ statement.execute("UNLISTEN " + this.channelName);
+ statement.close();
+ }
+ }
+
+
+ @Override
+ protected JDBCNodeManagerSupport createEngine(ReadGraph graph, String id) throws DatabaseException {
+ return new JDBCNodeManagerSupport(id, this.dataSource, this.channelName);
+ }
+
+ @Override
+ protected StandardRealm<JDBCNode, JDBCNodeManagerSupport> createRealm(JDBCNodeManagerSupport engine, String id) {
+ return new JDBCRealm(engine, id);
+ }
+
+ public static void setValue(ReadGraph graph, String id, String key, Object value) throws DatabaseException, NodeManagerException, BindingException, InterruptedException {
+ JDBCRealm realm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, id);
+ realm.asyncExec(() -> {
+ try {
+ realm.getNodeManager().setValue(new JDBCNode(key), key, value, Bindings.OBJECT);
+ } catch (NodeManagerException | BindingException e) {
+ LOGGER.error("Could not set value {} for {}", value, key, e);
+ }
+ });
+ }
+
+ public static NodeSupport<?> nodeSupport(ReadGraph graph, String sessionName) throws DatabaseException {
+ return INSTANCE.getOrCreateNodeSupport(graph, sessionName);
+ }
+
+ public static Object getValue(ReadGraph graph, String uri, String key) throws InterruptedException, DatabaseException {
+ JDBCRealm realm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, uri);
+ return realm.syncExec(new Function<Object, Object>() {
+
+ @Override
+ public Object apply(Object t) {
+ try {
+ return realm.getNodeManager().getValue(new JDBCNode(key), key).getValue();
+ } catch (NodeManagerException e) {
+ LOGGER.error("Could not get value for {}", key, e);
+ return null;
+ }
+ }
+ });
+ }
+
+}