package org.simantics.jdbc.variable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.nio.file.Paths; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; import java.util.function.Function; import org.simantics.Simantics; import org.simantics.databoard.Bindings; import org.simantics.databoard.binding.error.BindingException; import org.simantics.db.ReadGraph; import org.simantics.db.common.request.ReadRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.layer0.variable.NodeSupport; import org.simantics.jdbc.SimanticsJDBC; import org.simantics.simulator.toolkit.StandardRealm; import org.simantics.simulator.toolkit.db.StandardVariableSessionManager; import org.simantics.simulator.variable.exceptions.NodeManagerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.impossibl.postgres.api.jdbc.PGConnection; import com.impossibl.postgres.api.jdbc.PGNotificationListener; import com.impossibl.postgres.jdbc.PGDataSource; public class JDBCSessionManager extends StandardVariableSessionManager { private static final Logger LOGGER = LoggerFactory.getLogger(JDBCSessionManager.class); private static JDBCSessionManager INSTANCE = new JDBCSessionManager(); private static final String VALUE_SIMANTICS_JDBC_HOST = "localhost"; private static final int VALUE_SIMANTICS_JDBC_PORT = 5432; private static final String VALUE_SIMANTICS_JDBC_USER = "simantics"; private static final String VALUE_SIMANTICS_JDBC_PASSWORD = "simantics"; private static final String VALUE_SIMANTICS_JDBC_DATABASE = "simantics"; private String channelName; private PGNotificationListener listener; private PGDataSource dataSource; private Connection connection; private static Properties readProperties(InputStream s) throws IOException { try (InputStream is = s) { Properties props = new Properties(); props.load(is); return props; } } private static Properties safeReadProperties(URL url) { try { return readProperties(url.openStream()); } catch (IOException e) { LOGGER.error("Could not read props from " + url, e); return null; } } private static Properties safeReadProperties(URI uri) { try { return safeReadProperties(uri.toURL()); } catch (MalformedURLException e) { LOGGER.error("Could not read props from " + uri, e); return null; } } private static Properties safeReadProperties(String path) { return safeReadProperties(Paths.get(path).toUri()); } private static Properties readProperties() { String propFile = System.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PROPERTYFILE, null); if (propFile != null) { Properties p = safeReadProperties(propFile); if (p != null) return p; } // Read default settings from built-in file and override them with values in System properties Properties p = safeReadProperties(JDBCSessionManager.class.getClassLoader().getResource("jdbc.properties")); if (p != null) { p.putAll(System.getProperties()); } else { p = System.getProperties(); } return p; } public JDBCSessionManager() { this.channelName = "test"; Properties props = readProperties(); String host = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_HOST, VALUE_SIMANTICS_JDBC_HOST); String port = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PORT, "" + VALUE_SIMANTICS_JDBC_PORT); String database = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_DATABASE, VALUE_SIMANTICS_JDBC_DATABASE); String user = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_USER, VALUE_SIMANTICS_JDBC_USER); String password = props.getProperty(SimanticsJDBC.PROP_SIMANTICS_JDBC_PASSWORD, VALUE_SIMANTICS_JDBC_PASSWORD); try { int portNumber = Integer.valueOf(port); dataSource = new PGDataSource(); dataSource.setHost(host); dataSource.setPort(portNumber); dataSource.setDatabase(database); dataSource.setUser(user); dataSource.setPassword(password); this.connection = dataSource.getConnection(); this.listener = new PGNotificationListener() { @Override public void notification(int processId, String channelName, String payload) { if (LOGGER.isDebugEnabled()) LOGGER.debug("Received notification from processId={} channelName={} and payload={}", processId, channelName, payload); Simantics.getSession().asyncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { for (String realmId : INSTANCE.getRealms()) { try { JDBCRealm jdbcRealm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, realmId); jdbcRealm.getNodeManager().refreshVariable(new JDBCNode(payload)); } catch (DatabaseException e) { LOGGER.error("Could not refresh variable in realm {} with payload {}", realmId, payload, e); } } } }); } }; createTable(); init(); } catch (SQLException e) { LOGGER.error("Could not initialize JDBCSessionManager!", e); } } private void createTable() throws SQLException { Statement statement = connection.createStatement(); statement.execute("CREATE TABLE IF NOT EXISTS simantics_table (key VARCHAR UNIQUE, value JSON)"); statement.close(); } protected void init() throws SQLException { Statement statement = connection.createStatement(); statement.execute("LISTEN " + this.channelName); statement.close(); ((PGConnection) connection).addNotificationListener(this.listener); } protected void destroy() throws SQLException { try (PGConnection connection = (PGConnection) dataSource.getConnection()) { Statement statement = connection.createStatement(); statement.execute("UNLISTEN " + this.channelName); statement.close(); } } @Override protected JDBCNodeManagerSupport createEngine(ReadGraph graph, String id) throws DatabaseException { return new JDBCNodeManagerSupport(id, this.dataSource, this.channelName); } @Override protected StandardRealm createRealm(JDBCNodeManagerSupport engine, String id) { return new JDBCRealm(engine, id); } public static void setValue(ReadGraph graph, String id, String key, Object value) throws DatabaseException, NodeManagerException, BindingException, InterruptedException { JDBCRealm realm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, id); realm.asyncExec(() -> { try { realm.getNodeManager().setValue(new JDBCNode(key), key, value, Bindings.OBJECT); } catch (NodeManagerException | BindingException e) { LOGGER.error("Could not set value {} for {}", value, key, e); } }); } public static NodeSupport nodeSupport(ReadGraph graph, String sessionName) throws DatabaseException { return INSTANCE.getOrCreateNodeSupport(graph, sessionName); } public static Object getValue(ReadGraph graph, String uri, String key) throws InterruptedException, DatabaseException { JDBCRealm realm = (JDBCRealm) INSTANCE.getOrCreateRealm(graph, uri); return realm.syncExec(new Function() { @Override public Object apply(Object t) { try { return realm.getNodeManager().getValue(new JDBCNode(key), key).getValue(); } catch (NodeManagerException e) { LOGGER.error("Could not get value for {}", key, e); return null; } } }); } }