/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.server.internal; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.IntBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import org.simantics.db.common.utils.Logger; import org.simantics.db.server.DatabaseLastExitException; import org.simantics.db.server.GuardFileVersionException; import org.simantics.db.server.ProCoreException; import org.simantics.db.server.ServerNotFoundException; import org.simantics.db.server.protocol.CloseClientSessionFunction; import org.simantics.db.server.protocol.ExecuteFunction; import org.simantics.db.server.protocol.MessageNumber; public class ProCoreServer { private static Map workingDirs = new HashMap(); private static Map databaseIds = new HashMap(); private static final String LOCALHOST = ProCoreClient.LOCALHOST; public static final int VERSION_MAJOR = 1; public static final int VERSION_MINOR = 4; // 2015-05-05 public static final String BRANCH_DIR = "procore.headClusters.procore"; public static final String TAIL_DIR = "procore.tailClusters.procore"; public static final String TAIL_FILE = "procore.tail.procore"; public static final String CONFIG_FILE = "procore.config.procore"; public static final String GUARD_FILE = "procore.guard.procore"; public static final String JOURNAL_FILE = "procore.journal.procore"; public static final String LOG_FILE = "procore.log.procore"; public static final String DCS_FILE = "procore.dumpChangeSets.procore"; public static final String RECOVERY_NEEDED_FILE = "recovery.needed"; public static final String RECOVERY_IGNORED_FILE = "recovery.ignored"; public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored"; public static final String PAGE_FILE_PATTERN = "procore.page*.procore"; public static final String DATA_PREFIX = "ClusterData."; public static final String DATA_PATTERN = DATA_PREFIX + "*"; public static final String INDEX_PREFIX = "ClusterIndex."; public static final String INDEX_PATTERN = INDEX_PREFIX + "*"; public static final String VALUE_PREFIX = "ExternalValue."; public static final String VALUE_PATTERN = VALUE_PREFIX + "*"; public static final String VALUE_SUFFIX = ".procore"; public static final String DELETED_PREFIX = "ClusterDeleted."; public static final String DELETED_PATTERN = DELETED_PREFIX + "*"; private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; public static final String QUIT_CMD = "quit"; // private static final int SHORT_SLEEP_COUNT = 1; // private static final int LONG_SLEEP_COUNT = 10; // static public void stopProCoreServer(File workingDir) throws ProCoreException { // GuardFile guardFile = new GuardFile(workingDir); // guardFile.stopProCoreServer(); // } private static boolean isRunning(String aPid) throws ProCoreException { boolean isRunning = false; try { switch (OSType.calculate()) { case WINDOWS: { Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\""); BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); String line = input.readLine(); if (line.matches(".*ProCoreServer.exe.*")) isRunning = true; input.close(); break; } case UNIX: { Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm="); BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); String line = input.readLine(); if (line.matches(".*ProCoreServer.*")) isRunning = true; input.close(); break; } default: } } catch (Exception e) { throw new ProCoreException("Could not get list of running processes.", e); } return isRunning; } static class ProCoreProcess { private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped }; class Manager implements Runnable { private State state = State.Created; private CountDownLatch serverPrompted; private LinkedBlockingQueue reply; private volatile String command; private volatile Integer exitValue; private volatile boolean carryOn; private volatile int port; private final File dbFolder; // Just for exception i.e. error reporting. Manager(File dbFolder) { this.dbFolder = dbFolder; init(); } private void init() { serverPrompted = new CountDownLatch(1); reply = new LinkedBlockingQueue(); exitValue = null; carryOn = true; port = 0; state = State.Initialized; } int getPort() { return port; } void tryToStart() throws ProCoreException, InterruptedException { if (thread.isAlive()) return; thread = new Thread(manager, manager.initAndGetName()); thread.start(); serverPrompted.await(); if (!state.equals(State.Prompted)) throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); String reply = giveCommand("print port"); try { port = Integer.parseInt(reply); } catch (NumberFormatException e) { if (null != exitValue && 0 != exitValue) throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); if (reply.equals("End of server thread.")) throw new ProCoreException("Server did not start (did not prompt). state=" + state); else throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply); } state = State.GotPort; } String giveCommand(String command) throws ProCoreException, InterruptedException { if (!thread.isAlive()) throw new ProCoreException("Server thread not alive."); if (null != this.command) throw new ProCoreException("Old command pending."); this.command = command; return reply.take(); } private String initAndGetName() { init(); return "ProCoreProcess " + ++instanceCount + " " + builder.directory(); } boolean isActive() throws ProCoreException { if (null == exitValue && thread.isAlive()) return true; else return false; } boolean tryToStop() throws ProCoreException { if (!thread.isAlive()) return true; carryOn = false; thread.interrupt(); try { thread.join(10000); // milliseconds } catch (InterruptedException e) { } return !thread.isAlive(); } @Override public void run() { state = State.ThreadStarted; Util.log("ProCoreServer thread started."); try { Util.trace("ProCoreProcessManager start."); Process process = builder.start(); exitValue = exitValue(process); if (null != exitValue) Util.logError("Server process did not start."); else { state = State.ProcessStarted; runProcessStarted(process); // Process start } } catch (IOException e) { Util.logError("Failed during process watch."); } finally { reply.add("End of server thread."); Util.trace("ProCoreServerThread stop."); state = State.ThreadStopped; serverPrompted.countDown(); } } private void runProcessStarted(Process process) throws IOException { try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));) { do { try { if (reader.ready()) handleLine(reader.readLine(), reader, writer); else if (null != command) { writer.write(command); command = null; writer.newLine(); writer.flush(); String line = getReply(reader); reply.add(line); } else Thread.sleep(100); // milliseconds } catch (InterruptedException e) { Util.trace("ProCoreServerThread interrupted."); } exitValue = exitValue(process); if (null == exitValue && !carryOn) try { try { writer.write(QUIT_CMD); writer.newLine(); writer.flush(); } catch (Throwable t) { Util.logError("Wait for process death was interrupted.", t); } finally { } exitValue = process.waitFor(); } catch (InterruptedException e) { Util.logError("Wait for process death was interrupted.", e); carryOn = true; continue; } } while (null == exitValue); } // End of resource try. } private String getReply(BufferedReader reader) throws IOException { StringBuilder sb = new StringBuilder(); while (true) { String line = reader.readLine(); if (null == line || line.startsWith("Waiting for input?")) break; sb.append(line); } return sb.toString(); } private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException { if (DEBUG) Util.showDebug(line); if (line.startsWith("Main end.")) { carryOn = false; // Last line from server. } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) { state = State.Prompted; serverPrompted.countDown(); // Server has been started and is waiting for console input. } else Util.trace("From server. line=" + line); } private Integer exitValue(Process process) { try { return process.exitValue(); } catch (IllegalThreadStateException e) { return null; // Still alive. } } } private static int instanceCount; // Keeps count of thread instances. private final ProcessBuilder builder; private final Manager manager; private Thread thread; ProCoreProcess(File exeFile, File workingDir) { String[] args = new String[0]; String[] cmd = new String[1 + args.length]; cmd[0] = exeFile.getAbsolutePath().toString(); System.arraycopy(args, 0, cmd, 1, args.length); builder = new ProcessBuilder(cmd); builder.redirectErrorStream(true); builder.directory(workingDir); manager = new Manager(workingDir); thread = new Thread(manager, manager.initAndGetName()); } public void tryToStart() throws ProCoreException, InterruptedException { manager.tryToStart(); } // public int exitValue() throws IllegalStateException { // Integer i = proCoreServerThread.getExitValue(); // if (null == i) // throw new IllegalStateException("ProCoreServerThread is still running."); // return i; // } // public boolean ignoreProtocolVersion() { // File wd = builder.directory(); // if (null == wd) // return false; // Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE); // if (Files.exists(ignore)) // return true; // return false; // } public boolean isActive() throws ProCoreException { return manager.isActive(); } public boolean tryToStop() throws ProCoreException { return manager.tryToStop(); } int getPort() throws ProCoreException { return manager.getPort(); } } private static enum OSType { UNIX, WINDOWS, UNKNOWN; public static OSType calculate() { String osName = System.getProperty("os.name"); assert osName != null; osName = osName.toLowerCase(); if (osName.startsWith("windows")) return WINDOWS; if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun")) return UNIX; return UNKNOWN; } } static public boolean ignoreProtocolVersion(File dbFolder) { if (null == dbFolder) return false; if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE))) return true; return false; } static public ProCoreServer getProCoreServer(File serverDir, final File workingDir) throws ProCoreException, ServerNotFoundException { try { ProCoreServer proCoreServer; synchronized (workingDirs) { proCoreServer = workingDirs.get(workingDir.getCanonicalPath()); if (null != proCoreServer) return proCoreServer; if (!serverDir.isDirectory()) throw new ProCoreException("Expected server directory as argument"); if (!workingDir.isDirectory()) throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath()); String exeSuffix = Activator.getExeSuffix(); File executable = new File(serverDir, "ProCoreServer" + exeSuffix); proCoreServer = new ProCoreServer(executable, workingDir); workingDirs.put(workingDir.getCanonicalPath(), proCoreServer); return proCoreServer; } } catch (IOException e) { throw new ProCoreException("IOException", e); } } public static void logVersionInformation() { StringBuilder msg = new StringBuilder(200); String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read. msg.append("ProCore version information:"); msg.append(nl); msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor); try { msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder()); } catch (IOException e) { msg.append(" folder information not available"); } msg.append(nl); msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $"); String str = msg.toString(); Logger.defaultLog(str); } private static String[] getFileLines(final File aFile, int maxLines) { final String[] STRING_EMPTY_ARRAY = new String[0]; try { if (!aFile.canRead()) { return STRING_EMPTY_ARRAY; } try (FileInputStream fis = new FileInputStream(aFile); InputStreamReader isr = new InputStreamReader(fis, "US-ASCII"); BufferedReader br = new BufferedReader(isr);){ String line = null; int count = 0; ArrayList lines = new ArrayList(); while ((line = br.readLine()) != null) { if (++count > maxLines) break; // Caller not interested in more lines. lines.add(line); } return lines.toArray(STRING_EMPTY_ARRAY); } catch (FileNotFoundException e) { return STRING_EMPTY_ARRAY; } } catch (Throwable t) { if (DEBUG) t.printStackTrace(); Logger.defaultLogError(t); } return new String[0]; } private static int parseInt(String s) throws ProCoreException { try { return Integer.parseInt(s); } catch (NumberFormatException e) { String msg = "Could not convert string to number. string=" + s; Logger.defaultLogError(msg); throw new ProCoreException(msg, e); } } private String databaseId = null; private final GuardFile guardFile; private final ProCoreClient proCoreClient; ProCoreClient getProCoreClient() { return proCoreClient; } private final ProCoreProcess proCoreProcess; private ProCoreServer(File exeFile, File workingDir) throws ProCoreException { if (null == exeFile) throw new ProCoreException("Illegal argument: exeFile is null."); if (null == workingDir) throw new ProCoreException("Illegal argument: workingDir is null."); guardFile = new GuardFile(workingDir); proCoreClient = new ProCoreClient(this); proCoreProcess = new ProCoreProcess(exeFile, workingDir); Util.log("ProCoreServer exe: " + exeFile); Util.log("ProCoreServer folder: " + workingDir); logVersionInformation(); } // //@Override public synchronized String execute(String command) throws ProCoreException, InterruptedException { return proCoreClient.execute(command); } private void startInternal() throws ProCoreException { if (proCoreClient.isConnected()) return; Exception exception; try { proCoreProcess.tryToStart(); int port = proCoreProcess.getPort(); SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); proCoreClient.connect(sa); return; } catch (InterruptedException e) { Util.logError("ProCoreProcess start was interrupted.", e); exception = e; } catch (DatabaseLastExitException e) { throw e; // This situation can be analyzed and handled by start handler. } catch (ProCoreException e) { Util.logError("Failed to start ProCoreServer process.", e); exception = e; } boolean isActive = guardFile.isActive(proCoreClient); if (!isActive) throw new ProCoreException("Failed to connect to ProCoreServer.", exception); } // private int getWaitCount() throws ProCoreException { // if (!proCoreProcess.isActive()) // return SHORT_SLEEP_COUNT; // Short wait because server process is not running. // else // return LONG_SLEEP_COUNT; // } // private void startInternalReinit(int exitStatus) // throws ProCoreException { // switch (exitStatus) { // default: // What has happened here? // throw new DatabaseStartException(workingDir, getExitMessage(exitStatus)); // case -1: // Exit status not available, not sure what has happened here. // case 29: // Server died because it could not read exit status from guard file. // case 30: // Server died because old exit status was not zero. // case 40: // Server died because it received terminating signal. // case 42: // Server died because it could not read end status from guard file. // case 43: // Server died because end value was not zero. // case 44: // Server died because server could not read version from guard file. // case 45: // Server died because server did not support guard file version. // case 47: // Server died because server could not listen to any port. // break; // case 32: // Server died becuase database version did not match. // throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus)); // case 33: // Server died because database has been corrupted. // throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus)); // case 39: // Server died because last exit was not clean. // throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus)); // } // boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount()); // if (!dead) { // StringBuilder sb = new StringBuilder(256); // sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n"); // sb.append(getExitMessage(exitStatus)); // throw new DatabaseStartException(workingDir, sb.toString()); // } // boolean deleted = guardFile.delete(); // if (!deleted) // Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus)); // } //@Override public synchronized boolean isActive() throws ProCoreException { if (!proCoreClient.isConnected()) return guardFile.isActive(proCoreClient); proCoreClient.execute(""); return true; } public synchronized boolean isAlive() throws ProCoreException { if (proCoreClient.isConnected()) return true; if (proCoreProcess.isActive()) return true; return guardFile.isAlive(); } Client newClient() throws ProCoreException { if (proCoreClient.isConnected()) return proCoreClient.newClient(); int port = getPort(); InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port); return proCoreClient.newClient(this, isa); } public synchronized boolean tryToStop() throws ProCoreException { synchronized (proCoreClient) { ProCoreException ex = null; if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) { try { proCoreClient.connect(getPort()); } catch (InterruptedException e) { } } if (proCoreClient.isConnected()) { String s = ""; try { s = proCoreClient.execute(QUIT_CMD); String t = s.replaceAll("\n", ""); int n = Integer.parseInt(t); if (n != 1) Util.log("Number of connections after quit command is " + n + "."); proCoreClient.closeClient(0); if (!isLocal() && n > 1) throw new ProCoreException("More than one connection for remote. n=" + n); } catch (NumberFormatException e) { Util.logError("Failed to parse number of connections. s=" + s, e); } catch (ProCoreException e) { ex = e; } finally { proCoreClient.disconnect(); } } try { proCoreProcess.tryToStop(); } finally { if (proCoreProcess.isActive() && null == ex) ex = new ProCoreException("Failed to stop ProCoreProcess."); } if (null != ex) throw ex; return !proCoreClient.isConnected() && !proCoreProcess.isActive(); } } public synchronized boolean isConnected() throws ProCoreException { return proCoreClient.isConnected(); } public synchronized boolean isLocal() throws ProCoreException { return proCoreClient.isConnected() && proCoreProcess.isActive(); } public synchronized void disconnect() throws ProCoreException { proCoreClient.disconnect(); } public synchronized void connect() throws ProCoreException, InterruptedException { int port = getPort(); if (0 == port) throw new ProCoreException("Port 0 not supported as connection address."); proCoreClient.connect(port); } private String getExitMessage(int exitValue) { switch (exitValue) { default: return "Unexpected exit value = " + exitValue + "."; case 1: return "Exception was thrown. This indicates problems with program logic."; case 2: return "CallException was thrown. This indicates problems with calls to/from parent server."; case 3: return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers."; case 4: return "NoSuchElementException was thrown. This indicates problems with program logic."; case 5: return "IllegalArgumentException was thrown. This indicates problems with program logic."; case 6: return "IllegalResourceException was thrown. This indicates problems with server data."; case 7: return "IllegalClusterException was thrown. This indicates problems with server data."; case 8: return "ParseException was thrown. This indicates problems with/during parsing of the configuration file."; case 9: return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number."; case 10: return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id."; case 11: return "SystemException was thrown. This indicates problems with program logic."; case 12: return "CppException was thrown. This indicates problems with program logic."; case 13: return "UnknownException was thrown. This indicates problems with program logic."; case 14: return "ThrowException was thrown. This indicates problems with program logic."; case 15: return "AssertException was thrown. This indicates problems with program logic."; case 16: return "ParentCheckException was thrown. This indicates problems with the database version of parent."; case 17: return "ParentConnectionException was thrown. This indicates problems with parent child relationship."; case 18: return "ConnectionException was thrown. This indicates problems with TCP/IP connections."; case 19: return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy."; case 20: return "NoSuchFileException was thrown. This indicates problems with a missing file."; case 21: return "JournalException was thrown. This indicates problems with journal mechanisim."; case 22: return "OutOfSpaceException was thrown. This indicates problems with memory."; case 23: return "ExternalValueException was thrown. This indicates problems with large value handling."; case 24: return "IllegalTransactionException was thrown. This indicates problems with transaction logic."; case 25: return "WriteTransactionException was thrown. This indicates that server failed during write transaction."; case 26: return "ConsoleException was thrown. This indicates that server console was closed without quit command."; case 27: return "ExitException was thrown. This indicates that server did not exit cleanly."; case 28: return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly."; case 29: return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file."; case 30: return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly."; case 31: return "UpdateException was thrown. This indicates that server failed during update."; case 32: return "DatabaseVersionException was thrown. This indicates that server can not use given database."; case 33: return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption."; case 34: return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly."; case 35: return "ExitGuardOpenException was thrown. This indicates that server could not open guard file."; case 36: return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing."; case 37: return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file."; case 38: return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file."; case 39: return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed."; case 40: return "TerminatedException was thrown. This indicates that server died because it received terminating signal."; case 41: return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file."; case 42: return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file."; case 43: return "EndStatusValueException was thrown. This indicates that server died because end value was not zero."; case 44: return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file."; case 45: return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version."; case 46: return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file."; case 47: return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port."; case 48: return "JournalVersionException was thrown. This indicates that database was written by older version."; case 49: return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation."; } } synchronized void start() throws ProCoreException { try { startInternal(); } catch (DatabaseLastExitException e) { throw e; // This can be analyzed and handled by start handler. } catch (Throwable t) { ProCoreException pe = null; if (t instanceof ProCoreException) pe = (ProCoreException)t; else { String msg = "ProCoreServer startup failed." + " Automatic rcovery handling not implemented." + " Database must be fixed manually."; pe = new ProCoreException(msg, t); } if (null == pe.getDbFolder()) pe.setDbFolder(proCoreProcess.builder.directory()); } } synchronized void stop() throws ProCoreException, InterruptedException{ if (null != databaseId) databaseIds.remove(databaseId); boolean dead = tryToStop(); if (!dead) throw new ProCoreException("Failed to stop ProCoreServer."); } public synchronized String getExitMessage() throws ProCoreException { Integer returnValue = guardFile.getExitValue(); if (null == returnValue) throw new ProCoreException("Exit message not available."); return getExitMessage(returnValue); } //@Override public synchronized ServerAddress getAddress() throws ProCoreException { return new ServerAddress(LOCALHOST, getPort()); } //@Override public synchronized int getPort() throws ProCoreException { int port = 0; if (proCoreClient.isConnected()) { port = proCoreClient.getPort(); if (0 != port) return port; } if (proCoreProcess.isActive()) { port = proCoreProcess.getPort(); if (0 != port) return port; } return guardFile.getPort(); } public synchronized TailFile.Data getTailData() throws ProCoreException { Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR); return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); } // public synchronized TailFile.Data getTailDataFromHead() // throws ProCoreException { // Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR); // return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); // } public static class GuardFile { public static final String GUARD_FILE = ProCoreServer.GUARD_FILE; private static final int VERSION_LINE = 1; private static final int PID_LINE = 2; private static final int PORT_LINE = 3; private static final int EXIT_LINE = 4; private static final int END_LINE = 5; private final File guardFile; GuardFile(File workingDir) { guardFile = new File(workingDir, GUARD_FILE); } int getPort() throws ProCoreException { String[] lines = getFileLines(guardFile, END_LINE); if (lines.length < PORT_LINE) throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath()); return parseInt(lines[PORT_LINE-1]); } private void checkVersion(String[] lines) throws ProCoreException { if (lines.length < VERSION_LINE) throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath()); int version = parseInt(lines[VERSION_LINE-1]); if (1 != version) throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version); } boolean delete() { if (guardFile.exists()) return guardFile.delete(); else return true; // Guard file is deleted i.e. does not exit. } void deleteLog() { if (!guardFile.exists()) return; try { Files.delete(guardFile.toPath()); } catch (Throwable t) { t.printStackTrace(); } } Integer getExitValue() { String lines[] = getFileLines(guardFile, END_LINE); try { checkVersion(lines); if (lines.length >= EXIT_LINE) return parseInt(lines[EXIT_LINE-1]); else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1])) return -1; // Server has died without writing exit status. return null; // Server might be running. } catch (Exception e) { Logger.defaultLogError("Could not get ProCoreServer exit value.", e); return null; // Server might be running. } } // void stopProCoreServer() throws ProCoreException { // if (!guardFile.exists()) // throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist."); // String lines[] = new String[0]; // for (int i=0; i= END_LINE) // return; // Server is dead according to guard file. // if (!isRunning(lines[PID_LINE-1])) // return; // ProCoreServer is not running. // if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file. // break; // } // if (lines.length < PORT_LINE) // throw new ProCoreException("Guard file does not contain port."); // try { // int port = parseInt(lines[PORT_LINE-1]); // if (port > 0) { // ProCoreClient lProCoreClient = new ProCoreClient(); // SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true); // lProCoreClient.connect(sa); // try { // String reply = lProCoreClient.execute(QUIT_CMD); // int nConnect = Integer.parseInt(reply); // if (nConnect > 1) // throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent()); // } finally { // lProCoreClient.disconnect(); // } // } // } catch (Throwable t) { // throw new ProCoreException("Failed to stop procore server cleanly.", t); // } // // Managed to send quit command to server, let's wait and see if it has any effect. // boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT); // if (!dead) // throw new ProCoreException("Unable to stop server."); // } boolean isActive(ProCoreClient proCoreClient) throws ProCoreException { if (proCoreClient.isConnected()) throw new ProCoreException("Illegal argument. ProCoreClient must not be connected."); String lines[] = getFileLines(guardFile, END_LINE); if (lines.length != PORT_LINE) return false; checkVersion(lines); try { int port = parseInt(lines[PORT_LINE-1]); if (port > 0) { try { // Checking if core is responsive. // This should be supported regardless of version. SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); proCoreClient.connect(sa); proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions. return true; } catch (ProCoreException e) { } finally { // We must disconnect or check protocol version. proCoreClient.disconnect(); } } } catch (Throwable t) { // Intentionally empty. } return false; } boolean isAlive() throws ProCoreException { String lines[] = getFileLines(guardFile, END_LINE); if (lines.length < 1) return false; // No lines, assuming server is dead. checkVersion(lines); if (lines.length < PID_LINE) return false; // No process id, assuming server is dead. else if (lines.length >= END_LINE) return false; // End status available, assuming server is dead. else if (lines.length >= EXIT_LINE) return false; // Exit status available, assuming server is dying. else if (isRunning(lines[PID_LINE-1])) return true; // Process with given id running. Server might be alive. else return false;// Process with given id not running. } } public static class TailFile { public static class Data { Data(long version, long nextChangeSetId, long nextFreeId) { this.version = version; this.nextChangeSetId = nextChangeSetId; this.nextFreeId = nextFreeId; } long version; public long nextChangeSetId; long nextFreeId; } public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException { if (file.exists()) throw new TailException("Tail file exists and thus cannot be created. file=" + file); FileOutputStream fos; try { fos = new FileOutputStream(file); } catch (FileNotFoundException e) { throw new TailException("Tail file cannot be created. file=" + file); } PrintStream ps = new PrintStream(fos); try { ps.println(1); // file version ps.println(nextChangeSetId); // next revision ps.println(nextFreeId); // next free cluster ps.println(dbId); // database id } finally { ps.close(); } } public static Data readTailFile(File file) throws ProCoreException { String lines[] = getFileLines(file, 3); if (lines.length < 3) throw new TailReadException("Could not read data from " + file); long ver = Long.parseLong(lines[0]); long cs = Long.parseLong(lines[1]); long id = Long.parseLong(lines[2]); return new Data(ver, cs, id); } } } class SessionAddress { private final InetSocketAddress socketAddress; private final boolean ignoreProtocolVersion; public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) { assert(null != socketAddress); this.socketAddress = socketAddress; this.ignoreProtocolVersion = ignoreProtocolVersion; } public InetSocketAddress getSocketAddress() { return socketAddress; } public boolean ignoreProtocolVersion() { return ignoreProtocolVersion; } @Override public String toString() { return socketAddress.toString(); } } class PacketQueue { private final LinkedBlockingQueue packets = new LinkedBlockingQueue(); PacketQueue() { } void addFirst(Packet packet) { packets.add(packet); } Packet takeFirst() throws InterruptedException { return packets.take(); } } class Header { Header(int[] ints) { lastTokenIn = ints[0]; token = ints[1]; messageNumber = ints[2]; inflateSize = ints[3]; deflateSize = ints[4]; } int lastTokenIn; int token; int messageNumber; int inflateSize; int deflateSize; @Override public String toString() { StringBuilder sb = new StringBuilder(80); sb.append("lastTokenIn="); sb.append(lastTokenIn); sb.append(" token="); sb.append(token); sb.append(" messageNumber="); sb.append(messageNumber); sb.append(" inflateSize="); sb.append(inflateSize); sb.append(" deflateSize="); sb.append(deflateSize); return sb.toString(); } } class Packet { Header header; ByteBuffer bytes; Packet(int[] header, ByteBuffer bytes) { this.header = new Header(header); this.bytes = bytes; } } class ConnectionThread { private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; private static final int HEADER_N = 5; private static final int HEADER_SIZE = HEADER_N * 4; private static final int DEFLATE = 4; private static final int INCREMENT_SIZE = 8192; private static int instanceCount = 0; class Manager implements Runnable { private final int instance; private volatile boolean carryOn = true; private volatile int lastTokenIn = 0; private volatile MethodQueue methodQueue = new MethodQueue(); private volatile CountDownLatch running = new CountDownLatch(1); private final EventHandler defaultEventHandler = new DefaultEventHandler(); // private EventHandler eventHandler = defaultEventHandler; Manager() { instance = ++instanceCount; } @Override public String toString() { return "Connection " + instance; } InetSocketAddress getAddress() { return connection.getAddress(); } boolean isConnected() { return connection.isConnected(); } void reconnect() throws ProCoreException, InterruptedException { InetSocketAddress a = connection.getAddress(); if (connection.addressNotInitialized() || 0 == a.getPort()) throw new ProCoreException("Address not ok. address=" + connection.getAddress()); connect(a); } void connect(int port) throws ProCoreException, InterruptedException { InetSocketAddress a = connection.getAddress(); if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort())) a = new InetSocketAddress(ProCoreClient.LOCALHOST, port); connect(a); } void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { if (connection.isConnected()) { if (connection.equalsAddress(address)) return; else throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address); } connection.init4Connection(address); connectionManager.connect(connection); // compression = new Compression(); if (thread.isAlive()) return; thread.start(); running.await(); } void call(Method method) throws ProCoreException, InterruptedException, IOException { // eventHandler = method.getEventHandler(eventHandler); connection.sendMethod(methodQueue, method, lastTokenIn); } void disconnect() { methodQueue.close(); connection.disconnect(); } @Override public void run() { // MARK: CT.Manger.run try { running.countDown(); while (carryOn) { Packet packet = null; try { packet = inputQueue.takeFirst(); } catch (InterruptedException e) { Util.trace("Wait for input packet interrupted. this=" + this); continue; } if (null == packet) break; handlePacket(packet); } if (carryOn) Util.trace("Thread " + thread.getName() + " stopped with null packet."); else Util.trace("Thread " + thread.getName() + " stopped with false carryOn."); } catch (Throwable t) { Util.logError("Thread " + thread.getName() + " stopped to exception:", t); } finally { disconnect(); running.countDown(); } } private void handlePacket(Packet packet) throws ProCoreException { if (packet.header.token > lastTokenIn) { lastTokenIn = packet.header.token; if (DEBUG) Util.showDebug("Setting lastTokeinIn=" + lastTokenIn); } Method method = methodQueue.remove(packet.header.token); if (null == method) { try { switch (packet.header.messageNumber) { default: { Util.logError("Missing method. token=" + packet.header.token); } return; case MessageNumber.ChangeSetUpdateRequest: { defaultEventHandler.on(new ChangeSetUpdateEvent(packet)); } break; } } catch (Throwable t) { Util.logError("Event handler failed:", t); } return; } // else // eventHandler = method.getEventHandler(defaultEventHandler); switch (packet.header.messageNumber) { default: { String s = "Illegal message number " + packet.header.messageNumber; method.gotException(s); } break; case MessageNumber.ChangeSetUpdateRequest: method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet)); methodQueue.add(method); // Event caused by method. Actual method not yet handled. break; case MessageNumber.NullResponse: //Util.log("Null(ack) message. method=" + method + " packet=" + packet); break; case MessageNumber.ExceptionResponse: method.gotException(packet); break; // MARK: handlePacket case MessageNumber.AAAResponse: case MessageNumber.AskTransactionResponse: case MessageNumber.CancelCommitResponse: case MessageNumber.CloseClientSessionResponse: case MessageNumber.EchoResponse: case MessageNumber.EndTransactionResponse: case MessageNumber.ExecuteResponse: case MessageNumber.GetChangeSetContextResponse: case MessageNumber.GetChangeSetDataResponse: case MessageNumber.GetChangeSetsResponse: case MessageNumber.GetClusterNewResponse: case MessageNumber.GetRefreshResponse: case MessageNumber.GetResourceSegmentResponse: case MessageNumber.GetServerInfoResponse: case MessageNumber.OpenClientSessionResponse: case MessageNumber.RefreshResponse: case MessageNumber.ReserveIdsResponse: case MessageNumber.UndoResponse: case MessageNumber.ReconnectResponse: case MessageNumber.GetRefresh2Response: case MessageNumber.GetClusterChangesResponse: case MessageNumber.GetServerInfo2Response: case MessageNumber.ListClustersResponse: { method.gotPacket(packet); } break; } } } private final ConnectionManager connectionManager; private final PacketQueue inputQueue = new PacketQueue(); private final Connection connection = new Connection(this); private final Manager manager = new Manager(); private Thread thread = new Thread(manager, manager.toString()); private int[] header = null; private int[] INTS = new int[HEADER_N]; private int remaining = HEADER_SIZE; private int deflateSize; ConnectionThread(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } ByteBuffer getInputBuffer() { return getInputBuffer(INCREMENT_SIZE); } ByteBuffer getInputBuffer(int sizeAtLeast) { ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE)); bb.order(ByteOrder.LITTLE_ENDIAN); return bb; } @Override public String toString() { return "header=" + header + " remaining=" + remaining; } private String toString(ByteBuffer buffer, int nRead) { return this + " nRead=" + nRead + " buffer=" + buffer; } ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0 throws IOException { // MARK: handleInput int position = buffer.position(); if (null == header) { if (position < HEADER_SIZE) return buffer; else { // Got header. buffer.position(0); IntBuffer ib = buffer.asIntBuffer(); ib.get(INTS); ib = null; header = INTS; deflateSize = header[DEFLATE]; if (deflateSize < 0) throw new IOException("Illegal deflate size=" + deflateSize); remaining += deflateSize; if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case. ByteBuffer bb = getInputBuffer(remaining); buffer.position(0); buffer.limit(position); bb.put(buffer); return bb; } buffer.position(position); } } if (position < remaining) return buffer; // Need more data. else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case. ByteBuffer bb = getInputBuffer(remaining); buffer.position(0); buffer.limit(remaining); bb.put(buffer); inputQueue.addFirst(new Packet(header, bb)); ByteBuffer b2 = getInputBuffer(buffer.limit()); buffer.position(remaining); buffer.limit(position); b2.put(buffer); header = null; remaining = HEADER_SIZE; return handleInput(b2, b2.position()); } else if (position != remaining) throw new IOException("Assertion error. this=" + toString(buffer, nRead)); // Got exactly one packet. inputQueue.addFirst(new Packet(header, buffer)); header = null; remaining = HEADER_SIZE; return getInputBuffer(INCREMENT_SIZE); } InetSocketAddress getAddress() { return manager.getAddress(); } boolean isConnected() { return manager.isConnected(); } void reconnect() throws ProCoreException, InterruptedException { manager.reconnect(); } void connect(int port) throws ProCoreException, InterruptedException { manager.connect(port); } void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { manager.connect(address); } void disconnect() { manager.disconnect(); } Connection call(Method m) throws ProCoreException, InterruptedException, IOException { manager.call(m); return connection; } } class Client { private final ProCoreServer proCoreServer; private final ProCoreClient proCoreClient; private final ConnectionThread connectionThread; private boolean closed = false; Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) { this.proCoreServer = proCoreServer; this.proCoreClient = proCoreClient; this.connectionThread = connectionThread; } void open() throws ProCoreException, InterruptedException { synchronized (proCoreClient) { if (!connectionThread.isConnected()) connectionThread.connect(proCoreServer.getPort()); } closed = false; } boolean isOpen() { synchronized (proCoreClient) { return connectionThread.isConnected(); } } boolean isClosed() { synchronized (proCoreClient) { return closed || !connectionThread.isConnected(); } } void close() { proCoreClient.deleteClient(this); closed = true; } String execute(String command) throws ProCoreException { return proCoreClient.execute(command); } void call(Method m) throws ProCoreException { proCoreClient.call(m); } } class ProCoreClient { public static final String LOCALHOST = ""; class Remote { private final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final ConnectionThread connectionThread = new ConnectionThread(connectionManager); Remote() { } } private final Remote remote = new Remote(); private final Set clients = new HashSet(); private final ProCoreServer proCoreServer; public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException { this.proCoreServer = proCoreServer; } synchronized public int getPort() { return remote.connectionThread.getAddress().getPort(); } synchronized public boolean isConnected() { return remote.connectionThread.isConnected(); } synchronized public Client newClient() throws ProCoreException { return newClient(proCoreServer, remote.connectionThread.getAddress()); } synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException { if (!remote.connectionThread.isConnected()) try { remote.connectionThread.connect(address); } catch (InterruptedException e) { if (!remote.connectionThread.isConnected()) { remote.connectionThread.disconnect(); throw new ProCoreException("Failed to create new client."); } } else if (!address.equals(remote.connectionThread.getAddress())) throw new ProCoreException("Illegal newClient call. Already connected to different address."); Client client = new Client(proCoreServer, this, remote.connectionThread); clients.add(client); return client; } synchronized int deleteClient(Client client) { clients.remove(client); return clients.size(); } void closeClient(int aSessionId) throws ProCoreException { try { CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId); Method m = new Method(f, null, null); Connection c = remote.connectionThread.call(m); m.waitForReply(c); } catch (InterruptedException e) { throw new ProCoreException("Thread interrupted during method call.", e); } catch (IOException e) { throw new ProCoreException("IOException during method call.", e); } catch (Throwable e) { throw new ProCoreException("Throwable during method call.", e); } } synchronized public String execute(String command) throws ProCoreException { try { ExecuteFunction f = new ExecuteFunction(command); Method m = new Method(f, null, null); Connection c = remote.connectionThread.call(m); m.waitForReply(c); return f.out; } catch (InterruptedException e) { throw new ProCoreException("Thread interrupted during method call.", e); } catch (IOException e) { throw new ProCoreException("IOException during method call.", e); } catch (Throwable e) { throw new ProCoreException("Throwable during method call.", e); } } private void recall(Method method, IOException e) throws ProCoreException {// :) if (remote.connectionThread.isConnected()) throw new ProCoreException("IOException during method call.", e); try { remote.connectionThread.reconnect(); Connection c = remote.connectionThread.call(method); method.waitForReply(c); } catch (Throwable t) { throw new ProCoreException("IOException during method call. Recall failed.", e); } } synchronized public void call(Method method) throws ProCoreException { try { Connection c = remote.connectionThread.call(method); method.waitForReply(c); } catch (InterruptedException e) { throw new ProCoreException("Thread interrupted during method call.", e); } catch (IOException e) { recall(method, e); } catch (ProCoreException e) { throw e; } catch (Throwable e) { throw new ProCoreException("Throwable during method call.", e); } } synchronized public void connect(int port) throws ProCoreException, InterruptedException { remote.connectionThread.connect(port); } synchronized public void connect(SessionAddress sessionAddress) throws ProCoreException, InterruptedException { remote.connectionThread.connect(sessionAddress.getSocketAddress()); } synchronized public void disconnect() throws ProCoreException { remote.connectionThread.disconnect(); } } abstract class Event { final String name; final Packet packet; Event(String name, Packet packet) { this.name = name; this.packet = packet; } int getToken() { return packet.header.token; } void deserialize(org.simantics.db.server.protocol.AbstractEvent event) { event.deserialize(event.getRequestNumber(), getDataBuffer()); } private org.simantics.db.server.protocol.DataBuffer getDataBuffer() { this.packet.bytes.position(20); return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes); } @Override public String toString() { return name + " token=" + packet.header.token; } } class ChangeSetUpdateEvent extends Event { ChangeSetUpdateEvent(Packet packet) { super("ChangeSetUpdateEvent", packet); } } abstract class EventHandler { abstract void on(Event event); } class DefaultEventHandler extends EventHandler { void on(Event event) { Util.log("Missing event handler. event=" + event); } }