X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.server%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fserver%2Finternal%2FProCoreServer.java;h=cf1eb18ac95c8af984c6d7631d3b1e0ca279e35a;hb=HEAD;hp=989e2058b75167a53097bc2e8dcac81e994aa564;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ProCoreServer.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ProCoreServer.java index 989e2058b..cf1eb18ac 100644 --- a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ProCoreServer.java +++ b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ProCoreServer.java @@ -1,1441 +1,1442 @@ -/******************************************************************************* - - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.server.internal; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.IntBuffer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import org.simantics.db.common.utils.Logger; -import org.simantics.db.server.DatabaseLastExitException; -import org.simantics.db.server.GuardFileVersionException; -import org.simantics.db.server.ProCoreException; -import org.simantics.db.server.ServerNotFoundException; -import org.simantics.db.server.protocol.CloseClientSessionFunction; -import org.simantics.db.server.protocol.ExecuteFunction; -import org.simantics.db.server.protocol.MessageNumber; - -public class ProCoreServer { - private static Map workingDirs = new HashMap(); - private static Map databaseIds = new HashMap(); - private static final String LOCALHOST = ProCoreClient.LOCALHOST; - public static final int VERSION_MAJOR = 1; - public static final int VERSION_MINOR = 4; // 2015-05-05 - public static final String BRANCH_DIR = "procore.headClusters.procore"; - public static final String TAIL_DIR = "procore.tailClusters.procore"; - public static final String TAIL_FILE = "procore.tail.procore"; - public static final String CONFIG_FILE = "procore.config.procore"; - public static final String GUARD_FILE = "procore.guard.procore"; - public static final String JOURNAL_FILE = "procore.journal.procore"; - public static final String LOG_FILE = "procore.log.procore"; - public static final String DCS_FILE = "procore.dumpChangeSets.procore"; - public static final String RECOVERY_NEEDED_FILE = "recovery.needed"; - public static final String RECOVERY_IGNORED_FILE = "recovery.ignored"; - public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored"; - public static final String PAGE_FILE_PATTERN = "procore.page*.procore"; - public static final String DATA_PREFIX = "ClusterData."; - public static final String DATA_PATTERN = DATA_PREFIX + "*"; - public static final String INDEX_PREFIX = "ClusterIndex."; - public static final String INDEX_PATTERN = INDEX_PREFIX + "*"; - public static final String VALUE_PREFIX = "ExternalValue."; - public static final String VALUE_PATTERN = VALUE_PREFIX + "*"; - public static final String VALUE_SUFFIX = ".procore"; - public static final String DELETED_PREFIX = "ClusterDeleted."; - public static final String DELETED_PATTERN = DELETED_PREFIX + "*"; - private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; - public static final String QUIT_CMD = "quit"; -// private static final int SHORT_SLEEP_COUNT = 1; -// private static final int LONG_SLEEP_COUNT = 10; -// static public void stopProCoreServer(File workingDir) throws ProCoreException { -// GuardFile guardFile = new GuardFile(workingDir); -// guardFile.stopProCoreServer(); -// } - private static boolean isRunning(String aPid) throws ProCoreException { - boolean isRunning = false; - try { - switch (OSType.calculate()) { - case WINDOWS: { - Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\""); - BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line = input.readLine(); - if (line.matches(".*ProCoreServer.exe.*")) - isRunning = true; - input.close(); - break; - } - case UNIX: { - Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm="); - BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line = input.readLine(); - if (line.matches(".*ProCoreServer.*")) - isRunning = true; - input.close(); - break; - } - default: - } - } catch (Exception e) { - throw new ProCoreException("Could not get list of running processes.", e); - } - return isRunning; - } - static class ProCoreProcess { - private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped }; - class Manager implements Runnable { - private State state = State.Created; - private CountDownLatch serverPrompted; - private LinkedBlockingQueue reply; - private volatile String command; - private volatile Integer exitValue; - private volatile boolean carryOn; - private volatile int port; - private final File dbFolder; // Just for exception i.e. error reporting. - Manager(File dbFolder) { - this.dbFolder = dbFolder; - init(); - } - private void init() { - serverPrompted = new CountDownLatch(1); - reply = new LinkedBlockingQueue(); - exitValue = null; - carryOn = true; - port = 0; - state = State.Initialized; - } - int getPort() { - return port; - } - void tryToStart() throws ProCoreException, InterruptedException { - if (thread.isAlive()) - return; - thread = new Thread(manager, manager.initAndGetName()); - thread.start(); - serverPrompted.await(); - if (!state.equals(State.Prompted)) - throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); - String reply = giveCommand("print port"); - try { - port = Integer.parseInt(reply); - } catch (NumberFormatException e) { - if (null != exitValue && 0 != exitValue) - throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); - if (reply.equals("End of server thread.")) - throw new ProCoreException("Server did not start (did not prompt). state=" + state); - else - throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply); - } - state = State.GotPort; - } - String giveCommand(String command) throws ProCoreException, InterruptedException { - if (!thread.isAlive()) - throw new ProCoreException("Server thread not alive."); - if (null != this.command) - throw new ProCoreException("Old command pending."); - this.command = command; - return reply.take(); - } - private String initAndGetName() { - init(); - return "ProCoreProcess " + ++instanceCount + " " + builder.directory(); - } - boolean isActive() throws ProCoreException { - if (null == exitValue && thread.isAlive()) - return true; - else - return false; - } - boolean tryToStop() throws ProCoreException { - if (!thread.isAlive()) - return true; - carryOn = false; - thread.interrupt(); - try { - thread.join(10000); // milliseconds - } catch (InterruptedException e) { - } - return !thread.isAlive(); - } - @Override - public void run() { - state = State.ThreadStarted; - Util.log("ProCoreServer thread started."); - try { - Util.trace("ProCoreProcessManager start."); - Process process = builder.start(); - exitValue = exitValue(process); - if (null != exitValue) - Util.logError("Server process did not start."); - else { - state = State.ProcessStarted; - runProcessStarted(process); // Process start - } - } catch (IOException e) { - Util.logError("Failed during process watch."); - } finally { - reply.add("End of server thread."); - Util.trace("ProCoreServerThread stop."); - state = State.ThreadStopped; - serverPrompted.countDown(); - } - } - private void runProcessStarted(Process process) throws IOException { - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));) - { - do { - try { - if (reader.ready()) - handleLine(reader.readLine(), reader, writer); - else if (null != command) { - writer.write(command); - command = null; - writer.newLine(); - writer.flush(); - String line = getReply(reader); - reply.add(line); - } else - Thread.sleep(100); // milliseconds - } catch (InterruptedException e) { - Util.trace("ProCoreServerThread interrupted."); - } - exitValue = exitValue(process); - if (null == exitValue && !carryOn) - try { - try { - writer.write(QUIT_CMD); - writer.newLine(); - writer.flush(); - } catch (Throwable t) { - Util.logError("Wait for process death was interrupted.", t); - } finally { - } - exitValue = process.waitFor(); - } catch (InterruptedException e) { - Util.logError("Wait for process death was interrupted.", e); - carryOn = true; - continue; - } - } while (null == exitValue); - } // End of resource try. - } - private String getReply(BufferedReader reader) throws IOException { - StringBuilder sb = new StringBuilder(); - while (true) { - String line = reader.readLine(); - if (null == line || line.startsWith("Waiting for input?")) - break; - sb.append(line); - } - return sb.toString(); - } - private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException { - if (DEBUG) - Util.showDebug(line); - if (line.startsWith("Main end.")) { - carryOn = false; // Last line from server. - } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) { - state = State.Prompted; - serverPrompted.countDown(); // Server has been started and is waiting for console input. - } else - Util.trace("From server. line=" + line); - } - private Integer exitValue(Process process) { - try { - return process.exitValue(); - } catch (IllegalThreadStateException e) { - return null; // Still alive. - } - } - } - private static int instanceCount; // Keeps count of thread instances. - private final ProcessBuilder builder; - private final Manager manager; - private Thread thread; - ProCoreProcess(File exeFile, File workingDir) { - String[] args = new String[0]; - String[] cmd = new String[1 + args.length]; - cmd[0] = exeFile.getAbsolutePath().toString(); - System.arraycopy(args, 0, cmd, 1, args.length); - builder = new ProcessBuilder(cmd); - builder.redirectErrorStream(true); - builder.directory(workingDir); - manager = new Manager(workingDir); - thread = new Thread(manager, manager.initAndGetName()); - } - public void tryToStart() throws ProCoreException, InterruptedException { - manager.tryToStart(); - } -// public int exitValue() throws IllegalStateException { -// Integer i = proCoreServerThread.getExitValue(); -// if (null == i) -// throw new IllegalStateException("ProCoreServerThread is still running."); -// return i; -// } -// public boolean ignoreProtocolVersion() { -// File wd = builder.directory(); -// if (null == wd) -// return false; -// Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE); -// if (Files.exists(ignore)) -// return true; -// return false; -// } - public boolean isActive() throws ProCoreException { - return manager.isActive(); - } - public boolean tryToStop() throws ProCoreException { - return manager.tryToStop(); - } - int getPort() throws ProCoreException { - return manager.getPort(); - } - } - private static enum OSType { - UNIX, WINDOWS, UNKNOWN; - - public static OSType calculate() { - String osName = System.getProperty("os.name"); - assert osName != null; - osName = osName.toLowerCase(); - if (osName.startsWith("windows")) - return WINDOWS; - if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun")) - return UNIX; - return UNKNOWN; - } - } - static public boolean ignoreProtocolVersion(File dbFolder) { - if (null == dbFolder) - return false; - if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE))) - return true; - return false; - } - static public ProCoreServer getProCoreServer(File serverDir, final File workingDir) - throws ProCoreException, ServerNotFoundException { - try { - ProCoreServer proCoreServer; - synchronized (workingDirs) { - proCoreServer = workingDirs.get(workingDir.getCanonicalPath()); - if (null != proCoreServer) - return proCoreServer; - if (!serverDir.isDirectory()) - throw new ProCoreException("Expected server directory as argument"); - if (!workingDir.isDirectory()) - throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath()); - String exeSuffix = Activator.getExeSuffix(); - File executable = new File(serverDir, "ProCoreServer" + exeSuffix); - proCoreServer = new ProCoreServer(executable, workingDir); - workingDirs.put(workingDir.getCanonicalPath(), proCoreServer); - return proCoreServer; - } - } catch (IOException e) { - throw new ProCoreException("IOException", e); - } - } - - public static void logVersionInformation() { - StringBuilder msg = new StringBuilder(200); - String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read. - msg.append("ProCore version information:"); - msg.append(nl); - msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor); - try { - msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder()); - } catch (IOException e) { - msg.append(" folder information not available"); - } - msg.append(nl); - msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $"); - String str = msg.toString(); - Logger.defaultLog(str); - } - private static String[] getFileLines(final File aFile, int maxLines) { - final String[] STRING_EMPTY_ARRAY = new String[0]; - try { - if (!aFile.canRead()) { - return STRING_EMPTY_ARRAY; - } - try (FileInputStream fis = new FileInputStream(aFile); - InputStreamReader isr = new InputStreamReader(fis, "US-ASCII"); - BufferedReader br = new BufferedReader(isr);){ - String line = null; - int count = 0; - ArrayList lines = new ArrayList(); - while ((line = br.readLine()) != null) { - if (++count > maxLines) - break; // Caller not interested in more lines. - lines.add(line); - } - return lines.toArray(STRING_EMPTY_ARRAY); - } catch (FileNotFoundException e) { - return STRING_EMPTY_ARRAY; - } - } catch (Throwable t) { - if (DEBUG) - t.printStackTrace(); - Logger.defaultLogError(t); - } - return new String[0]; - } - private static int parseInt(String s) - throws ProCoreException { - try { - return Integer.parseInt(s); - } catch (NumberFormatException e) { - String msg = "Could not convert string to number. string=" + s; - Logger.defaultLogError(msg); - throw new ProCoreException(msg, e); - } - } - private String databaseId = null; - private final GuardFile guardFile; - private final ProCoreClient proCoreClient; - ProCoreClient getProCoreClient() { - return proCoreClient; - } - private final ProCoreProcess proCoreProcess; - private ProCoreServer(File exeFile, File workingDir) - throws ProCoreException { - if (null == exeFile) - throw new ProCoreException("Illegal argument: exeFile is null."); - if (null == workingDir) - throw new ProCoreException("Illegal argument: workingDir is null."); - guardFile = new GuardFile(workingDir); - proCoreClient = new ProCoreClient(this); - proCoreProcess = new ProCoreProcess(exeFile, workingDir); - Util.log("ProCoreServer exe: " + exeFile); - Util.log("ProCoreServer folder: " + workingDir); - logVersionInformation(); - } -// //@Override - public synchronized String execute(String command) throws ProCoreException, InterruptedException { - return proCoreClient.execute(command); - } - private void startInternal() - throws ProCoreException { - if (proCoreClient.isConnected()) - return; - Exception exception; - try { - proCoreProcess.tryToStart(); - int port = proCoreProcess.getPort(); - SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); - proCoreClient.connect(sa); - return; - } catch (InterruptedException e) { - Util.logError("ProCoreProcess start was interrupted.", e); - exception = e; - } catch (DatabaseLastExitException e) { - throw e; // This situation can be analyzed and handled by start handler. - } catch (ProCoreException e) { - Util.logError("Failed to start ProCoreServer process.", e); - exception = e; - } - boolean isActive = guardFile.isActive(proCoreClient); - if (!isActive) - throw new ProCoreException("Failed to connect to ProCoreServer.", exception); - } -// private int getWaitCount() throws ProCoreException { -// if (!proCoreProcess.isActive()) -// return SHORT_SLEEP_COUNT; // Short wait because server process is not running. -// else -// return LONG_SLEEP_COUNT; -// } -// private void startInternalReinit(int exitStatus) -// throws ProCoreException { -// switch (exitStatus) { -// default: // What has happened here? -// throw new DatabaseStartException(workingDir, getExitMessage(exitStatus)); -// case -1: // Exit status not available, not sure what has happened here. -// case 29: // Server died because it could not read exit status from guard file. -// case 30: // Server died because old exit status was not zero. -// case 40: // Server died because it received terminating signal. -// case 42: // Server died because it could not read end status from guard file. -// case 43: // Server died because end value was not zero. -// case 44: // Server died because server could not read version from guard file. -// case 45: // Server died because server did not support guard file version. -// case 47: // Server died because server could not listen to any port. -// break; -// case 32: // Server died becuase database version did not match. -// throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus)); -// case 33: // Server died because database has been corrupted. -// throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus)); -// case 39: // Server died because last exit was not clean. -// throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus)); -// } -// boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount()); -// if (!dead) { -// StringBuilder sb = new StringBuilder(256); -// sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n"); -// sb.append(getExitMessage(exitStatus)); -// throw new DatabaseStartException(workingDir, sb.toString()); -// } -// boolean deleted = guardFile.delete(); -// if (!deleted) -// Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus)); -// } - //@Override - public synchronized boolean isActive() - throws ProCoreException { - if (!proCoreClient.isConnected()) - return guardFile.isActive(proCoreClient); - proCoreClient.execute(""); - return true; - } - public synchronized boolean isAlive() - throws ProCoreException { - if (proCoreClient.isConnected()) - return true; - if (proCoreProcess.isActive()) - return true; - return guardFile.isAlive(); - } - Client newClient() throws ProCoreException { - if (proCoreClient.isConnected()) - return proCoreClient.newClient(); - int port = getPort(); - InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port); - return proCoreClient.newClient(this, isa); - } - public synchronized boolean tryToStop() - throws ProCoreException { - synchronized (proCoreClient) { - ProCoreException ex = null; - if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) { - try { - proCoreClient.connect(getPort()); - } catch (InterruptedException e) { - } - } - if (proCoreClient.isConnected()) { - String s = ""; - try { - s = proCoreClient.execute(QUIT_CMD); - String t = s.replaceAll("\n", ""); - int n = Integer.parseInt(t); - if (n != 1) - Util.log("Number of connections after quit command is " + n + "."); - proCoreClient.closeClient(0); - if (!isLocal() && n > 1) - throw new ProCoreException("More than one connection for remote. n=" + n); - } catch (NumberFormatException e) { - Util.logError("Failed to parse number of connections. s=" + s, e); - } catch (ProCoreException e) { - ex = e; - } finally { - proCoreClient.disconnect(); - } - } - try { - proCoreProcess.tryToStop(); - } finally { - if (proCoreProcess.isActive() && null == ex) - ex = new ProCoreException("Failed to stop ProCoreProcess."); - } - if (null != ex) - throw ex; - return !proCoreClient.isConnected() && !proCoreProcess.isActive(); - } - } - public synchronized boolean isConnected() - throws ProCoreException { - return proCoreClient.isConnected(); - } - public synchronized boolean isLocal() - throws ProCoreException { - return proCoreClient.isConnected() && proCoreProcess.isActive(); - } - public synchronized void disconnect() - throws ProCoreException { - proCoreClient.disconnect(); - } - public synchronized void connect() - throws ProCoreException, InterruptedException { - int port = getPort(); - if (0 == port) - throw new ProCoreException("Port 0 not supported as connection address."); - proCoreClient.connect(port); - } - private String getExitMessage(int exitValue) { - switch (exitValue) { - default: - return "Unexpected exit value = " + exitValue + "."; - case 1: - return "Exception was thrown. This indicates problems with program logic."; - case 2: - return "CallException was thrown. This indicates problems with calls to/from parent server."; - case 3: - return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers."; - case 4: - return "NoSuchElementException was thrown. This indicates problems with program logic."; - case 5: - return "IllegalArgumentException was thrown. This indicates problems with program logic."; - case 6: - return "IllegalResourceException was thrown. This indicates problems with server data."; - case 7: - return "IllegalClusterException was thrown. This indicates problems with server data."; - case 8: - return "ParseException was thrown. This indicates problems with/during parsing of the configuration file."; - case 9: - return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number."; - case 10: - return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id."; - case 11: - return "SystemException was thrown. This indicates problems with program logic."; - case 12: - return "CppException was thrown. This indicates problems with program logic."; - case 13: - return "UnknownException was thrown. This indicates problems with program logic."; - case 14: - return "ThrowException was thrown. This indicates problems with program logic."; - case 15: - return "AssertException was thrown. This indicates problems with program logic."; - case 16: - return "ParentCheckException was thrown. This indicates problems with the database version of parent."; - case 17: - return "ParentConnectionException was thrown. This indicates problems with parent child relationship."; - case 18: - return "ConnectionException was thrown. This indicates problems with TCP/IP connections."; - case 19: - return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy."; - case 20: - return "NoSuchFileException was thrown. This indicates problems with a missing file."; - case 21: - return "JournalException was thrown. This indicates problems with journal mechanisim."; - case 22: - return "OutOfSpaceException was thrown. This indicates problems with memory."; - case 23: - return "ExternalValueException was thrown. This indicates problems with large value handling."; - case 24: - return "IllegalTransactionException was thrown. This indicates problems with transaction logic."; - case 25: - return "WriteTransactionException was thrown. This indicates that server failed during write transaction."; - case 26: - return "ConsoleException was thrown. This indicates that server console was closed without quit command."; - case 27: - return "ExitException was thrown. This indicates that server did not exit cleanly."; - case 28: - return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly."; - case 29: - return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file."; - case 30: - return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly."; - case 31: - return "UpdateException was thrown. This indicates that server failed during update."; - case 32: - return "DatabaseVersionException was thrown. This indicates that server can not use given database."; - case 33: - return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption."; - case 34: - return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly."; - case 35: - return "ExitGuardOpenException was thrown. This indicates that server could not open guard file."; - case 36: - return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing."; - case 37: - return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file."; - case 38: - return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file."; - case 39: - return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed."; - case 40: - return "TerminatedException was thrown. This indicates that server died because it received terminating signal."; - case 41: - return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file."; - case 42: - return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file."; - case 43: - return "EndStatusValueException was thrown. This indicates that server died because end value was not zero."; - case 44: - return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file."; - case 45: - return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version."; - case 46: - return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file."; - case 47: - return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port."; - case 48: - return "JournalVersionException was thrown. This indicates that database was written by older version."; - case 49: - return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation."; - } - } - synchronized void start() - throws ProCoreException { - try { - startInternal(); - } catch (DatabaseLastExitException e) { - throw e; // This can be analyzed and handled by start handler. - } catch (Throwable t) { - ProCoreException pe = null; - if (t instanceof ProCoreException) - pe = (ProCoreException)t; - else { - String msg = "ProCoreServer startup failed." - + " Automatic rcovery handling not implemented." - + " Database must be fixed manually."; - pe = new ProCoreException(msg, t); - } - if (null == pe.getDbFolder()) - pe.setDbFolder(proCoreProcess.builder.directory()); - } - } - synchronized void stop() - throws ProCoreException, InterruptedException{ - if (null != databaseId) - databaseIds.remove(databaseId); - boolean dead = tryToStop(); - if (!dead) - throw new ProCoreException("Failed to stop ProCoreServer."); - } - public synchronized String getExitMessage() - throws ProCoreException { - Integer returnValue = guardFile.getExitValue(); - if (null == returnValue) - throw new ProCoreException("Exit message not available."); - return getExitMessage(returnValue); - } - //@Override - public synchronized ServerAddress getAddress() - throws ProCoreException { - return new ServerAddress(LOCALHOST, getPort()); - } - //@Override - public synchronized int getPort() - throws ProCoreException { - int port = 0; - if (proCoreClient.isConnected()) { - port = proCoreClient.getPort(); - if (0 != port) - return port; - } - if (proCoreProcess.isActive()) { - port = proCoreProcess.getPort(); - if (0 != port) - return port; - } - return guardFile.getPort(); - } - public synchronized TailFile.Data getTailData() - throws ProCoreException { - Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR); - return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); - } -// public synchronized TailFile.Data getTailDataFromHead() -// throws ProCoreException { -// Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR); -// return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); -// } - public static class GuardFile { - public static final String GUARD_FILE = ProCoreServer.GUARD_FILE; - private static final int VERSION_LINE = 1; - private static final int PID_LINE = 2; - private static final int PORT_LINE = 3; - private static final int EXIT_LINE = 4; - private static final int END_LINE = 5; - private final File guardFile; - GuardFile(File workingDir) { - guardFile = new File(workingDir, GUARD_FILE); - } - int getPort() throws ProCoreException { - String[] lines = getFileLines(guardFile, END_LINE); - if (lines.length < PORT_LINE) - throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath()); - return parseInt(lines[PORT_LINE-1]); - } - private void checkVersion(String[] lines) throws ProCoreException { - if (lines.length < VERSION_LINE) - throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath()); - int version = parseInt(lines[VERSION_LINE-1]); - if (1 != version) - throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version); - } - boolean delete() { - if (guardFile.exists()) - return guardFile.delete(); - else - return true; // Guard file is deleted i.e. does not exit. - } - void deleteLog() { - if (!guardFile.exists()) - return; - try { - Files.delete(guardFile.toPath()); - } catch (Throwable t) { - t.printStackTrace(); - } - } - Integer getExitValue() { - String lines[] = getFileLines(guardFile, END_LINE); - try { - checkVersion(lines); - if (lines.length >= EXIT_LINE) - return parseInt(lines[EXIT_LINE-1]); - else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1])) - return -1; // Server has died without writing exit status. - return null; // Server might be running. - } catch (Exception e) { - Logger.defaultLogError("Could not get ProCoreServer exit value.", e); - return null; // Server might be running. - } - } -// void stopProCoreServer() throws ProCoreException { -// if (!guardFile.exists()) -// throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist."); -// String lines[] = new String[0]; -// for (int i=0; i= END_LINE) -// return; // Server is dead according to guard file. -// if (!isRunning(lines[PID_LINE-1])) -// return; // ProCoreServer is not running. -// if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file. -// break; -// } -// if (lines.length < PORT_LINE) -// throw new ProCoreException("Guard file does not contain port."); -// try { -// int port = parseInt(lines[PORT_LINE-1]); -// if (port > 0) { -// ProCoreClient lProCoreClient = new ProCoreClient(); -// SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true); -// lProCoreClient.connect(sa); -// try { -// String reply = lProCoreClient.execute(QUIT_CMD); -// int nConnect = Integer.parseInt(reply); -// if (nConnect > 1) -// throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent()); -// } finally { -// lProCoreClient.disconnect(); -// } -// } -// } catch (Throwable t) { -// throw new ProCoreException("Failed to stop procore server cleanly.", t); -// } -// // Managed to send quit command to server, let's wait and see if it has any effect. -// boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT); -// if (!dead) -// throw new ProCoreException("Unable to stop server."); -// } - boolean isActive(ProCoreClient proCoreClient) throws ProCoreException { - if (proCoreClient.isConnected()) - throw new ProCoreException("Illegal argument. ProCoreClient must not be connected."); - String lines[] = getFileLines(guardFile, END_LINE); - if (lines.length != PORT_LINE) - return false; - checkVersion(lines); - try { - int port = parseInt(lines[PORT_LINE-1]); - if (port > 0) { - try { - // Checking if core is responsive. - // This should be supported regardless of version. - SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); - proCoreClient.connect(sa); - proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions. - return true; - } catch (ProCoreException e) { - } finally { // We must disconnect or check protocol version. - proCoreClient.disconnect(); - } - } - } catch (Throwable t) { - // Intentionally empty. - } - return false; - } - boolean isAlive() throws ProCoreException { - String lines[] = getFileLines(guardFile, END_LINE); - if (lines.length < 1) - return false; // No lines, assuming server is dead. - checkVersion(lines); - if (lines.length < PID_LINE) - return false; // No process id, assuming server is dead. - else if (lines.length >= END_LINE) - return false; // End status available, assuming server is dead. - else if (lines.length >= EXIT_LINE) - return false; // Exit status available, assuming server is dying. - else if (isRunning(lines[PID_LINE-1])) - return true; // Process with given id running. Server might be alive. - else - return false;// Process with given id not running. - } - } - public static class TailFile { - public static class Data { - Data(long version, long nextChangeSetId, long nextFreeId) { - this.version = version; - this.nextChangeSetId = nextChangeSetId; - this.nextFreeId = nextFreeId; - } - long version; - public long nextChangeSetId; - long nextFreeId; - } - public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException { - if (file.exists()) - throw new TailException("Tail file exists and thus cannot be created. file=" + file); - FileOutputStream fos; - try { - fos = new FileOutputStream(file); - } catch (FileNotFoundException e) { - throw new TailException("Tail file cannot be created. file=" + file); - } - PrintStream ps = new PrintStream(fos); - try { - ps.println(1); // file version - ps.println(nextChangeSetId); // next revision - ps.println(nextFreeId); // next free cluster - ps.println(dbId); // database id - } finally { - ps.close(); - } - } - public static Data readTailFile(File file) throws ProCoreException { - String lines[] = getFileLines(file, 3); - if (lines.length < 3) - throw new TailReadException("Could not read data from " + file); - long ver = Long.parseLong(lines[0]); - long cs = Long.parseLong(lines[1]); - long id = Long.parseLong(lines[2]); - return new Data(ver, cs, id); - } - } -} -class SessionAddress { - private final InetSocketAddress socketAddress; - private final boolean ignoreProtocolVersion; - public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) { - assert(null != socketAddress); - this.socketAddress = socketAddress; - this.ignoreProtocolVersion = ignoreProtocolVersion; - } - public InetSocketAddress getSocketAddress() { - return socketAddress; - } - public boolean ignoreProtocolVersion() { - return ignoreProtocolVersion; - } - @Override - public String toString() { - return socketAddress.toString(); - } -} -class PacketQueue { - private final LinkedBlockingQueue packets = new LinkedBlockingQueue(); - PacketQueue() { - } - void addFirst(Packet packet) { - packets.add(packet); - } - Packet takeFirst() throws InterruptedException { - return packets.take(); - } -} -class Header { - Header(int[] ints) { - lastTokenIn = ints[0]; - token = ints[1]; - messageNumber = ints[2]; - inflateSize = ints[3]; - deflateSize = ints[4]; - } - int lastTokenIn; - int token; - int messageNumber; - int inflateSize; - int deflateSize; - @Override - public String toString() { - StringBuilder sb = new StringBuilder(80); - sb.append("lastTokenIn="); - sb.append(lastTokenIn); - sb.append(" token="); - sb.append(token); - sb.append(" messageNumber="); - sb.append(messageNumber); - sb.append(" inflateSize="); - sb.append(inflateSize); - sb.append(" deflateSize="); - sb.append(deflateSize); - return sb.toString(); - } -} -class Packet { - Header header; - ByteBuffer bytes; - Packet(int[] header, ByteBuffer bytes) { - this.header = new Header(header); - this.bytes = bytes; - } -} -class ConnectionThread { - private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; - private static final int HEADER_N = 5; - private static final int HEADER_SIZE = HEADER_N * 4; - private static final int DEFLATE = 4; - private static final int INCREMENT_SIZE = 8192; - private static int instanceCount = 0; - class Manager implements Runnable { - private final int instance; - private volatile boolean carryOn = true; - private volatile int lastTokenIn = 0; - private volatile MethodQueue methodQueue = new MethodQueue(); - private volatile CountDownLatch running = new CountDownLatch(1); - private final EventHandler defaultEventHandler = new DefaultEventHandler(); -// private EventHandler eventHandler = defaultEventHandler; - Manager() { - instance = ++instanceCount; - } - @Override - public String toString() { - return "Connection " + instance; - } - InetSocketAddress getAddress() { - return connection.getAddress(); - } - boolean isConnected() { - return connection.isConnected(); - } - void reconnect() throws ProCoreException, InterruptedException { - InetSocketAddress a = connection.getAddress(); - if (connection.addressNotInitialized() || 0 == a.getPort()) - throw new ProCoreException("Address not ok. address=" + connection.getAddress()); - connect(a); - } - void connect(int port) throws ProCoreException, InterruptedException { - InetSocketAddress a = connection.getAddress(); - if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort())) - a = new InetSocketAddress(ProCoreClient.LOCALHOST, port); - connect(a); - } - void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { - if (connection.isConnected()) { - if (connection.equalsAddress(address)) - return; - else - throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address); - } - connection.init4Connection(address); - connectionManager.connect(connection); -// compression = new Compression(); - if (thread.isAlive()) - return; - thread.start(); - running.await(); - } - void call(Method method) throws ProCoreException, InterruptedException, IOException { -// eventHandler = method.getEventHandler(eventHandler); - connection.sendMethod(methodQueue, method, lastTokenIn); - } - void disconnect() { - methodQueue.close(); - connection.disconnect(); - } - @Override - public void run() { // MARK: CT.Manger.run - try { - running.countDown(); - while (carryOn) { - Packet packet = null; - try { - packet = inputQueue.takeFirst(); - } catch (InterruptedException e) { - Util.trace("Wait for input packet interrupted. this=" + this); - continue; - } - if (null == packet) - break; - handlePacket(packet); - } - if (carryOn) - Util.trace("Thread " + thread.getName() + " stopped with null packet."); - else - Util.trace("Thread " + thread.getName() + " stopped with false carryOn."); - } catch (Throwable t) { - Util.logError("Thread " + thread.getName() + " stopped to exception:", t); - } finally { - disconnect(); - running.countDown(); - } - } - private void handlePacket(Packet packet) throws ProCoreException { - if (packet.header.token > lastTokenIn) { - lastTokenIn = packet.header.token; - if (DEBUG) - Util.showDebug("Setting lastTokeinIn=" + lastTokenIn); - } - Method method = methodQueue.remove(packet.header.token); - if (null == method) { - try { - switch (packet.header.messageNumber) { - default: { - Util.logError("Missing method. token=" + packet.header.token); - } return; - case MessageNumber.ChangeSetUpdateRequest: { - defaultEventHandler.on(new ChangeSetUpdateEvent(packet)); - } break; - } - } catch (Throwable t) { - Util.logError("Event handler failed:", t); - } - return; - } -// else -// eventHandler = method.getEventHandler(defaultEventHandler); - switch (packet.header.messageNumber) { - default: { - String s = "Illegal message number " + packet.header.messageNumber; - method.gotException(s); - } break; - case MessageNumber.ChangeSetUpdateRequest: - method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet)); - methodQueue.add(method); // Event caused by method. Actual method not yet handled. - break; - case MessageNumber.NullResponse: - //Util.log("Null(ack) message. method=" + method + " packet=" + packet); - break; - case MessageNumber.ExceptionResponse: - method.gotException(packet); - break; // MARK: handlePacket - case MessageNumber.AAAResponse: - case MessageNumber.AskTransactionResponse: - case MessageNumber.CancelCommitResponse: - case MessageNumber.CloseClientSessionResponse: - case MessageNumber.EchoResponse: - case MessageNumber.EndTransactionResponse: - case MessageNumber.ExecuteResponse: - case MessageNumber.GetChangeSetContextResponse: - case MessageNumber.GetChangeSetDataResponse: - case MessageNumber.GetChangeSetsResponse: - case MessageNumber.GetClusterNewResponse: - case MessageNumber.GetRefreshResponse: - case MessageNumber.GetResourceSegmentResponse: - case MessageNumber.GetServerInfoResponse: - case MessageNumber.OpenClientSessionResponse: - case MessageNumber.RefreshResponse: - case MessageNumber.ReserveIdsResponse: - case MessageNumber.UndoResponse: - case MessageNumber.ReconnectResponse: - case MessageNumber.GetRefresh2Response: - case MessageNumber.GetClusterChangesResponse: - case MessageNumber.GetServerInfo2Response: - case MessageNumber.ListClustersResponse: { - method.gotPacket(packet); - } break; - } - } - } - private final ConnectionManager connectionManager; - private final PacketQueue inputQueue = new PacketQueue(); - private final Connection connection = new Connection(this); - private final Manager manager = new Manager(); - private Thread thread = new Thread(manager, manager.toString()); - private int[] header = null; - private int[] INTS = new int[HEADER_N]; - private int remaining = HEADER_SIZE; - private int deflateSize; - ConnectionThread(ConnectionManager connectionManager) { - this.connectionManager = connectionManager; - } - ByteBuffer getInputBuffer() { - return getInputBuffer(INCREMENT_SIZE); - } - ByteBuffer getInputBuffer(int sizeAtLeast) { - ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE)); - bb.order(ByteOrder.LITTLE_ENDIAN); - return bb; - } - @Override - public String toString() { - return "header=" + header + " remaining=" + remaining; - } - private String toString(ByteBuffer buffer, int nRead) { - return this + " nRead=" + nRead + " buffer=" + buffer; - } - ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0 - throws IOException { // MARK: handleInput - int position = buffer.position(); - if (null == header) { - if (position < HEADER_SIZE) - return buffer; - else { // Got header. - buffer.position(0); - IntBuffer ib = buffer.asIntBuffer(); - ib.get(INTS); - ib = null; - header = INTS; - deflateSize = header[DEFLATE]; - if (deflateSize < 0) - throw new IOException("Illegal deflate size=" + deflateSize); - remaining += deflateSize; - if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case. - ByteBuffer bb = getInputBuffer(remaining); - buffer.position(0); - buffer.limit(position); - bb.put(buffer); - return bb; - } - buffer.position(position); - } - } - if (position < remaining) - return buffer; // Need more data. - else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case. - ByteBuffer bb = getInputBuffer(remaining); - buffer.position(0); - buffer.limit(remaining); - bb.put(buffer); - inputQueue.addFirst(new Packet(header, bb)); - ByteBuffer b2 = getInputBuffer(buffer.limit()); - buffer.position(remaining); - buffer.limit(position); - b2.put(buffer); - header = null; - remaining = HEADER_SIZE; - return handleInput(b2, b2.position()); - } else if (position != remaining) - throw new IOException("Assertion error. this=" + toString(buffer, nRead)); - // Got exactly one packet. - inputQueue.addFirst(new Packet(header, buffer)); - header = null; - remaining = HEADER_SIZE; - return getInputBuffer(INCREMENT_SIZE); - } - InetSocketAddress getAddress() { - return manager.getAddress(); - } - boolean isConnected() { - return manager.isConnected(); - } - void reconnect() throws ProCoreException, InterruptedException { - manager.reconnect(); - } - void connect(int port) throws ProCoreException, InterruptedException { - manager.connect(port); - } - void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { - manager.connect(address); - } - void disconnect() { - manager.disconnect(); - } - Connection call(Method m) throws ProCoreException, InterruptedException, IOException { - manager.call(m); - return connection; - } -} -class Client { - private final ProCoreServer proCoreServer; - private final ProCoreClient proCoreClient; - private final ConnectionThread connectionThread; - private boolean closed = false; - Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) { - this.proCoreServer = proCoreServer; - this.proCoreClient = proCoreClient; - this.connectionThread = connectionThread; - } - void open() throws ProCoreException, InterruptedException { - synchronized (proCoreClient) { - if (!connectionThread.isConnected()) - connectionThread.connect(proCoreServer.getPort()); - } - closed = false; - } - boolean isOpen() { - synchronized (proCoreClient) { - return connectionThread.isConnected(); - } - } - boolean isClosed() { - synchronized (proCoreClient) { - return closed || !connectionThread.isConnected(); - } - } - void close() { - proCoreClient.deleteClient(this); - closed = true; - } - String execute(String command) throws ProCoreException { - return proCoreClient.execute(command); - } - void call(Method m) throws ProCoreException { - proCoreClient.call(m); - } -} -class ProCoreClient { - public static final String LOCALHOST = "127.0.0.1"; - class Remote { - private final ConnectionManager connectionManager = ConnectionManager.getInstance(); - private final ConnectionThread connectionThread = new ConnectionThread(connectionManager); - Remote() { - } - } - private final Remote remote = new Remote(); - private final Set clients = new HashSet(); - private final ProCoreServer proCoreServer; - public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException { - this.proCoreServer = proCoreServer; - } - synchronized public int getPort() { - return remote.connectionThread.getAddress().getPort(); - } - synchronized public boolean isConnected() { - return remote.connectionThread.isConnected(); - } - synchronized public Client newClient() throws ProCoreException { - return newClient(proCoreServer, remote.connectionThread.getAddress()); - } - synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException { - if (!remote.connectionThread.isConnected()) - try { - remote.connectionThread.connect(address); - } catch (InterruptedException e) { - if (!remote.connectionThread.isConnected()) { - remote.connectionThread.disconnect(); - throw new ProCoreException("Failed to create new client."); - } - } - else if (!address.equals(remote.connectionThread.getAddress())) - throw new ProCoreException("Illegal newClient call. Already connected to different address."); - Client client = new Client(proCoreServer, this, remote.connectionThread); - clients.add(client); - return client; - } - synchronized int deleteClient(Client client) { - clients.remove(client); - return clients.size(); - } - void closeClient(int aSessionId) throws ProCoreException { - try { - CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId); - Method m = new Method(f, null, null); - Connection c = remote.connectionThread.call(m); - m.waitForReply(c); - } catch (InterruptedException e) { - throw new ProCoreException("Thread interrupted during method call.", e); - } catch (IOException e) { - throw new ProCoreException("IOException during method call.", e); - } catch (Throwable e) { - throw new ProCoreException("Throwable during method call.", e); - } - } - synchronized public String execute(String command) throws ProCoreException { - try { - ExecuteFunction f = new ExecuteFunction(command); - Method m = new Method(f, null, null); - Connection c = remote.connectionThread.call(m); - m.waitForReply(c); - return f.out; - } catch (InterruptedException e) { - throw new ProCoreException("Thread interrupted during method call.", e); - } catch (IOException e) { - throw new ProCoreException("IOException during method call.", e); - } catch (Throwable e) { - throw new ProCoreException("Throwable during method call.", e); - } - } - private void recall(Method method, IOException e) throws ProCoreException {// :) - if (remote.connectionThread.isConnected()) - throw new ProCoreException("IOException during method call.", e); - try { - remote.connectionThread.reconnect(); - Connection c = remote.connectionThread.call(method); - method.waitForReply(c); - } catch (Throwable t) { - throw new ProCoreException("IOException during method call. Recall failed.", e); - } - } - synchronized public void call(Method method) throws ProCoreException { - try { - Connection c = remote.connectionThread.call(method); - method.waitForReply(c); - } catch (InterruptedException e) { - throw new ProCoreException("Thread interrupted during method call.", e); - } catch (IOException e) { - recall(method, e); - } catch (ProCoreException e) { - throw e; - } catch (Throwable e) { - throw new ProCoreException("Throwable during method call.", e); - } - } - synchronized public void connect(int port) throws ProCoreException, InterruptedException { - remote.connectionThread.connect(port); - } - synchronized public void connect(SessionAddress sessionAddress) - throws ProCoreException, InterruptedException { - remote.connectionThread.connect(sessionAddress.getSocketAddress()); - } - synchronized public void disconnect() throws ProCoreException { - remote.connectionThread.disconnect(); - } -} -abstract class Event { - final String name; - final Packet packet; - Event(String name, Packet packet) { - this.name = name; - this.packet = packet; - } - int getToken() { - return packet.header.token; - } - void deserialize(org.simantics.db.server.protocol.AbstractEvent event) { - event.deserialize(event.getRequestNumber(), getDataBuffer()); - } - private org.simantics.db.server.protocol.DataBuffer getDataBuffer() { - this.packet.bytes.position(20); - return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes); - } - @Override public String toString() { - return name + " token=" + packet.header.token; - } -} -class ChangeSetUpdateEvent extends Event { - ChangeSetUpdateEvent(Packet packet) { - super("ChangeSetUpdateEvent", packet); - } -} -abstract class EventHandler { - abstract void on(Event event); -} -class DefaultEventHandler extends EventHandler { - void on(Event event) { - Util.log("Missing event handler. event=" + event); - } -} +/******************************************************************************* + + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.server.internal; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import org.simantics.db.common.utils.Logger; +import org.simantics.db.server.DatabaseLastExitException; +import org.simantics.db.server.GuardFileVersionException; +import org.simantics.db.server.ProCoreException; +import org.simantics.db.server.ServerNotFoundException; +import org.simantics.db.server.protocol.CloseClientSessionFunction; +import org.simantics.db.server.protocol.ExecuteFunction; +import org.simantics.db.server.protocol.MessageNumber; +import org.slf4j.LoggerFactory; + +public class ProCoreServer { + private static Map workingDirs = new HashMap(); + private static Map databaseIds = new HashMap(); + private static final String LOCALHOST = ProCoreClient.LOCALHOST; + public static final int VERSION_MAJOR = 1; + public static final int VERSION_MINOR = 4; // 2015-05-05 + public static final String BRANCH_DIR = "procore.headClusters.procore"; + public static final String TAIL_DIR = "procore.tailClusters.procore"; + public static final String TAIL_FILE = "procore.tail.procore"; + public static final String CONFIG_FILE = "procore.config.procore"; + public static final String GUARD_FILE = "procore.guard.procore"; + public static final String JOURNAL_FILE = "procore.journal.procore"; + public static final String LOG_FILE = "procore.log.procore"; + public static final String DCS_FILE = "procore.dumpChangeSets.procore"; + public static final String RECOVERY_NEEDED_FILE = "recovery.needed"; + public static final String RECOVERY_IGNORED_FILE = "recovery.ignored"; + public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored"; + public static final String PAGE_FILE_PATTERN = "procore.page*.procore"; + public static final String DATA_PREFIX = "ClusterData."; + public static final String DATA_PATTERN = DATA_PREFIX + "*"; + public static final String INDEX_PREFIX = "ClusterIndex."; + public static final String INDEX_PATTERN = INDEX_PREFIX + "*"; + public static final String VALUE_PREFIX = "ExternalValue."; + public static final String VALUE_PATTERN = VALUE_PREFIX + "*"; + public static final String VALUE_SUFFIX = ".procore"; + public static final String DELETED_PREFIX = "ClusterDeleted."; + public static final String DELETED_PATTERN = DELETED_PREFIX + "*"; + private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; + public static final String QUIT_CMD = "quit"; +// private static final int SHORT_SLEEP_COUNT = 1; +// private static final int LONG_SLEEP_COUNT = 10; +// static public void stopProCoreServer(File workingDir) throws ProCoreException { +// GuardFile guardFile = new GuardFile(workingDir); +// guardFile.stopProCoreServer(); +// } + private static boolean isRunning(String aPid) throws ProCoreException { + boolean isRunning = false; + try { + switch (OSType.calculate()) { + case WINDOWS: { + Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\""); + BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line = input.readLine(); + if (line.matches(".*ProCoreServer.exe.*")) + isRunning = true; + input.close(); + break; + } + case UNIX: { + Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm="); + BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line = input.readLine(); + if (line.matches(".*ProCoreServer.*")) + isRunning = true; + input.close(); + break; + } + default: + } + } catch (Exception e) { + throw new ProCoreException("Could not get list of running processes.", e); + } + return isRunning; + } + static class ProCoreProcess { + private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped }; + class Manager implements Runnable { + private State state = State.Created; + private CountDownLatch serverPrompted; + private LinkedBlockingQueue reply; + private volatile String command; + private volatile Integer exitValue; + private volatile boolean carryOn; + private volatile int port; + private final File dbFolder; // Just for exception i.e. error reporting. + Manager(File dbFolder) { + this.dbFolder = dbFolder; + init(); + } + private void init() { + serverPrompted = new CountDownLatch(1); + reply = new LinkedBlockingQueue(); + exitValue = null; + carryOn = true; + port = 0; + state = State.Initialized; + } + int getPort() { + return port; + } + void tryToStart() throws ProCoreException, InterruptedException { + if (thread.isAlive()) + return; + thread = new Thread(manager, manager.initAndGetName()); + thread.start(); + serverPrompted.await(); + if (!state.equals(State.Prompted)) + throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); + String reply = giveCommand("print port"); + try { + port = Integer.parseInt(reply); + } catch (NumberFormatException e) { + if (null != exitValue && 0 != exitValue) + throw new DatabaseLastExitException(dbFolder, "Server did not prompt."); + if (reply.equals("End of server thread.")) + throw new ProCoreException("Server did not start (did not prompt). state=" + state); + else + throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply); + } + state = State.GotPort; + } + String giveCommand(String command) throws ProCoreException, InterruptedException { + if (!thread.isAlive()) + throw new ProCoreException("Server thread not alive."); + if (null != this.command) + throw new ProCoreException("Old command pending."); + this.command = command; + return reply.take(); + } + private String initAndGetName() { + init(); + return "ProCoreProcess " + ++instanceCount + " " + builder.directory(); + } + boolean isActive() throws ProCoreException { + if (null == exitValue && thread.isAlive()) + return true; + else + return false; + } + boolean tryToStop() throws ProCoreException { + if (!thread.isAlive()) + return true; + carryOn = false; + thread.interrupt(); + try { + thread.join(10000); // milliseconds + } catch (InterruptedException e) { + } + return !thread.isAlive(); + } + @Override + public void run() { + state = State.ThreadStarted; + Util.log("ProCoreServer thread started."); + try { + Util.trace("ProCoreProcessManager start."); + Process process = builder.start(); + exitValue = exitValue(process); + if (null != exitValue) + Util.logError("Server process did not start."); + else { + state = State.ProcessStarted; + runProcessStarted(process); // Process start + } + } catch (IOException e) { + Util.logError("Failed during process watch."); + } finally { + reply.add("End of server thread."); + Util.trace("ProCoreServerThread stop."); + state = State.ThreadStopped; + serverPrompted.countDown(); + } + } + private void runProcessStarted(Process process) throws IOException { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));) + { + do { + try { + if (reader.ready()) + handleLine(reader.readLine(), reader, writer); + else if (null != command) { + writer.write(command); + command = null; + writer.newLine(); + writer.flush(); + String line = getReply(reader); + reply.add(line); + } else + Thread.sleep(100); // milliseconds + } catch (InterruptedException e) { + Util.trace("ProCoreServerThread interrupted."); + } + exitValue = exitValue(process); + if (null == exitValue && !carryOn) + try { + try { + writer.write(QUIT_CMD); + writer.newLine(); + writer.flush(); + } catch (Throwable t) { + Util.logError("Wait for process death was interrupted.", t); + } finally { + } + exitValue = process.waitFor(); + } catch (InterruptedException e) { + Util.logError("Wait for process death was interrupted.", e); + carryOn = true; + continue; + } + } while (null == exitValue); + } // End of resource try. + } + private String getReply(BufferedReader reader) throws IOException { + StringBuilder sb = new StringBuilder(); + while (true) { + String line = reader.readLine(); + if (null == line || line.startsWith("Waiting for input?")) + break; + sb.append(line); + } + return sb.toString(); + } + private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException { + if (DEBUG) + Util.showDebug(line); + if (line.startsWith("Main end.")) { + carryOn = false; // Last line from server. + } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) { + state = State.Prompted; + serverPrompted.countDown(); // Server has been started and is waiting for console input. + } else + Util.trace("From server. line=" + line); + } + private Integer exitValue(Process process) { + try { + return process.exitValue(); + } catch (IllegalThreadStateException e) { + return null; // Still alive. + } + } + } + private static int instanceCount; // Keeps count of thread instances. + private final ProcessBuilder builder; + private final Manager manager; + private Thread thread; + ProCoreProcess(File exeFile, File workingDir) { + String[] args = new String[0]; + String[] cmd = new String[1 + args.length]; + cmd[0] = exeFile.getAbsolutePath().toString(); + System.arraycopy(args, 0, cmd, 1, args.length); + builder = new ProcessBuilder(cmd); + builder.redirectErrorStream(true); + builder.directory(workingDir); + manager = new Manager(workingDir); + thread = new Thread(manager, manager.initAndGetName()); + } + public void tryToStart() throws ProCoreException, InterruptedException { + manager.tryToStart(); + } +// public int exitValue() throws IllegalStateException { +// Integer i = proCoreServerThread.getExitValue(); +// if (null == i) +// throw new IllegalStateException("ProCoreServerThread is still running."); +// return i; +// } +// public boolean ignoreProtocolVersion() { +// File wd = builder.directory(); +// if (null == wd) +// return false; +// Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE); +// if (Files.exists(ignore)) +// return true; +// return false; +// } + public boolean isActive() throws ProCoreException { + return manager.isActive(); + } + public boolean tryToStop() throws ProCoreException { + return manager.tryToStop(); + } + int getPort() throws ProCoreException { + return manager.getPort(); + } + } + private static enum OSType { + UNIX, WINDOWS, UNKNOWN; + + public static OSType calculate() { + String osName = System.getProperty("os.name"); + assert osName != null; + osName = osName.toLowerCase(); + if (osName.startsWith("windows")) + return WINDOWS; + if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun")) + return UNIX; + return UNKNOWN; + } + } + static public boolean ignoreProtocolVersion(File dbFolder) { + if (null == dbFolder) + return false; + if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE))) + return true; + return false; + } + static public ProCoreServer getProCoreServer(File serverDir, final File workingDir) + throws ProCoreException, ServerNotFoundException { + try { + ProCoreServer proCoreServer; + synchronized (workingDirs) { + proCoreServer = workingDirs.get(workingDir.getCanonicalPath()); + if (null != proCoreServer) + return proCoreServer; + if (!serverDir.isDirectory()) + throw new ProCoreException("Expected server directory as argument"); + if (!workingDir.isDirectory()) + throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath()); + String exeSuffix = Activator.getExeSuffix(); + File executable = new File(serverDir, "ProCoreServer" + exeSuffix); + proCoreServer = new ProCoreServer(executable, workingDir); + workingDirs.put(workingDir.getCanonicalPath(), proCoreServer); + return proCoreServer; + } + } catch (IOException e) { + throw new ProCoreException("IOException", e); + } + } + + public static void logVersionInformation() { + StringBuilder msg = new StringBuilder(200); + String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read. + msg.append("ProCore version information:"); + msg.append(nl); + msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor); + try { + msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder()); + } catch (IOException e) { + msg.append(" folder information not available"); + } + msg.append(nl); + msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $"); + String str = msg.toString(); + LoggerFactory.getLogger(ProCoreServer.class).info(str); + } + private static String[] getFileLines(final File aFile, int maxLines) { + final String[] STRING_EMPTY_ARRAY = new String[0]; + try { + if (!aFile.canRead()) { + return STRING_EMPTY_ARRAY; + } + try (FileInputStream fis = new FileInputStream(aFile); + InputStreamReader isr = new InputStreamReader(fis, "US-ASCII"); + BufferedReader br = new BufferedReader(isr);){ + String line = null; + int count = 0; + ArrayList lines = new ArrayList(); + while ((line = br.readLine()) != null) { + if (++count > maxLines) + break; // Caller not interested in more lines. + lines.add(line); + } + return lines.toArray(STRING_EMPTY_ARRAY); + } catch (FileNotFoundException e) { + return STRING_EMPTY_ARRAY; + } + } catch (Throwable t) { + if (DEBUG) + t.printStackTrace(); + Logger.defaultLogError(t); + } + return new String[0]; + } + private static int parseInt(String s) + throws ProCoreException { + try { + return Integer.parseInt(s); + } catch (NumberFormatException e) { + String msg = "Could not convert string to number. string=" + s; + Logger.defaultLogError(msg); + throw new ProCoreException(msg, e); + } + } + private String databaseId = null; + private final GuardFile guardFile; + private final ProCoreClient proCoreClient; + ProCoreClient getProCoreClient() { + return proCoreClient; + } + private final ProCoreProcess proCoreProcess; + private ProCoreServer(File exeFile, File workingDir) + throws ProCoreException { + if (null == exeFile) + throw new ProCoreException("Illegal argument: exeFile is null."); + if (null == workingDir) + throw new ProCoreException("Illegal argument: workingDir is null."); + guardFile = new GuardFile(workingDir); + proCoreClient = new ProCoreClient(this); + proCoreProcess = new ProCoreProcess(exeFile, workingDir); + Util.log("ProCoreServer exe: " + exeFile); + Util.log("ProCoreServer folder: " + workingDir); + logVersionInformation(); + } +// //@Override + public synchronized String execute(String command) throws ProCoreException, InterruptedException { + return proCoreClient.execute(command); + } + private void startInternal() + throws ProCoreException { + if (proCoreClient.isConnected()) + return; + Exception exception; + try { + proCoreProcess.tryToStart(); + int port = proCoreProcess.getPort(); + SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); + proCoreClient.connect(sa); + return; + } catch (InterruptedException e) { + Util.logError("ProCoreProcess start was interrupted.", e); + exception = e; + } catch (DatabaseLastExitException e) { + throw e; // This situation can be analyzed and handled by start handler. + } catch (ProCoreException e) { + Util.logError("Failed to start ProCoreServer process.", e); + exception = e; + } + boolean isActive = guardFile.isActive(proCoreClient); + if (!isActive) + throw new ProCoreException("Failed to connect to ProCoreServer.", exception); + } +// private int getWaitCount() throws ProCoreException { +// if (!proCoreProcess.isActive()) +// return SHORT_SLEEP_COUNT; // Short wait because server process is not running. +// else +// return LONG_SLEEP_COUNT; +// } +// private void startInternalReinit(int exitStatus) +// throws ProCoreException { +// switch (exitStatus) { +// default: // What has happened here? +// throw new DatabaseStartException(workingDir, getExitMessage(exitStatus)); +// case -1: // Exit status not available, not sure what has happened here. +// case 29: // Server died because it could not read exit status from guard file. +// case 30: // Server died because old exit status was not zero. +// case 40: // Server died because it received terminating signal. +// case 42: // Server died because it could not read end status from guard file. +// case 43: // Server died because end value was not zero. +// case 44: // Server died because server could not read version from guard file. +// case 45: // Server died because server did not support guard file version. +// case 47: // Server died because server could not listen to any port. +// break; +// case 32: // Server died becuase database version did not match. +// throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus)); +// case 33: // Server died because database has been corrupted. +// throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus)); +// case 39: // Server died because last exit was not clean. +// throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus)); +// } +// boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount()); +// if (!dead) { +// StringBuilder sb = new StringBuilder(256); +// sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n"); +// sb.append(getExitMessage(exitStatus)); +// throw new DatabaseStartException(workingDir, sb.toString()); +// } +// boolean deleted = guardFile.delete(); +// if (!deleted) +// Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus)); +// } + //@Override + public synchronized boolean isActive() + throws ProCoreException { + if (!proCoreClient.isConnected()) + return guardFile.isActive(proCoreClient); + proCoreClient.execute(""); + return true; + } + public synchronized boolean isAlive() + throws ProCoreException { + if (proCoreClient.isConnected()) + return true; + if (proCoreProcess.isActive()) + return true; + return guardFile.isAlive(); + } + Client newClient() throws ProCoreException { + if (proCoreClient.isConnected()) + return proCoreClient.newClient(); + int port = getPort(); + InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port); + return proCoreClient.newClient(this, isa); + } + public synchronized boolean tryToStop() + throws ProCoreException { + synchronized (proCoreClient) { + ProCoreException ex = null; + if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) { + try { + proCoreClient.connect(getPort()); + } catch (InterruptedException e) { + } + } + if (proCoreClient.isConnected()) { + String s = ""; + try { + s = proCoreClient.execute(QUIT_CMD); + String t = s.replaceAll("\n", ""); + int n = Integer.parseInt(t); + if (n != 1) + Util.log("Number of connections after quit command is " + n + "."); + proCoreClient.closeClient(0); + if (!isLocal() && n > 1) + throw new ProCoreException("More than one connection for remote. n=" + n); + } catch (NumberFormatException e) { + Util.logError("Failed to parse number of connections. s=" + s, e); + } catch (ProCoreException e) { + ex = e; + } finally { + proCoreClient.disconnect(); + } + } + try { + proCoreProcess.tryToStop(); + } finally { + if (proCoreProcess.isActive() && null == ex) + ex = new ProCoreException("Failed to stop ProCoreProcess."); + } + if (null != ex) + throw ex; + return !proCoreClient.isConnected() && !proCoreProcess.isActive(); + } + } + public synchronized boolean isConnected() + throws ProCoreException { + return proCoreClient.isConnected(); + } + public synchronized boolean isLocal() + throws ProCoreException { + return proCoreClient.isConnected() && proCoreProcess.isActive(); + } + public synchronized void disconnect() + throws ProCoreException { + proCoreClient.disconnect(); + } + public synchronized void connect() + throws ProCoreException, InterruptedException { + int port = getPort(); + if (0 == port) + throw new ProCoreException("Port 0 not supported as connection address."); + proCoreClient.connect(port); + } + private String getExitMessage(int exitValue) { + switch (exitValue) { + default: + return "Unexpected exit value = " + exitValue + "."; + case 1: + return "Exception was thrown. This indicates problems with program logic."; + case 2: + return "CallException was thrown. This indicates problems with calls to/from parent server."; + case 3: + return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers."; + case 4: + return "NoSuchElementException was thrown. This indicates problems with program logic."; + case 5: + return "IllegalArgumentException was thrown. This indicates problems with program logic."; + case 6: + return "IllegalResourceException was thrown. This indicates problems with server data."; + case 7: + return "IllegalClusterException was thrown. This indicates problems with server data."; + case 8: + return "ParseException was thrown. This indicates problems with/during parsing of the configuration file."; + case 9: + return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number."; + case 10: + return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id."; + case 11: + return "SystemException was thrown. This indicates problems with program logic."; + case 12: + return "CppException was thrown. This indicates problems with program logic."; + case 13: + return "UnknownException was thrown. This indicates problems with program logic."; + case 14: + return "ThrowException was thrown. This indicates problems with program logic."; + case 15: + return "AssertException was thrown. This indicates problems with program logic."; + case 16: + return "ParentCheckException was thrown. This indicates problems with the database version of parent."; + case 17: + return "ParentConnectionException was thrown. This indicates problems with parent child relationship."; + case 18: + return "ConnectionException was thrown. This indicates problems with TCP/IP connections."; + case 19: + return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy."; + case 20: + return "NoSuchFileException was thrown. This indicates problems with a missing file."; + case 21: + return "JournalException was thrown. This indicates problems with journal mechanisim."; + case 22: + return "OutOfSpaceException was thrown. This indicates problems with memory."; + case 23: + return "ExternalValueException was thrown. This indicates problems with large value handling."; + case 24: + return "IllegalTransactionException was thrown. This indicates problems with transaction logic."; + case 25: + return "WriteTransactionException was thrown. This indicates that server failed during write transaction."; + case 26: + return "ConsoleException was thrown. This indicates that server console was closed without quit command."; + case 27: + return "ExitException was thrown. This indicates that server did not exit cleanly."; + case 28: + return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly."; + case 29: + return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file."; + case 30: + return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly."; + case 31: + return "UpdateException was thrown. This indicates that server failed during update."; + case 32: + return "DatabaseVersionException was thrown. This indicates that server can not use given database."; + case 33: + return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption."; + case 34: + return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly."; + case 35: + return "ExitGuardOpenException was thrown. This indicates that server could not open guard file."; + case 36: + return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing."; + case 37: + return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file."; + case 38: + return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file."; + case 39: + return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed."; + case 40: + return "TerminatedException was thrown. This indicates that server died because it received terminating signal."; + case 41: + return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file."; + case 42: + return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file."; + case 43: + return "EndStatusValueException was thrown. This indicates that server died because end value was not zero."; + case 44: + return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file."; + case 45: + return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version."; + case 46: + return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file."; + case 47: + return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port."; + case 48: + return "JournalVersionException was thrown. This indicates that database was written by older version."; + case 49: + return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation."; + } + } + synchronized void start() + throws ProCoreException { + try { + startInternal(); + } catch (DatabaseLastExitException e) { + throw e; // This can be analyzed and handled by start handler. + } catch (Throwable t) { + ProCoreException pe = null; + if (t instanceof ProCoreException) + pe = (ProCoreException)t; + else { + String msg = "ProCoreServer startup failed." + + " Automatic rcovery handling not implemented." + + " Database must be fixed manually."; + pe = new ProCoreException(msg, t); + } + if (null == pe.getDbFolder()) + pe.setDbFolder(proCoreProcess.builder.directory()); + } + } + synchronized void stop() + throws ProCoreException, InterruptedException{ + if (null != databaseId) + databaseIds.remove(databaseId); + boolean dead = tryToStop(); + if (!dead) + throw new ProCoreException("Failed to stop ProCoreServer."); + } + public synchronized String getExitMessage() + throws ProCoreException { + Integer returnValue = guardFile.getExitValue(); + if (null == returnValue) + throw new ProCoreException("Exit message not available."); + return getExitMessage(returnValue); + } + //@Override + public synchronized ServerAddress getAddress() + throws ProCoreException { + return new ServerAddress(LOCALHOST, getPort()); + } + //@Override + public synchronized int getPort() + throws ProCoreException { + int port = 0; + if (proCoreClient.isConnected()) { + port = proCoreClient.getPort(); + if (0 != port) + return port; + } + if (proCoreProcess.isActive()) { + port = proCoreProcess.getPort(); + if (0 != port) + return port; + } + return guardFile.getPort(); + } + public synchronized TailFile.Data getTailData() + throws ProCoreException { + Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR); + return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); + } +// public synchronized TailFile.Data getTailDataFromHead() +// throws ProCoreException { +// Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR); +// return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile()); +// } + public static class GuardFile { + public static final String GUARD_FILE = ProCoreServer.GUARD_FILE; + private static final int VERSION_LINE = 1; + private static final int PID_LINE = 2; + private static final int PORT_LINE = 3; + private static final int EXIT_LINE = 4; + private static final int END_LINE = 5; + private final File guardFile; + GuardFile(File workingDir) { + guardFile = new File(workingDir, GUARD_FILE); + } + int getPort() throws ProCoreException { + String[] lines = getFileLines(guardFile, END_LINE); + if (lines.length < PORT_LINE) + throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath()); + return parseInt(lines[PORT_LINE-1]); + } + private void checkVersion(String[] lines) throws ProCoreException { + if (lines.length < VERSION_LINE) + throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath()); + int version = parseInt(lines[VERSION_LINE-1]); + if (1 != version) + throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version); + } + boolean delete() { + if (guardFile.exists()) + return guardFile.delete(); + else + return true; // Guard file is deleted i.e. does not exit. + } + void deleteLog() { + if (!guardFile.exists()) + return; + try { + Files.delete(guardFile.toPath()); + } catch (Throwable t) { + t.printStackTrace(); + } + } + Integer getExitValue() { + String lines[] = getFileLines(guardFile, END_LINE); + try { + checkVersion(lines); + if (lines.length >= EXIT_LINE) + return parseInt(lines[EXIT_LINE-1]); + else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1])) + return -1; // Server has died without writing exit status. + return null; // Server might be running. + } catch (Exception e) { + Logger.defaultLogError("Could not get ProCoreServer exit value.", e); + return null; // Server might be running. + } + } +// void stopProCoreServer() throws ProCoreException { +// if (!guardFile.exists()) +// throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist."); +// String lines[] = new String[0]; +// for (int i=0; i= END_LINE) +// return; // Server is dead according to guard file. +// if (!isRunning(lines[PID_LINE-1])) +// return; // ProCoreServer is not running. +// if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file. +// break; +// } +// if (lines.length < PORT_LINE) +// throw new ProCoreException("Guard file does not contain port."); +// try { +// int port = parseInt(lines[PORT_LINE-1]); +// if (port > 0) { +// ProCoreClient lProCoreClient = new ProCoreClient(); +// SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true); +// lProCoreClient.connect(sa); +// try { +// String reply = lProCoreClient.execute(QUIT_CMD); +// int nConnect = Integer.parseInt(reply); +// if (nConnect > 1) +// throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent()); +// } finally { +// lProCoreClient.disconnect(); +// } +// } +// } catch (Throwable t) { +// throw new ProCoreException("Failed to stop procore server cleanly.", t); +// } +// // Managed to send quit command to server, let's wait and see if it has any effect. +// boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT); +// if (!dead) +// throw new ProCoreException("Unable to stop server."); +// } + boolean isActive(ProCoreClient proCoreClient) throws ProCoreException { + if (proCoreClient.isConnected()) + throw new ProCoreException("Illegal argument. ProCoreClient must not be connected."); + String lines[] = getFileLines(guardFile, END_LINE); + if (lines.length != PORT_LINE) + return false; + checkVersion(lines); + try { + int port = parseInt(lines[PORT_LINE-1]); + if (port > 0) { + try { + // Checking if core is responsive. + // This should be supported regardless of version. + SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true); + proCoreClient.connect(sa); + proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions. + return true; + } catch (ProCoreException e) { + } finally { // We must disconnect or check protocol version. + proCoreClient.disconnect(); + } + } + } catch (Throwable t) { + // Intentionally empty. + } + return false; + } + boolean isAlive() throws ProCoreException { + String lines[] = getFileLines(guardFile, END_LINE); + if (lines.length < 1) + return false; // No lines, assuming server is dead. + checkVersion(lines); + if (lines.length < PID_LINE) + return false; // No process id, assuming server is dead. + else if (lines.length >= END_LINE) + return false; // End status available, assuming server is dead. + else if (lines.length >= EXIT_LINE) + return false; // Exit status available, assuming server is dying. + else if (isRunning(lines[PID_LINE-1])) + return true; // Process with given id running. Server might be alive. + else + return false;// Process with given id not running. + } + } + public static class TailFile { + public static class Data { + Data(long version, long nextChangeSetId, long nextFreeId) { + this.version = version; + this.nextChangeSetId = nextChangeSetId; + this.nextFreeId = nextFreeId; + } + long version; + public long nextChangeSetId; + long nextFreeId; + } + public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException { + if (file.exists()) + throw new TailException("Tail file exists and thus cannot be created. file=" + file); + FileOutputStream fos; + try { + fos = new FileOutputStream(file); + } catch (FileNotFoundException e) { + throw new TailException("Tail file cannot be created. file=" + file); + } + PrintStream ps = new PrintStream(fos); + try { + ps.println(1); // file version + ps.println(nextChangeSetId); // next revision + ps.println(nextFreeId); // next free cluster + ps.println(dbId); // database id + } finally { + ps.close(); + } + } + public static Data readTailFile(File file) throws ProCoreException { + String lines[] = getFileLines(file, 3); + if (lines.length < 3) + throw new TailReadException("Could not read data from " + file); + long ver = Long.parseLong(lines[0]); + long cs = Long.parseLong(lines[1]); + long id = Long.parseLong(lines[2]); + return new Data(ver, cs, id); + } + } +} +class SessionAddress { + private final InetSocketAddress socketAddress; + private final boolean ignoreProtocolVersion; + public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) { + assert(null != socketAddress); + this.socketAddress = socketAddress; + this.ignoreProtocolVersion = ignoreProtocolVersion; + } + public InetSocketAddress getSocketAddress() { + return socketAddress; + } + public boolean ignoreProtocolVersion() { + return ignoreProtocolVersion; + } + @Override + public String toString() { + return socketAddress.toString(); + } +} +class PacketQueue { + private final LinkedBlockingQueue packets = new LinkedBlockingQueue(); + PacketQueue() { + } + void addFirst(Packet packet) { + packets.add(packet); + } + Packet takeFirst() throws InterruptedException { + return packets.take(); + } +} +class Header { + Header(int[] ints) { + lastTokenIn = ints[0]; + token = ints[1]; + messageNumber = ints[2]; + inflateSize = ints[3]; + deflateSize = ints[4]; + } + int lastTokenIn; + int token; + int messageNumber; + int inflateSize; + int deflateSize; + @Override + public String toString() { + StringBuilder sb = new StringBuilder(80); + sb.append("lastTokenIn="); + sb.append(lastTokenIn); + sb.append(" token="); + sb.append(token); + sb.append(" messageNumber="); + sb.append(messageNumber); + sb.append(" inflateSize="); + sb.append(inflateSize); + sb.append(" deflateSize="); + sb.append(deflateSize); + return sb.toString(); + } +} +class Packet { + Header header; + ByteBuffer bytes; + Packet(int[] header, ByteBuffer bytes) { + this.header = new Header(header); + this.bytes = bytes; + } +} +class ConnectionThread { + private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS; + private static final int HEADER_N = 5; + private static final int HEADER_SIZE = HEADER_N * 4; + private static final int DEFLATE = 4; + private static final int INCREMENT_SIZE = 8192; + private static int instanceCount = 0; + class Manager implements Runnable { + private final int instance; + private volatile boolean carryOn = true; + private volatile int lastTokenIn = 0; + private volatile MethodQueue methodQueue = new MethodQueue(); + private volatile CountDownLatch running = new CountDownLatch(1); + private final EventHandler defaultEventHandler = new DefaultEventHandler(); +// private EventHandler eventHandler = defaultEventHandler; + Manager() { + instance = ++instanceCount; + } + @Override + public String toString() { + return "Connection " + instance; + } + InetSocketAddress getAddress() { + return connection.getAddress(); + } + boolean isConnected() { + return connection.isConnected(); + } + void reconnect() throws ProCoreException, InterruptedException { + InetSocketAddress a = connection.getAddress(); + if (connection.addressNotInitialized() || 0 == a.getPort()) + throw new ProCoreException("Address not ok. address=" + connection.getAddress()); + connect(a); + } + void connect(int port) throws ProCoreException, InterruptedException { + InetSocketAddress a = connection.getAddress(); + if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort())) + a = new InetSocketAddress(ProCoreClient.LOCALHOST, port); + connect(a); + } + void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { + if (connection.isConnected()) { + if (connection.equalsAddress(address)) + return; + else + throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address); + } + connection.init4Connection(address); + connectionManager.connect(connection); +// compression = new Compression(); + if (thread.isAlive()) + return; + thread.start(); + running.await(); + } + void call(Method method) throws ProCoreException, InterruptedException, IOException { +// eventHandler = method.getEventHandler(eventHandler); + connection.sendMethod(methodQueue, method, lastTokenIn); + } + void disconnect() { + methodQueue.close(); + connection.disconnect(); + } + @Override + public void run() { // MARK: CT.Manger.run + try { + running.countDown(); + while (carryOn) { + Packet packet = null; + try { + packet = inputQueue.takeFirst(); + } catch (InterruptedException e) { + Util.trace("Wait for input packet interrupted. this=" + this); + continue; + } + if (null == packet) + break; + handlePacket(packet); + } + if (carryOn) + Util.trace("Thread " + thread.getName() + " stopped with null packet."); + else + Util.trace("Thread " + thread.getName() + " stopped with false carryOn."); + } catch (Throwable t) { + Util.logError("Thread " + thread.getName() + " stopped to exception:", t); + } finally { + disconnect(); + running.countDown(); + } + } + private void handlePacket(Packet packet) throws ProCoreException { + if (packet.header.token > lastTokenIn) { + lastTokenIn = packet.header.token; + if (DEBUG) + Util.showDebug("Setting lastTokeinIn=" + lastTokenIn); + } + Method method = methodQueue.remove(packet.header.token); + if (null == method) { + try { + switch (packet.header.messageNumber) { + default: { + Util.logError("Missing method. token=" + packet.header.token); + } return; + case MessageNumber.ChangeSetUpdateRequest: { + defaultEventHandler.on(new ChangeSetUpdateEvent(packet)); + } break; + } + } catch (Throwable t) { + Util.logError("Event handler failed:", t); + } + return; + } +// else +// eventHandler = method.getEventHandler(defaultEventHandler); + switch (packet.header.messageNumber) { + default: { + String s = "Illegal message number " + packet.header.messageNumber; + method.gotException(s); + } break; + case MessageNumber.ChangeSetUpdateRequest: + method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet)); + methodQueue.add(method); // Event caused by method. Actual method not yet handled. + break; + case MessageNumber.NullResponse: + //Util.log("Null(ack) message. method=" + method + " packet=" + packet); + break; + case MessageNumber.ExceptionResponse: + method.gotException(packet); + break; // MARK: handlePacket + case MessageNumber.AAAResponse: + case MessageNumber.AskTransactionResponse: + case MessageNumber.CancelCommitResponse: + case MessageNumber.CloseClientSessionResponse: + case MessageNumber.EchoResponse: + case MessageNumber.EndTransactionResponse: + case MessageNumber.ExecuteResponse: + case MessageNumber.GetChangeSetContextResponse: + case MessageNumber.GetChangeSetDataResponse: + case MessageNumber.GetChangeSetsResponse: + case MessageNumber.GetClusterNewResponse: + case MessageNumber.GetRefreshResponse: + case MessageNumber.GetResourceSegmentResponse: + case MessageNumber.GetServerInfoResponse: + case MessageNumber.OpenClientSessionResponse: + case MessageNumber.RefreshResponse: + case MessageNumber.ReserveIdsResponse: + case MessageNumber.UndoResponse: + case MessageNumber.ReconnectResponse: + case MessageNumber.GetRefresh2Response: + case MessageNumber.GetClusterChangesResponse: + case MessageNumber.GetServerInfo2Response: + case MessageNumber.ListClustersResponse: { + method.gotPacket(packet); + } break; + } + } + } + private final ConnectionManager connectionManager; + private final PacketQueue inputQueue = new PacketQueue(); + private final Connection connection = new Connection(this); + private final Manager manager = new Manager(); + private Thread thread = new Thread(manager, manager.toString()); + private int[] header = null; + private int[] INTS = new int[HEADER_N]; + private int remaining = HEADER_SIZE; + private int deflateSize; + ConnectionThread(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + ByteBuffer getInputBuffer() { + return getInputBuffer(INCREMENT_SIZE); + } + ByteBuffer getInputBuffer(int sizeAtLeast) { + ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE)); + bb.order(ByteOrder.LITTLE_ENDIAN); + return bb; + } + @Override + public String toString() { + return "header=" + header + " remaining=" + remaining; + } + private String toString(ByteBuffer buffer, int nRead) { + return this + " nRead=" + nRead + " buffer=" + buffer; + } + ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0 + throws IOException { // MARK: handleInput + int position = buffer.position(); + if (null == header) { + if (position < HEADER_SIZE) + return buffer; + else { // Got header. + buffer.position(0); + IntBuffer ib = buffer.asIntBuffer(); + ib.get(INTS); + ib = null; + header = INTS; + deflateSize = header[DEFLATE]; + if (deflateSize < 0) + throw new IOException("Illegal deflate size=" + deflateSize); + remaining += deflateSize; + if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case. + ByteBuffer bb = getInputBuffer(remaining); + buffer.position(0); + buffer.limit(position); + bb.put(buffer); + return bb; + } + buffer.position(position); + } + } + if (position < remaining) + return buffer; // Need more data. + else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case. + ByteBuffer bb = getInputBuffer(remaining); + buffer.position(0); + buffer.limit(remaining); + bb.put(buffer); + inputQueue.addFirst(new Packet(header, bb)); + ByteBuffer b2 = getInputBuffer(buffer.limit()); + buffer.position(remaining); + buffer.limit(position); + b2.put(buffer); + header = null; + remaining = HEADER_SIZE; + return handleInput(b2, b2.position()); + } else if (position != remaining) + throw new IOException("Assertion error. this=" + toString(buffer, nRead)); + // Got exactly one packet. + inputQueue.addFirst(new Packet(header, buffer)); + header = null; + remaining = HEADER_SIZE; + return getInputBuffer(INCREMENT_SIZE); + } + InetSocketAddress getAddress() { + return manager.getAddress(); + } + boolean isConnected() { + return manager.isConnected(); + } + void reconnect() throws ProCoreException, InterruptedException { + manager.reconnect(); + } + void connect(int port) throws ProCoreException, InterruptedException { + manager.connect(port); + } + void connect(InetSocketAddress address) throws ProCoreException, InterruptedException { + manager.connect(address); + } + void disconnect() { + manager.disconnect(); + } + Connection call(Method m) throws ProCoreException, InterruptedException, IOException { + manager.call(m); + return connection; + } +} +class Client { + private final ProCoreServer proCoreServer; + private final ProCoreClient proCoreClient; + private final ConnectionThread connectionThread; + private boolean closed = false; + Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) { + this.proCoreServer = proCoreServer; + this.proCoreClient = proCoreClient; + this.connectionThread = connectionThread; + } + void open() throws ProCoreException, InterruptedException { + synchronized (proCoreClient) { + if (!connectionThread.isConnected()) + connectionThread.connect(proCoreServer.getPort()); + } + closed = false; + } + boolean isOpen() { + synchronized (proCoreClient) { + return connectionThread.isConnected(); + } + } + boolean isClosed() { + synchronized (proCoreClient) { + return closed || !connectionThread.isConnected(); + } + } + void close() { + proCoreClient.deleteClient(this); + closed = true; + } + String execute(String command) throws ProCoreException { + return proCoreClient.execute(command); + } + void call(Method m) throws ProCoreException { + proCoreClient.call(m); + } +} +class ProCoreClient { + public static final String LOCALHOST = "127.0.0.1"; + class Remote { + private final ConnectionManager connectionManager = ConnectionManager.getInstance(); + private final ConnectionThread connectionThread = new ConnectionThread(connectionManager); + Remote() { + } + } + private final Remote remote = new Remote(); + private final Set clients = new HashSet(); + private final ProCoreServer proCoreServer; + public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException { + this.proCoreServer = proCoreServer; + } + synchronized public int getPort() { + return remote.connectionThread.getAddress().getPort(); + } + synchronized public boolean isConnected() { + return remote.connectionThread.isConnected(); + } + synchronized public Client newClient() throws ProCoreException { + return newClient(proCoreServer, remote.connectionThread.getAddress()); + } + synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException { + if (!remote.connectionThread.isConnected()) + try { + remote.connectionThread.connect(address); + } catch (InterruptedException e) { + if (!remote.connectionThread.isConnected()) { + remote.connectionThread.disconnect(); + throw new ProCoreException("Failed to create new client."); + } + } + else if (!address.equals(remote.connectionThread.getAddress())) + throw new ProCoreException("Illegal newClient call. Already connected to different address."); + Client client = new Client(proCoreServer, this, remote.connectionThread); + clients.add(client); + return client; + } + synchronized int deleteClient(Client client) { + clients.remove(client); + return clients.size(); + } + void closeClient(int aSessionId) throws ProCoreException { + try { + CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId); + Method m = new Method(f, null, null); + Connection c = remote.connectionThread.call(m); + m.waitForReply(c); + } catch (InterruptedException e) { + throw new ProCoreException("Thread interrupted during method call.", e); + } catch (IOException e) { + throw new ProCoreException("IOException during method call.", e); + } catch (Throwable e) { + throw new ProCoreException("Throwable during method call.", e); + } + } + synchronized public String execute(String command) throws ProCoreException { + try { + ExecuteFunction f = new ExecuteFunction(command); + Method m = new Method(f, null, null); + Connection c = remote.connectionThread.call(m); + m.waitForReply(c); + return f.out; + } catch (InterruptedException e) { + throw new ProCoreException("Thread interrupted during method call.", e); + } catch (IOException e) { + throw new ProCoreException("IOException during method call.", e); + } catch (Throwable e) { + throw new ProCoreException("Throwable during method call.", e); + } + } + private void recall(Method method, IOException e) throws ProCoreException {// :) + if (remote.connectionThread.isConnected()) + throw new ProCoreException("IOException during method call.", e); + try { + remote.connectionThread.reconnect(); + Connection c = remote.connectionThread.call(method); + method.waitForReply(c); + } catch (Throwable t) { + throw new ProCoreException("IOException during method call. Recall failed.", e); + } + } + synchronized public void call(Method method) throws ProCoreException { + try { + Connection c = remote.connectionThread.call(method); + method.waitForReply(c); + } catch (InterruptedException e) { + throw new ProCoreException("Thread interrupted during method call.", e); + } catch (IOException e) { + recall(method, e); + } catch (ProCoreException e) { + throw e; + } catch (Throwable e) { + throw new ProCoreException("Throwable during method call.", e); + } + } + synchronized public void connect(int port) throws ProCoreException, InterruptedException { + remote.connectionThread.connect(port); + } + synchronized public void connect(SessionAddress sessionAddress) + throws ProCoreException, InterruptedException { + remote.connectionThread.connect(sessionAddress.getSocketAddress()); + } + synchronized public void disconnect() throws ProCoreException { + remote.connectionThread.disconnect(); + } +} +abstract class Event { + final String name; + final Packet packet; + Event(String name, Packet packet) { + this.name = name; + this.packet = packet; + } + int getToken() { + return packet.header.token; + } + void deserialize(org.simantics.db.server.protocol.AbstractEvent event) { + event.deserialize(event.getRequestNumber(), getDataBuffer()); + } + private org.simantics.db.server.protocol.DataBuffer getDataBuffer() { + this.packet.bytes.position(20); + return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes); + } + @Override public String toString() { + return name + " token=" + packet.header.token; + } +} +class ChangeSetUpdateEvent extends Event { + ChangeSetUpdateEvent(Packet packet) { + super("ChangeSetUpdateEvent", packet); + } +} +abstract class EventHandler { + abstract void on(Event event); +} +class DefaultEventHandler extends EventHandler { + void on(Event event) { + Util.log("Missing event handler. event=" + event); + } +}