-/*******************************************************************************\r
-\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.server.internal;\r
-\r
-import java.io.BufferedReader;\r
-import java.io.BufferedWriter;\r
-import java.io.File;\r
-import java.io.FileInputStream;\r
-import java.io.FileNotFoundException;\r
-import java.io.FileOutputStream;\r
-import java.io.IOException;\r
-import java.io.InputStreamReader;\r
-import java.io.OutputStreamWriter;\r
-import java.io.PrintStream;\r
-import java.net.InetSocketAddress;\r
-import java.nio.ByteBuffer;\r
-import java.nio.ByteOrder;\r
-import java.nio.IntBuffer;\r
-import java.nio.file.Files;\r
-import java.nio.file.Path;\r
-import java.util.ArrayList;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.CountDownLatch;\r
-import java.util.concurrent.LinkedBlockingQueue;\r
-\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.server.DatabaseLastExitException;\r
-import org.simantics.db.server.GuardFileVersionException;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.server.ServerNotFoundException;\r
-import org.simantics.db.server.protocol.CloseClientSessionFunction;\r
-import org.simantics.db.server.protocol.ExecuteFunction;\r
-import org.simantics.db.server.protocol.MessageNumber;\r
-\r
-public class ProCoreServer {\r
- private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();\r
- private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();\r
- private static final String LOCALHOST = ProCoreClient.LOCALHOST;\r
- public static final int VERSION_MAJOR = 1;\r
- public static final int VERSION_MINOR = 4; // 2015-05-05\r
- public static final String BRANCH_DIR = "procore.headClusters.procore";\r
- public static final String TAIL_DIR = "procore.tailClusters.procore";\r
- public static final String TAIL_FILE = "procore.tail.procore";\r
- public static final String CONFIG_FILE = "procore.config.procore";\r
- public static final String GUARD_FILE = "procore.guard.procore";\r
- public static final String JOURNAL_FILE = "procore.journal.procore";\r
- public static final String LOG_FILE = "procore.log.procore";\r
- public static final String DCS_FILE = "procore.dumpChangeSets.procore";\r
- public static final String RECOVERY_NEEDED_FILE = "recovery.needed";\r
- public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";\r
- public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";\r
- public static final String PAGE_FILE_PATTERN = "procore.page*.procore";\r
- public static final String DATA_PREFIX = "ClusterData.";\r
- public static final String DATA_PATTERN = DATA_PREFIX + "*";\r
- public static final String INDEX_PREFIX = "ClusterIndex.";\r
- public static final String INDEX_PATTERN = INDEX_PREFIX + "*";\r
- public static final String VALUE_PREFIX = "ExternalValue.";\r
- public static final String VALUE_PATTERN = VALUE_PREFIX + "*";\r
- public static final String VALUE_SUFFIX = ".procore";\r
- public static final String DELETED_PREFIX = "ClusterDeleted.";\r
- public static final String DELETED_PATTERN = DELETED_PREFIX + "*";\r
- private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;\r
- public static final String QUIT_CMD = "quit";\r
-// private static final int SHORT_SLEEP_COUNT = 1;\r
-// private static final int LONG_SLEEP_COUNT = 10;\r
-// static public void stopProCoreServer(File workingDir) throws ProCoreException {\r
-// GuardFile guardFile = new GuardFile(workingDir);\r
-// guardFile.stopProCoreServer();\r
-// }\r
- private static boolean isRunning(String aPid) throws ProCoreException {\r
- boolean isRunning = false;\r
- try {\r
- switch (OSType.calculate()) {\r
- case WINDOWS: {\r
- Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");\r
- BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
- String line = input.readLine();\r
- if (line.matches(".*ProCoreServer.exe.*"))\r
- isRunning = true;\r
- input.close();\r
- break;\r
- }\r
- case UNIX: {\r
- Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");\r
- BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
- String line = input.readLine();\r
- if (line.matches(".*ProCoreServer.*"))\r
- isRunning = true;\r
- input.close();\r
- break;\r
- }\r
- default:\r
- }\r
- } catch (Exception e) {\r
- throw new ProCoreException("Could not get list of running processes.", e);\r
- }\r
- return isRunning;\r
- }\r
- static class ProCoreProcess {\r
- private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };\r
- class Manager implements Runnable {\r
- private State state = State.Created;\r
- private CountDownLatch serverPrompted;\r
- private LinkedBlockingQueue<String> reply;\r
- private volatile String command;\r
- private volatile Integer exitValue;\r
- private volatile boolean carryOn;\r
- private volatile int port;\r
- private final File dbFolder; // Just for exception i.e. error reporting.\r
- Manager(File dbFolder) {\r
- this.dbFolder = dbFolder;\r
- init();\r
- }\r
- private void init() {\r
- serverPrompted = new CountDownLatch(1);\r
- reply = new LinkedBlockingQueue<String>();\r
- exitValue = null;\r
- carryOn = true;\r
- port = 0;\r
- state = State.Initialized;\r
- }\r
- int getPort() {\r
- return port;\r
- }\r
- void tryToStart() throws ProCoreException, InterruptedException {\r
- if (thread.isAlive())\r
- return;\r
- thread = new Thread(manager, manager.initAndGetName());\r
- thread.start();\r
- serverPrompted.await();\r
- if (!state.equals(State.Prompted))\r
- throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
- String reply = giveCommand("print port");\r
- try {\r
- port = Integer.parseInt(reply);\r
- } catch (NumberFormatException e) {\r
- if (null != exitValue && 0 != exitValue)\r
- throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
- if (reply.equals("End of server thread."))\r
- throw new ProCoreException("Server did not start (did not prompt). state=" + state);\r
- else\r
- throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);\r
- }\r
- state = State.GotPort;\r
- }\r
- String giveCommand(String command) throws ProCoreException, InterruptedException {\r
- if (!thread.isAlive())\r
- throw new ProCoreException("Server thread not alive.");\r
- if (null != this.command)\r
- throw new ProCoreException("Old command pending.");\r
- this.command = command;\r
- return reply.take();\r
- }\r
- private String initAndGetName() {\r
- init();\r
- return "ProCoreProcess " + ++instanceCount + " " + builder.directory();\r
- }\r
- boolean isActive() throws ProCoreException {\r
- if (null == exitValue && thread.isAlive())\r
- return true;\r
- else\r
- return false;\r
- }\r
- boolean tryToStop() throws ProCoreException {\r
- if (!thread.isAlive())\r
- return true;\r
- carryOn = false;\r
- thread.interrupt();\r
- try {\r
- thread.join(10000); // milliseconds\r
- } catch (InterruptedException e) {\r
- }\r
- return !thread.isAlive();\r
- }\r
- @Override\r
- public void run() {\r
- state = State.ThreadStarted;\r
- Util.log("ProCoreServer thread started.");\r
- try {\r
- Util.trace("ProCoreProcessManager start.");\r
- Process process = builder.start();\r
- exitValue = exitValue(process);\r
- if (null != exitValue)\r
- Util.logError("Server process did not start.");\r
- else {\r
- state = State.ProcessStarted;\r
- runProcessStarted(process); // Process start\r
- }\r
- } catch (IOException e) {\r
- Util.logError("Failed during process watch.");\r
- } finally {\r
- reply.add("End of server thread.");\r
- Util.trace("ProCoreServerThread stop.");\r
- state = State.ThreadStopped;\r
- serverPrompted.countDown();\r
- }\r
- }\r
- private void runProcessStarted(Process process) throws IOException {\r
- try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));\r
- final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)\r
- {\r
- do {\r
- try {\r
- if (reader.ready())\r
- handleLine(reader.readLine(), reader, writer);\r
- else if (null != command) {\r
- writer.write(command);\r
- command = null;\r
- writer.newLine();\r
- writer.flush();\r
- String line = getReply(reader);\r
- reply.add(line);\r
- } else\r
- Thread.sleep(100); // milliseconds\r
- } catch (InterruptedException e) {\r
- Util.trace("ProCoreServerThread interrupted.");\r
- }\r
- exitValue = exitValue(process);\r
- if (null == exitValue && !carryOn)\r
- try {\r
- try {\r
- writer.write(QUIT_CMD);\r
- writer.newLine();\r
- writer.flush();\r
- } catch (Throwable t) {\r
- Util.logError("Wait for process death was interrupted.", t);\r
- } finally {\r
- }\r
- exitValue = process.waitFor();\r
- } catch (InterruptedException e) {\r
- Util.logError("Wait for process death was interrupted.", e);\r
- carryOn = true;\r
- continue;\r
- }\r
- } while (null == exitValue);\r
- } // End of resource try.\r
- }\r
- private String getReply(BufferedReader reader) throws IOException {\r
- StringBuilder sb = new StringBuilder();\r
- while (true) {\r
- String line = reader.readLine();\r
- if (null == line || line.startsWith("Waiting for input?"))\r
- break;\r
- sb.append(line);\r
- }\r
- return sb.toString();\r
- }\r
- private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {\r
- if (DEBUG)\r
- Util.showDebug(line);\r
- if (line.startsWith("Main end.")) {\r
- carryOn = false; // Last line from server.\r
- } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {\r
- state = State.Prompted;\r
- serverPrompted.countDown(); // Server has been started and is waiting for console input.\r
- } else\r
- Util.trace("From server. line=" + line);\r
- }\r
- private Integer exitValue(Process process) {\r
- try {\r
- return process.exitValue();\r
- } catch (IllegalThreadStateException e) {\r
- return null; // Still alive.\r
- }\r
- }\r
- }\r
- private static int instanceCount; // Keeps count of thread instances.\r
- private final ProcessBuilder builder;\r
- private final Manager manager;\r
- private Thread thread;\r
- ProCoreProcess(File exeFile, File workingDir) {\r
- String[] args = new String[0];\r
- String[] cmd = new String[1 + args.length];\r
- cmd[0] = exeFile.getAbsolutePath().toString();\r
- System.arraycopy(args, 0, cmd, 1, args.length);\r
- builder = new ProcessBuilder(cmd);\r
- builder.redirectErrorStream(true);\r
- builder.directory(workingDir);\r
- manager = new Manager(workingDir);\r
- thread = new Thread(manager, manager.initAndGetName());\r
- }\r
- public void tryToStart() throws ProCoreException, InterruptedException {\r
- manager.tryToStart();\r
- }\r
-// public int exitValue() throws IllegalStateException {\r
-// Integer i = proCoreServerThread.getExitValue();\r
-// if (null == i)\r
-// throw new IllegalStateException("ProCoreServerThread is still running.");\r
-// return i;\r
-// }\r
-// public boolean ignoreProtocolVersion() {\r
-// File wd = builder.directory();\r
-// if (null == wd)\r
-// return false;\r
-// Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);\r
-// if (Files.exists(ignore))\r
-// return true;\r
-// return false;\r
-// }\r
- public boolean isActive() throws ProCoreException {\r
- return manager.isActive();\r
- }\r
- public boolean tryToStop() throws ProCoreException {\r
- return manager.tryToStop();\r
- }\r
- int getPort() throws ProCoreException {\r
- return manager.getPort();\r
- }\r
- }\r
- private static enum OSType {\r
- UNIX, WINDOWS, UNKNOWN;\r
-\r
- public static OSType calculate() {\r
- String osName = System.getProperty("os.name");\r
- assert osName != null;\r
- osName = osName.toLowerCase();\r
- if (osName.startsWith("windows"))\r
- return WINDOWS;\r
- if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))\r
- return UNIX;\r
- return UNKNOWN;\r
- }\r
- }\r
- static public boolean ignoreProtocolVersion(File dbFolder) {\r
- if (null == dbFolder)\r
- return false;\r
- if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))\r
- return true;\r
- return false;\r
- }\r
- static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)\r
- throws ProCoreException, ServerNotFoundException {\r
- try {\r
- ProCoreServer proCoreServer;\r
- synchronized (workingDirs) {\r
- proCoreServer = workingDirs.get(workingDir.getCanonicalPath());\r
- if (null != proCoreServer)\r
- return proCoreServer;\r
- if (!serverDir.isDirectory())\r
- throw new ProCoreException("Expected server directory as argument");\r
- if (!workingDir.isDirectory())\r
- throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());\r
- String exeSuffix = Activator.getExeSuffix();\r
- File executable = new File(serverDir, "ProCoreServer" + exeSuffix);\r
- proCoreServer = new ProCoreServer(executable, workingDir);\r
- workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);\r
- return proCoreServer;\r
- }\r
- } catch (IOException e) {\r
- throw new ProCoreException("IOException", e);\r
- }\r
- }\r
-\r
- public static void logVersionInformation() {\r
- StringBuilder msg = new StringBuilder(200);\r
- String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.\r
- msg.append("ProCore version information:");\r
- msg.append(nl);\r
- msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);\r
- try {\r
- msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());\r
- } catch (IOException e) {\r
- msg.append(" folder information not available");\r
- }\r
- msg.append(nl);\r
- msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");\r
- String str = msg.toString();\r
- Logger.defaultLog(str);\r
- }\r
- private static String[] getFileLines(final File aFile, int maxLines) {\r
- final String[] STRING_EMPTY_ARRAY = new String[0];\r
- try {\r
- if (!aFile.canRead()) {\r
- return STRING_EMPTY_ARRAY;\r
- }\r
- try (FileInputStream fis = new FileInputStream(aFile);\r
- InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");\r
- BufferedReader br = new BufferedReader(isr);){\r
- String line = null;\r
- int count = 0;\r
- ArrayList<String> lines = new ArrayList<String>();\r
- while ((line = br.readLine()) != null) {\r
- if (++count > maxLines)\r
- break; // Caller not interested in more lines.\r
- lines.add(line);\r
- }\r
- return lines.toArray(STRING_EMPTY_ARRAY);\r
- } catch (FileNotFoundException e) {\r
- return STRING_EMPTY_ARRAY;\r
- }\r
- } catch (Throwable t) {\r
- if (DEBUG)\r
- t.printStackTrace();\r
- Logger.defaultLogError(t);\r
- }\r
- return new String[0];\r
- }\r
- private static int parseInt(String s)\r
- throws ProCoreException {\r
- try {\r
- return Integer.parseInt(s);\r
- } catch (NumberFormatException e) {\r
- String msg = "Could not convert string to number. string=" + s;\r
- Logger.defaultLogError(msg);\r
- throw new ProCoreException(msg, e);\r
- }\r
- }\r
- private String databaseId = null;\r
- private final GuardFile guardFile;\r
- private final ProCoreClient proCoreClient;\r
- ProCoreClient getProCoreClient() {\r
- return proCoreClient;\r
- }\r
- private final ProCoreProcess proCoreProcess;\r
- private ProCoreServer(File exeFile, File workingDir)\r
- throws ProCoreException {\r
- if (null == exeFile)\r
- throw new ProCoreException("Illegal argument: exeFile is null.");\r
- if (null == workingDir)\r
- throw new ProCoreException("Illegal argument: workingDir is null.");\r
- guardFile = new GuardFile(workingDir);\r
- proCoreClient = new ProCoreClient(this);\r
- proCoreProcess = new ProCoreProcess(exeFile, workingDir);\r
- Util.log("ProCoreServer exe: " + exeFile);\r
- Util.log("ProCoreServer folder: " + workingDir);\r
- logVersionInformation();\r
- }\r
-// //@Override\r
- public synchronized String execute(String command) throws ProCoreException, InterruptedException {\r
- return proCoreClient.execute(command);\r
- }\r
- private void startInternal()\r
- throws ProCoreException {\r
- if (proCoreClient.isConnected())\r
- return;\r
- Exception exception;\r
- try {\r
- proCoreProcess.tryToStart();\r
- int port = proCoreProcess.getPort();\r
- SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
- proCoreClient.connect(sa);\r
- return;\r
- } catch (InterruptedException e) {\r
- Util.logError("ProCoreProcess start was interrupted.", e);\r
- exception = e;\r
- } catch (DatabaseLastExitException e) {\r
- throw e; // This situation can be analyzed and handled by start handler.\r
- } catch (ProCoreException e) {\r
- Util.logError("Failed to start ProCoreServer process.", e);\r
- exception = e;\r
- }\r
- boolean isActive = guardFile.isActive(proCoreClient);\r
- if (!isActive)\r
- throw new ProCoreException("Failed to connect to ProCoreServer.", exception);\r
- }\r
-// private int getWaitCount() throws ProCoreException {\r
-// if (!proCoreProcess.isActive())\r
-// return SHORT_SLEEP_COUNT; // Short wait because server process is not running.\r
-// else\r
-// return LONG_SLEEP_COUNT;\r
-// }\r
-// private void startInternalReinit(int exitStatus)\r
-// throws ProCoreException {\r
-// switch (exitStatus) {\r
-// default: // What has happened here?\r
-// throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));\r
-// case -1: // Exit status not available, not sure what has happened here.\r
-// case 29: // Server died because it could not read exit status from guard file.\r
-// case 30: // Server died because old exit status was not zero.\r
-// case 40: // Server died because it received terminating signal.\r
-// case 42: // Server died because it could not read end status from guard file.\r
-// case 43: // Server died because end value was not zero.\r
-// case 44: // Server died because server could not read version from guard file.\r
-// case 45: // Server died because server did not support guard file version.\r
-// case 47: // Server died because server could not listen to any port.\r
-// break;\r
-// case 32: // Server died becuase database version did not match.\r
-// throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));\r
-// case 33: // Server died because database has been corrupted.\r
-// throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));\r
-// case 39: // Server died because last exit was not clean.\r
-// throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));\r
-// }\r
-// boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());\r
-// if (!dead) {\r
-// StringBuilder sb = new StringBuilder(256);\r
-// sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");\r
-// sb.append(getExitMessage(exitStatus));\r
-// throw new DatabaseStartException(workingDir, sb.toString());\r
-// }\r
-// boolean deleted = guardFile.delete();\r
-// if (!deleted)\r
-// Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));\r
-// }\r
- //@Override\r
- public synchronized boolean isActive()\r
- throws ProCoreException {\r
- if (!proCoreClient.isConnected())\r
- return guardFile.isActive(proCoreClient);\r
- proCoreClient.execute("");\r
- return true;\r
- }\r
- public synchronized boolean isAlive()\r
- throws ProCoreException {\r
- if (proCoreClient.isConnected())\r
- return true;\r
- if (proCoreProcess.isActive())\r
- return true;\r
- return guardFile.isAlive();\r
- }\r
- Client newClient() throws ProCoreException {\r
- if (proCoreClient.isConnected())\r
- return proCoreClient.newClient();\r
- int port = getPort();\r
- InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);\r
- return proCoreClient.newClient(this, isa);\r
- }\r
- public synchronized boolean tryToStop()\r
- throws ProCoreException {\r
- synchronized (proCoreClient) {\r
- ProCoreException ex = null;\r
- if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {\r
- try {\r
- proCoreClient.connect(getPort());\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
- if (proCoreClient.isConnected()) {\r
- String s = "";\r
- try {\r
- s = proCoreClient.execute(QUIT_CMD);\r
- String t = s.replaceAll("\n", "");\r
- int n = Integer.parseInt(t);\r
- if (n != 1)\r
- Util.log("Number of connections after quit command is " + n + ".");\r
- proCoreClient.closeClient(0);\r
- if (!isLocal() && n > 1)\r
- throw new ProCoreException("More than one connection for remote. n=" + n);\r
- } catch (NumberFormatException e) {\r
- Util.logError("Failed to parse number of connections. s=" + s, e);\r
- } catch (ProCoreException e) {\r
- ex = e;\r
- } finally {\r
- proCoreClient.disconnect();\r
- }\r
- }\r
- try {\r
- proCoreProcess.tryToStop();\r
- } finally {\r
- if (proCoreProcess.isActive() && null == ex)\r
- ex = new ProCoreException("Failed to stop ProCoreProcess.");\r
- }\r
- if (null != ex)\r
- throw ex;\r
- return !proCoreClient.isConnected() && !proCoreProcess.isActive();\r
- }\r
- }\r
- public synchronized boolean isConnected()\r
- throws ProCoreException {\r
- return proCoreClient.isConnected();\r
- }\r
- public synchronized boolean isLocal()\r
- throws ProCoreException {\r
- return proCoreClient.isConnected() && proCoreProcess.isActive();\r
- }\r
- public synchronized void disconnect()\r
- throws ProCoreException {\r
- proCoreClient.disconnect();\r
- }\r
- public synchronized void connect()\r
- throws ProCoreException, InterruptedException {\r
- int port = getPort();\r
- if (0 == port)\r
- throw new ProCoreException("Port 0 not supported as connection address.");\r
- proCoreClient.connect(port);\r
- }\r
- private String getExitMessage(int exitValue) {\r
- switch (exitValue) {\r
- default:\r
- return "Unexpected exit value = " + exitValue + ".";\r
- case 1:\r
- return "Exception was thrown. This indicates problems with program logic.";\r
- case 2:\r
- return "CallException was thrown. This indicates problems with calls to/from parent server.";\r
- case 3:\r
- return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";\r
- case 4:\r
- return "NoSuchElementException was thrown. This indicates problems with program logic.";\r
- case 5:\r
- return "IllegalArgumentException was thrown. This indicates problems with program logic.";\r
- case 6:\r
- return "IllegalResourceException was thrown. This indicates problems with server data.";\r
- case 7:\r
- return "IllegalClusterException was thrown. This indicates problems with server data.";\r
- case 8:\r
- return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";\r
- case 9:\r
- return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";\r
- case 10:\r
- return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";\r
- case 11:\r
- return "SystemException was thrown. This indicates problems with program logic.";\r
- case 12:\r
- return "CppException was thrown. This indicates problems with program logic.";\r
- case 13:\r
- return "UnknownException was thrown. This indicates problems with program logic.";\r
- case 14:\r
- return "ThrowException was thrown. This indicates problems with program logic.";\r
- case 15:\r
- return "AssertException was thrown. This indicates problems with program logic.";\r
- case 16:\r
- return "ParentCheckException was thrown. This indicates problems with the database version of parent.";\r
- case 17:\r
- return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";\r
- case 18:\r
- return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";\r
- case 19:\r
- return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";\r
- case 20:\r
- return "NoSuchFileException was thrown. This indicates problems with a missing file.";\r
- case 21:\r
- return "JournalException was thrown. This indicates problems with journal mechanisim.";\r
- case 22:\r
- return "OutOfSpaceException was thrown. This indicates problems with memory.";\r
- case 23:\r
- return "ExternalValueException was thrown. This indicates problems with large value handling.";\r
- case 24:\r
- return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";\r
- case 25:\r
- return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";\r
- case 26:\r
- return "ConsoleException was thrown. This indicates that server console was closed without quit command.";\r
- case 27:\r
- return "ExitException was thrown. This indicates that server did not exit cleanly.";\r
- case 28:\r
- return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";\r
- case 29:\r
- return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";\r
- case 30:\r
- return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";\r
- case 31:\r
- return "UpdateException was thrown. This indicates that server failed during update.";\r
- case 32:\r
- return "DatabaseVersionException was thrown. This indicates that server can not use given database.";\r
- case 33:\r
- return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";\r
- case 34:\r
- return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";\r
- case 35:\r
- return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";\r
- case 36:\r
- return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";\r
- case 37:\r
- return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";\r
- case 38:\r
- return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";\r
- case 39:\r
- return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";\r
- case 40:\r
- return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";\r
- case 41:\r
- return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";\r
- case 42:\r
- return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";\r
- case 43:\r
- return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";\r
- case 44:\r
- return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";\r
- case 45:\r
- return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";\r
- case 46:\r
- return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";\r
- case 47:\r
- return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";\r
- case 48:\r
- return "JournalVersionException was thrown. This indicates that database was written by older version.";\r
- case 49:\r
- return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";\r
- }\r
- }\r
- synchronized void start()\r
- throws ProCoreException {\r
- try {\r
- startInternal();\r
- } catch (DatabaseLastExitException e) {\r
- throw e; // This can be analyzed and handled by start handler.\r
- } catch (Throwable t) {\r
- ProCoreException pe = null;\r
- if (t instanceof ProCoreException)\r
- pe = (ProCoreException)t;\r
- else {\r
- String msg = "ProCoreServer startup failed."\r
- + " Automatic rcovery handling not implemented."\r
- + " Database must be fixed manually.";\r
- pe = new ProCoreException(msg, t);\r
- }\r
- if (null == pe.getDbFolder())\r
- pe.setDbFolder(proCoreProcess.builder.directory());\r
- }\r
- }\r
- synchronized void stop()\r
- throws ProCoreException, InterruptedException{\r
- if (null != databaseId)\r
- databaseIds.remove(databaseId);\r
- boolean dead = tryToStop();\r
- if (!dead)\r
- throw new ProCoreException("Failed to stop ProCoreServer.");\r
- }\r
- public synchronized String getExitMessage()\r
- throws ProCoreException {\r
- Integer returnValue = guardFile.getExitValue();\r
- if (null == returnValue)\r
- throw new ProCoreException("Exit message not available.");\r
- return getExitMessage(returnValue);\r
- }\r
- //@Override\r
- public synchronized ServerAddress getAddress()\r
- throws ProCoreException {\r
- return new ServerAddress(LOCALHOST, getPort());\r
- }\r
- //@Override\r
- public synchronized int getPort()\r
- throws ProCoreException {\r
- int port = 0;\r
- if (proCoreClient.isConnected()) {\r
- port = proCoreClient.getPort();\r
- if (0 != port)\r
- return port;\r
- }\r
- if (proCoreProcess.isActive()) {\r
- port = proCoreProcess.getPort();\r
- if (0 != port)\r
- return port;\r
- }\r
- return guardFile.getPort();\r
- }\r
- public synchronized TailFile.Data getTailData()\r
- throws ProCoreException {\r
- Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);\r
- return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
- }\r
-// public synchronized TailFile.Data getTailDataFromHead()\r
-// throws ProCoreException {\r
-// Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);\r
-// return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
-// }\r
- public static class GuardFile {\r
- public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;\r
- private static final int VERSION_LINE = 1;\r
- private static final int PID_LINE = 2;\r
- private static final int PORT_LINE = 3;\r
- private static final int EXIT_LINE = 4;\r
- private static final int END_LINE = 5;\r
- private final File guardFile;\r
- GuardFile(File workingDir) {\r
- guardFile = new File(workingDir, GUARD_FILE);\r
- }\r
- int getPort() throws ProCoreException {\r
- String[] lines = getFileLines(guardFile, END_LINE);\r
- if (lines.length < PORT_LINE)\r
- throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());\r
- return parseInt(lines[PORT_LINE-1]);\r
- }\r
- private void checkVersion(String[] lines) throws ProCoreException {\r
- if (lines.length < VERSION_LINE)\r
- throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());\r
- int version = parseInt(lines[VERSION_LINE-1]);\r
- if (1 != version)\r
- throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);\r
- }\r
- boolean delete() {\r
- if (guardFile.exists())\r
- return guardFile.delete();\r
- else\r
- return true; // Guard file is deleted i.e. does not exit.\r
- }\r
- void deleteLog() {\r
- if (!guardFile.exists())\r
- return;\r
- try {\r
- Files.delete(guardFile.toPath());\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
- Integer getExitValue() {\r
- String lines[] = getFileLines(guardFile, END_LINE);\r
- try {\r
- checkVersion(lines);\r
- if (lines.length >= EXIT_LINE)\r
- return parseInt(lines[EXIT_LINE-1]);\r
- else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))\r
- return -1; // Server has died without writing exit status.\r
- return null; // Server might be running.\r
- } catch (Exception e) {\r
- Logger.defaultLogError("Could not get ProCoreServer exit value.", e);\r
- return null; // Server might be running.\r
- }\r
- }\r
-// void stopProCoreServer() throws ProCoreException {\r
-// if (!guardFile.exists())\r
-// throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");\r
-// String lines[] = new String[0];\r
-// for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {\r
-// lines = getFileLines(guardFile, END_LINE);\r
-// if (lines.length < 1)\r
-// throw new ProCoreException("Guard file does not contain lines.");\r
-// checkVersion(lines);\r
-// if (lines.length >= END_LINE)\r
-// return; // Server is dead according to guard file.\r
-// if (!isRunning(lines[PID_LINE-1]))\r
-// return; // ProCoreServer is not running.\r
-// if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.\r
-// break;\r
-// }\r
-// if (lines.length < PORT_LINE)\r
-// throw new ProCoreException("Guard file does not contain port.");\r
-// try {\r
-// int port = parseInt(lines[PORT_LINE-1]);\r
-// if (port > 0) {\r
-// ProCoreClient lProCoreClient = new ProCoreClient();\r
-// SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);\r
-// lProCoreClient.connect(sa);\r
-// try {\r
-// String reply = lProCoreClient.execute(QUIT_CMD);\r
-// int nConnect = Integer.parseInt(reply);\r
-// if (nConnect > 1)\r
-// throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());\r
-// } finally {\r
-// lProCoreClient.disconnect();\r
-// }\r
-// }\r
-// } catch (Throwable t) {\r
-// throw new ProCoreException("Failed to stop procore server cleanly.", t);\r
-// }\r
-// // Managed to send quit command to server, let's wait and see if it has any effect.\r
-// boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);\r
-// if (!dead)\r
-// throw new ProCoreException("Unable to stop server.");\r
-// }\r
- boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {\r
- if (proCoreClient.isConnected())\r
- throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");\r
- String lines[] = getFileLines(guardFile, END_LINE);\r
- if (lines.length != PORT_LINE)\r
- return false;\r
- checkVersion(lines);\r
- try {\r
- int port = parseInt(lines[PORT_LINE-1]);\r
- if (port > 0) {\r
- try {\r
- // Checking if core is responsive.\r
- // This should be supported regardless of version.\r
- SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
- proCoreClient.connect(sa);\r
- proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.\r
- return true;\r
- } catch (ProCoreException e) {\r
- } finally { // We must disconnect or check protocol version.\r
- proCoreClient.disconnect();\r
- }\r
- }\r
- } catch (Throwable t) {\r
- // Intentionally empty.\r
- }\r
- return false;\r
- }\r
- boolean isAlive() throws ProCoreException {\r
- String lines[] = getFileLines(guardFile, END_LINE);\r
- if (lines.length < 1)\r
- return false; // No lines, assuming server is dead.\r
- checkVersion(lines);\r
- if (lines.length < PID_LINE)\r
- return false; // No process id, assuming server is dead.\r
- else if (lines.length >= END_LINE)\r
- return false; // End status available, assuming server is dead.\r
- else if (lines.length >= EXIT_LINE)\r
- return false; // Exit status available, assuming server is dying.\r
- else if (isRunning(lines[PID_LINE-1]))\r
- return true; // Process with given id running. Server might be alive.\r
- else\r
- return false;// Process with given id not running.\r
- }\r
- }\r
- public static class TailFile {\r
- public static class Data {\r
- Data(long version, long nextChangeSetId, long nextFreeId) {\r
- this.version = version;\r
- this.nextChangeSetId = nextChangeSetId;\r
- this.nextFreeId = nextFreeId;\r
- }\r
- long version;\r
- public long nextChangeSetId;\r
- long nextFreeId;\r
- }\r
- public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {\r
- if (file.exists())\r
- throw new TailException("Tail file exists and thus cannot be created. file=" + file);\r
- FileOutputStream fos;\r
- try {\r
- fos = new FileOutputStream(file);\r
- } catch (FileNotFoundException e) {\r
- throw new TailException("Tail file cannot be created. file=" + file);\r
- }\r
- PrintStream ps = new PrintStream(fos);\r
- try {\r
- ps.println(1); // file version\r
- ps.println(nextChangeSetId); // next revision\r
- ps.println(nextFreeId); // next free cluster\r
- ps.println(dbId); // database id\r
- } finally {\r
- ps.close();\r
- }\r
- }\r
- public static Data readTailFile(File file) throws ProCoreException {\r
- String lines[] = getFileLines(file, 3);\r
- if (lines.length < 3)\r
- throw new TailReadException("Could not read data from " + file);\r
- long ver = Long.parseLong(lines[0]);\r
- long cs = Long.parseLong(lines[1]);\r
- long id = Long.parseLong(lines[2]);\r
- return new Data(ver, cs, id);\r
- }\r
- }\r
-}\r
-class SessionAddress {\r
- private final InetSocketAddress socketAddress;\r
- private final boolean ignoreProtocolVersion;\r
- public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {\r
- assert(null != socketAddress);\r
- this.socketAddress = socketAddress;\r
- this.ignoreProtocolVersion = ignoreProtocolVersion;\r
- }\r
- public InetSocketAddress getSocketAddress() {\r
- return socketAddress;\r
- }\r
- public boolean ignoreProtocolVersion() {\r
- return ignoreProtocolVersion;\r
- }\r
- @Override\r
- public String toString() {\r
- return socketAddress.toString();\r
- }\r
-}\r
-class PacketQueue {\r
- private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();\r
- PacketQueue() {\r
- }\r
- void addFirst(Packet packet) {\r
- packets.add(packet);\r
- }\r
- Packet takeFirst() throws InterruptedException {\r
- return packets.take();\r
- }\r
-}\r
-class Header {\r
- Header(int[] ints) {\r
- lastTokenIn = ints[0];\r
- token = ints[1];\r
- messageNumber = ints[2];\r
- inflateSize = ints[3];\r
- deflateSize = ints[4];\r
- }\r
- int lastTokenIn;\r
- int token;\r
- int messageNumber;\r
- int inflateSize;\r
- int deflateSize;\r
- @Override\r
- public String toString() {\r
- StringBuilder sb = new StringBuilder(80);\r
- sb.append("lastTokenIn=");\r
- sb.append(lastTokenIn);\r
- sb.append(" token=");\r
- sb.append(token);\r
- sb.append(" messageNumber=");\r
- sb.append(messageNumber);\r
- sb.append(" inflateSize=");\r
- sb.append(inflateSize);\r
- sb.append(" deflateSize=");\r
- sb.append(deflateSize);\r
- return sb.toString();\r
- }\r
-}\r
-class Packet {\r
- Header header;\r
- ByteBuffer bytes;\r
- Packet(int[] header, ByteBuffer bytes) {\r
- this.header = new Header(header);\r
- this.bytes = bytes;\r
- }\r
-}\r
-class ConnectionThread {\r
- private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;\r
- private static final int HEADER_N = 5;\r
- private static final int HEADER_SIZE = HEADER_N * 4;\r
- private static final int DEFLATE = 4;\r
- private static final int INCREMENT_SIZE = 8192;\r
- private static int instanceCount = 0;\r
- class Manager implements Runnable {\r
- private final int instance;\r
- private volatile boolean carryOn = true;\r
- private volatile int lastTokenIn = 0;\r
- private volatile MethodQueue methodQueue = new MethodQueue();\r
- private volatile CountDownLatch running = new CountDownLatch(1);\r
- private final EventHandler defaultEventHandler = new DefaultEventHandler();\r
-// private EventHandler eventHandler = defaultEventHandler;\r
- Manager() {\r
- instance = ++instanceCount;\r
- }\r
- @Override\r
- public String toString() {\r
- return "Connection " + instance;\r
- }\r
- InetSocketAddress getAddress() {\r
- return connection.getAddress();\r
- }\r
- boolean isConnected() {\r
- return connection.isConnected();\r
- }\r
- void reconnect() throws ProCoreException, InterruptedException {\r
- InetSocketAddress a = connection.getAddress();\r
- if (connection.addressNotInitialized() || 0 == a.getPort())\r
- throw new ProCoreException("Address not ok. address=" + connection.getAddress());\r
- connect(a);\r
- }\r
- void connect(int port) throws ProCoreException, InterruptedException {\r
- InetSocketAddress a = connection.getAddress();\r
- if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))\r
- a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);\r
- connect(a);\r
- }\r
- void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {\r
- if (connection.isConnected()) {\r
- if (connection.equalsAddress(address))\r
- return;\r
- else\r
- throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);\r
- }\r
- connection.init4Connection(address);\r
- connectionManager.connect(connection);\r
-// compression = new Compression();\r
- if (thread.isAlive())\r
- return;\r
- thread.start();\r
- running.await();\r
- }\r
- void call(Method method) throws ProCoreException, InterruptedException, IOException {\r
-// eventHandler = method.getEventHandler(eventHandler);\r
- connection.sendMethod(methodQueue, method, lastTokenIn);\r
- }\r
- void disconnect() {\r
- methodQueue.close();\r
- connection.disconnect();\r
- }\r
- @Override\r
- public void run() { // MARK: CT.Manger.run\r
- try {\r
- running.countDown();\r
- while (carryOn) {\r
- Packet packet = null;\r
- try {\r
- packet = inputQueue.takeFirst();\r
- } catch (InterruptedException e) {\r
- Util.trace("Wait for input packet interrupted. this=" + this);\r
- continue;\r
- }\r
- if (null == packet)\r
- break;\r
- handlePacket(packet);\r
- }\r
- if (carryOn)\r
- Util.trace("Thread " + thread.getName() + " stopped with null packet.");\r
- else\r
- Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");\r
- } catch (Throwable t) {\r
- Util.logError("Thread " + thread.getName() + " stopped to exception:", t);\r
- } finally {\r
- disconnect();\r
- running.countDown();\r
- }\r
- }\r
- private void handlePacket(Packet packet) throws ProCoreException {\r
- if (packet.header.token > lastTokenIn) {\r
- lastTokenIn = packet.header.token;\r
- if (DEBUG)\r
- Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);\r
- }\r
- Method method = methodQueue.remove(packet.header.token);\r
- if (null == method) {\r
- try {\r
- switch (packet.header.messageNumber) {\r
- default: {\r
- Util.logError("Missing method. token=" + packet.header.token);\r
- } return;\r
- case MessageNumber.ChangeSetUpdateRequest: {\r
- defaultEventHandler.on(new ChangeSetUpdateEvent(packet));\r
- } break;\r
- }\r
- } catch (Throwable t) {\r
- Util.logError("Event handler failed:", t);\r
- }\r
- return;\r
- }\r
-// else\r
-// eventHandler = method.getEventHandler(defaultEventHandler);\r
- switch (packet.header.messageNumber) {\r
- default: {\r
- String s = "Illegal message number " + packet.header.messageNumber;\r
- method.gotException(s);\r
- } break;\r
- case MessageNumber.ChangeSetUpdateRequest:\r
- method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));\r
- methodQueue.add(method); // Event caused by method. Actual method not yet handled.\r
- break;\r
- case MessageNumber.NullResponse:\r
- //Util.log("Null(ack) message. method=" + method + " packet=" + packet);\r
- break;\r
- case MessageNumber.ExceptionResponse:\r
- method.gotException(packet);\r
- break; // MARK: handlePacket\r
- case MessageNumber.AAAResponse:\r
- case MessageNumber.AskTransactionResponse:\r
- case MessageNumber.CancelCommitResponse:\r
- case MessageNumber.CloseClientSessionResponse:\r
- case MessageNumber.EchoResponse:\r
- case MessageNumber.EndTransactionResponse:\r
- case MessageNumber.ExecuteResponse:\r
- case MessageNumber.GetChangeSetContextResponse:\r
- case MessageNumber.GetChangeSetDataResponse:\r
- case MessageNumber.GetChangeSetsResponse:\r
- case MessageNumber.GetClusterNewResponse:\r
- case MessageNumber.GetRefreshResponse:\r
- case MessageNumber.GetResourceSegmentResponse:\r
- case MessageNumber.GetServerInfoResponse:\r
- case MessageNumber.OpenClientSessionResponse:\r
- case MessageNumber.RefreshResponse:\r
- case MessageNumber.ReserveIdsResponse:\r
- case MessageNumber.UndoResponse:\r
- case MessageNumber.ReconnectResponse:\r
- case MessageNumber.GetRefresh2Response:\r
- case MessageNumber.GetClusterChangesResponse:\r
- case MessageNumber.GetServerInfo2Response:\r
- case MessageNumber.ListClustersResponse: {\r
- method.gotPacket(packet);\r
- } break;\r
- }\r
- }\r
- }\r
- private final ConnectionManager connectionManager;\r
- private final PacketQueue inputQueue = new PacketQueue();\r
- private final Connection connection = new Connection(this);\r
- private final Manager manager = new Manager();\r
- private Thread thread = new Thread(manager, manager.toString());\r
- private int[] header = null;\r
- private int[] INTS = new int[HEADER_N];\r
- private int remaining = HEADER_SIZE;\r
- private int deflateSize;\r
- ConnectionThread(ConnectionManager connectionManager) {\r
- this.connectionManager = connectionManager;\r
- }\r
- ByteBuffer getInputBuffer() {\r
- return getInputBuffer(INCREMENT_SIZE);\r
- }\r
- ByteBuffer getInputBuffer(int sizeAtLeast) {\r
- ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));\r
- bb.order(ByteOrder.LITTLE_ENDIAN);\r
- return bb;\r
- }\r
- @Override\r
- public String toString() {\r
- return "header=" + header + " remaining=" + remaining;\r
- }\r
- private String toString(ByteBuffer buffer, int nRead) {\r
- return this + " nRead=" + nRead + " buffer=" + buffer;\r
- }\r
- ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0\r
- throws IOException { // MARK: handleInput\r
- int position = buffer.position();\r
- if (null == header) {\r
- if (position < HEADER_SIZE)\r
- return buffer;\r
- else { // Got header.\r
- buffer.position(0);\r
- IntBuffer ib = buffer.asIntBuffer();\r
- ib.get(INTS);\r
- ib = null;\r
- header = INTS;\r
- deflateSize = header[DEFLATE];\r
- if (deflateSize < 0)\r
- throw new IOException("Illegal deflate size=" + deflateSize);\r
- remaining += deflateSize;\r
- if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.\r
- ByteBuffer bb = getInputBuffer(remaining);\r
- buffer.position(0);\r
- buffer.limit(position);\r
- bb.put(buffer);\r
- return bb;\r
- }\r
- buffer.position(position);\r
- }\r
- }\r
- if (position < remaining)\r
- return buffer; // Need more data.\r
- else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.\r
- ByteBuffer bb = getInputBuffer(remaining);\r
- buffer.position(0);\r
- buffer.limit(remaining);\r
- bb.put(buffer);\r
- inputQueue.addFirst(new Packet(header, bb));\r
- ByteBuffer b2 = getInputBuffer(buffer.limit());\r
- buffer.position(remaining);\r
- buffer.limit(position);\r
- b2.put(buffer);\r
- header = null;\r
- remaining = HEADER_SIZE;\r
- return handleInput(b2, b2.position());\r
- } else if (position != remaining)\r
- throw new IOException("Assertion error. this=" + toString(buffer, nRead));\r
- // Got exactly one packet.\r
- inputQueue.addFirst(new Packet(header, buffer));\r
- header = null;\r
- remaining = HEADER_SIZE;\r
- return getInputBuffer(INCREMENT_SIZE);\r
- }\r
- InetSocketAddress getAddress() {\r
- return manager.getAddress();\r
- }\r
- boolean isConnected() {\r
- return manager.isConnected();\r
- }\r
- void reconnect() throws ProCoreException, InterruptedException {\r
- manager.reconnect();\r
- }\r
- void connect(int port) throws ProCoreException, InterruptedException {\r
- manager.connect(port);\r
- }\r
- void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {\r
- manager.connect(address);\r
- }\r
- void disconnect() {\r
- manager.disconnect();\r
- }\r
- Connection call(Method m) throws ProCoreException, InterruptedException, IOException {\r
- manager.call(m);\r
- return connection;\r
- }\r
-}\r
-class Client {\r
- private final ProCoreServer proCoreServer;\r
- private final ProCoreClient proCoreClient;\r
- private final ConnectionThread connectionThread;\r
- private boolean closed = false;\r
- Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {\r
- this.proCoreServer = proCoreServer;\r
- this.proCoreClient = proCoreClient;\r
- this.connectionThread = connectionThread;\r
- }\r
- void open() throws ProCoreException, InterruptedException {\r
- synchronized (proCoreClient) {\r
- if (!connectionThread.isConnected())\r
- connectionThread.connect(proCoreServer.getPort());\r
- }\r
- closed = false;\r
- }\r
- boolean isOpen() {\r
- synchronized (proCoreClient) {\r
- return connectionThread.isConnected();\r
- }\r
- }\r
- boolean isClosed() {\r
- synchronized (proCoreClient) {\r
- return closed || !connectionThread.isConnected();\r
- }\r
- }\r
- void close() {\r
- proCoreClient.deleteClient(this);\r
- closed = true;\r
- }\r
- String execute(String command) throws ProCoreException {\r
- return proCoreClient.execute(command);\r
- }\r
- void call(Method m) throws ProCoreException {\r
- proCoreClient.call(m);\r
- }\r
-}\r
-class ProCoreClient {\r
- public static final String LOCALHOST = "127.0.0.1";\r
- class Remote {\r
- private final ConnectionManager connectionManager = ConnectionManager.getInstance();\r
- private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);\r
- Remote() {\r
- }\r
- }\r
- private final Remote remote = new Remote();\r
- private final Set<Client> clients = new HashSet<Client>();\r
- private final ProCoreServer proCoreServer;\r
- public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {\r
- this.proCoreServer = proCoreServer;\r
- }\r
- synchronized public int getPort() {\r
- return remote.connectionThread.getAddress().getPort();\r
- }\r
- synchronized public boolean isConnected() {\r
- return remote.connectionThread.isConnected();\r
- }\r
- synchronized public Client newClient() throws ProCoreException {\r
- return newClient(proCoreServer, remote.connectionThread.getAddress());\r
- }\r
- synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {\r
- if (!remote.connectionThread.isConnected())\r
- try {\r
- remote.connectionThread.connect(address);\r
- } catch (InterruptedException e) {\r
- if (!remote.connectionThread.isConnected()) {\r
- remote.connectionThread.disconnect();\r
- throw new ProCoreException("Failed to create new client.");\r
- }\r
- }\r
- else if (!address.equals(remote.connectionThread.getAddress()))\r
- throw new ProCoreException("Illegal newClient call. Already connected to different address.");\r
- Client client = new Client(proCoreServer, this, remote.connectionThread);\r
- clients.add(client);\r
- return client;\r
- }\r
- synchronized int deleteClient(Client client) {\r
- clients.remove(client);\r
- return clients.size();\r
- }\r
- void closeClient(int aSessionId) throws ProCoreException {\r
- try {\r
- CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);\r
- Method m = new Method(f, null, null);\r
- Connection c = remote.connectionThread.call(m);\r
- m.waitForReply(c);\r
- } catch (InterruptedException e) {\r
- throw new ProCoreException("Thread interrupted during method call.", e);\r
- } catch (IOException e) {\r
- throw new ProCoreException("IOException during method call.", e);\r
- } catch (Throwable e) {\r
- throw new ProCoreException("Throwable during method call.", e);\r
- }\r
- }\r
- synchronized public String execute(String command) throws ProCoreException {\r
- try {\r
- ExecuteFunction f = new ExecuteFunction(command);\r
- Method m = new Method(f, null, null);\r
- Connection c = remote.connectionThread.call(m);\r
- m.waitForReply(c);\r
- return f.out;\r
- } catch (InterruptedException e) {\r
- throw new ProCoreException("Thread interrupted during method call.", e);\r
- } catch (IOException e) {\r
- throw new ProCoreException("IOException during method call.", e);\r
- } catch (Throwable e) {\r
- throw new ProCoreException("Throwable during method call.", e);\r
- }\r
- }\r
- private void recall(Method method, IOException e) throws ProCoreException {// :)\r
- if (remote.connectionThread.isConnected())\r
- throw new ProCoreException("IOException during method call.", e);\r
- try {\r
- remote.connectionThread.reconnect();\r
- Connection c = remote.connectionThread.call(method);\r
- method.waitForReply(c);\r
- } catch (Throwable t) {\r
- throw new ProCoreException("IOException during method call. Recall failed.", e);\r
- }\r
- }\r
- synchronized public void call(Method method) throws ProCoreException {\r
- try {\r
- Connection c = remote.connectionThread.call(method);\r
- method.waitForReply(c);\r
- } catch (InterruptedException e) {\r
- throw new ProCoreException("Thread interrupted during method call.", e);\r
- } catch (IOException e) {\r
- recall(method, e);\r
- } catch (ProCoreException e) {\r
- throw e;\r
- } catch (Throwable e) {\r
- throw new ProCoreException("Throwable during method call.", e);\r
- }\r
- }\r
- synchronized public void connect(int port) throws ProCoreException, InterruptedException {\r
- remote.connectionThread.connect(port);\r
- }\r
- synchronized public void connect(SessionAddress sessionAddress)\r
- throws ProCoreException, InterruptedException {\r
- remote.connectionThread.connect(sessionAddress.getSocketAddress());\r
- }\r
- synchronized public void disconnect() throws ProCoreException {\r
- remote.connectionThread.disconnect();\r
- }\r
-}\r
-abstract class Event {\r
- final String name;\r
- final Packet packet;\r
- Event(String name, Packet packet) {\r
- this.name = name;\r
- this.packet = packet;\r
- }\r
- int getToken() {\r
- return packet.header.token;\r
- }\r
- void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {\r
- event.deserialize(event.getRequestNumber(), getDataBuffer());\r
- }\r
- private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {\r
- this.packet.bytes.position(20);\r
- return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);\r
- }\r
- @Override public String toString() {\r
- return name + " token=" + packet.header.token;\r
- }\r
-}\r
-class ChangeSetUpdateEvent extends Event {\r
- ChangeSetUpdateEvent(Packet packet) {\r
- super("ChangeSetUpdateEvent", packet);\r
- }\r
-}\r
-abstract class EventHandler {\r
- abstract void on(Event event);\r
-}\r
-class DefaultEventHandler extends EventHandler {\r
- void on(Event event) {\r
- Util.log("Missing event handler. event=" + event);\r
- }\r
-}\r
+/*******************************************************************************
+
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.server.internal;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.server.DatabaseLastExitException;
+import org.simantics.db.server.GuardFileVersionException;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.server.ServerNotFoundException;
+import org.simantics.db.server.protocol.CloseClientSessionFunction;
+import org.simantics.db.server.protocol.ExecuteFunction;
+import org.simantics.db.server.protocol.MessageNumber;
+
+public class ProCoreServer {
+ private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();
+ private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();
+ private static final String LOCALHOST = ProCoreClient.LOCALHOST;
+ public static final int VERSION_MAJOR = 1;
+ public static final int VERSION_MINOR = 4; // 2015-05-05
+ public static final String BRANCH_DIR = "procore.headClusters.procore";
+ public static final String TAIL_DIR = "procore.tailClusters.procore";
+ public static final String TAIL_FILE = "procore.tail.procore";
+ public static final String CONFIG_FILE = "procore.config.procore";
+ public static final String GUARD_FILE = "procore.guard.procore";
+ public static final String JOURNAL_FILE = "procore.journal.procore";
+ public static final String LOG_FILE = "procore.log.procore";
+ public static final String DCS_FILE = "procore.dumpChangeSets.procore";
+ public static final String RECOVERY_NEEDED_FILE = "recovery.needed";
+ public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";
+ public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";
+ public static final String PAGE_FILE_PATTERN = "procore.page*.procore";
+ public static final String DATA_PREFIX = "ClusterData.";
+ public static final String DATA_PATTERN = DATA_PREFIX + "*";
+ public static final String INDEX_PREFIX = "ClusterIndex.";
+ public static final String INDEX_PATTERN = INDEX_PREFIX + "*";
+ public static final String VALUE_PREFIX = "ExternalValue.";
+ public static final String VALUE_PATTERN = VALUE_PREFIX + "*";
+ public static final String VALUE_SUFFIX = ".procore";
+ public static final String DELETED_PREFIX = "ClusterDeleted.";
+ public static final String DELETED_PATTERN = DELETED_PREFIX + "*";
+ private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
+ public static final String QUIT_CMD = "quit";
+// private static final int SHORT_SLEEP_COUNT = 1;
+// private static final int LONG_SLEEP_COUNT = 10;
+// static public void stopProCoreServer(File workingDir) throws ProCoreException {
+// GuardFile guardFile = new GuardFile(workingDir);
+// guardFile.stopProCoreServer();
+// }
+ private static boolean isRunning(String aPid) throws ProCoreException {
+ boolean isRunning = false;
+ try {
+ switch (OSType.calculate()) {
+ case WINDOWS: {
+ Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");
+ BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String line = input.readLine();
+ if (line.matches(".*ProCoreServer.exe.*"))
+ isRunning = true;
+ input.close();
+ break;
+ }
+ case UNIX: {
+ Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");
+ BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String line = input.readLine();
+ if (line.matches(".*ProCoreServer.*"))
+ isRunning = true;
+ input.close();
+ break;
+ }
+ default:
+ }
+ } catch (Exception e) {
+ throw new ProCoreException("Could not get list of running processes.", e);
+ }
+ return isRunning;
+ }
+ static class ProCoreProcess {
+ private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };
+ class Manager implements Runnable {
+ private State state = State.Created;
+ private CountDownLatch serverPrompted;
+ private LinkedBlockingQueue<String> reply;
+ private volatile String command;
+ private volatile Integer exitValue;
+ private volatile boolean carryOn;
+ private volatile int port;
+ private final File dbFolder; // Just for exception i.e. error reporting.
+ Manager(File dbFolder) {
+ this.dbFolder = dbFolder;
+ init();
+ }
+ private void init() {
+ serverPrompted = new CountDownLatch(1);
+ reply = new LinkedBlockingQueue<String>();
+ exitValue = null;
+ carryOn = true;
+ port = 0;
+ state = State.Initialized;
+ }
+ int getPort() {
+ return port;
+ }
+ void tryToStart() throws ProCoreException, InterruptedException {
+ if (thread.isAlive())
+ return;
+ thread = new Thread(manager, manager.initAndGetName());
+ thread.start();
+ serverPrompted.await();
+ if (!state.equals(State.Prompted))
+ throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
+ String reply = giveCommand("print port");
+ try {
+ port = Integer.parseInt(reply);
+ } catch (NumberFormatException e) {
+ if (null != exitValue && 0 != exitValue)
+ throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
+ if (reply.equals("End of server thread."))
+ throw new ProCoreException("Server did not start (did not prompt). state=" + state);
+ else
+ throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);
+ }
+ state = State.GotPort;
+ }
+ String giveCommand(String command) throws ProCoreException, InterruptedException {
+ if (!thread.isAlive())
+ throw new ProCoreException("Server thread not alive.");
+ if (null != this.command)
+ throw new ProCoreException("Old command pending.");
+ this.command = command;
+ return reply.take();
+ }
+ private String initAndGetName() {
+ init();
+ return "ProCoreProcess " + ++instanceCount + " " + builder.directory();
+ }
+ boolean isActive() throws ProCoreException {
+ if (null == exitValue && thread.isAlive())
+ return true;
+ else
+ return false;
+ }
+ boolean tryToStop() throws ProCoreException {
+ if (!thread.isAlive())
+ return true;
+ carryOn = false;
+ thread.interrupt();
+ try {
+ thread.join(10000); // milliseconds
+ } catch (InterruptedException e) {
+ }
+ return !thread.isAlive();
+ }
+ @Override
+ public void run() {
+ state = State.ThreadStarted;
+ Util.log("ProCoreServer thread started.");
+ try {
+ Util.trace("ProCoreProcessManager start.");
+ Process process = builder.start();
+ exitValue = exitValue(process);
+ if (null != exitValue)
+ Util.logError("Server process did not start.");
+ else {
+ state = State.ProcessStarted;
+ runProcessStarted(process); // Process start
+ }
+ } catch (IOException e) {
+ Util.logError("Failed during process watch.");
+ } finally {
+ reply.add("End of server thread.");
+ Util.trace("ProCoreServerThread stop.");
+ state = State.ThreadStopped;
+ serverPrompted.countDown();
+ }
+ }
+ private void runProcessStarted(Process process) throws IOException {
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)
+ {
+ do {
+ try {
+ if (reader.ready())
+ handleLine(reader.readLine(), reader, writer);
+ else if (null != command) {
+ writer.write(command);
+ command = null;
+ writer.newLine();
+ writer.flush();
+ String line = getReply(reader);
+ reply.add(line);
+ } else
+ Thread.sleep(100); // milliseconds
+ } catch (InterruptedException e) {
+ Util.trace("ProCoreServerThread interrupted.");
+ }
+ exitValue = exitValue(process);
+ if (null == exitValue && !carryOn)
+ try {
+ try {
+ writer.write(QUIT_CMD);
+ writer.newLine();
+ writer.flush();
+ } catch (Throwable t) {
+ Util.logError("Wait for process death was interrupted.", t);
+ } finally {
+ }
+ exitValue = process.waitFor();
+ } catch (InterruptedException e) {
+ Util.logError("Wait for process death was interrupted.", e);
+ carryOn = true;
+ continue;
+ }
+ } while (null == exitValue);
+ } // End of resource try.
+ }
+ private String getReply(BufferedReader reader) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ while (true) {
+ String line = reader.readLine();
+ if (null == line || line.startsWith("Waiting for input?"))
+ break;
+ sb.append(line);
+ }
+ return sb.toString();
+ }
+ private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {
+ if (DEBUG)
+ Util.showDebug(line);
+ if (line.startsWith("Main end.")) {
+ carryOn = false; // Last line from server.
+ } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {
+ state = State.Prompted;
+ serverPrompted.countDown(); // Server has been started and is waiting for console input.
+ } else
+ Util.trace("From server. line=" + line);
+ }
+ private Integer exitValue(Process process) {
+ try {
+ return process.exitValue();
+ } catch (IllegalThreadStateException e) {
+ return null; // Still alive.
+ }
+ }
+ }
+ private static int instanceCount; // Keeps count of thread instances.
+ private final ProcessBuilder builder;
+ private final Manager manager;
+ private Thread thread;
+ ProCoreProcess(File exeFile, File workingDir) {
+ String[] args = new String[0];
+ String[] cmd = new String[1 + args.length];
+ cmd[0] = exeFile.getAbsolutePath().toString();
+ System.arraycopy(args, 0, cmd, 1, args.length);
+ builder = new ProcessBuilder(cmd);
+ builder.redirectErrorStream(true);
+ builder.directory(workingDir);
+ manager = new Manager(workingDir);
+ thread = new Thread(manager, manager.initAndGetName());
+ }
+ public void tryToStart() throws ProCoreException, InterruptedException {
+ manager.tryToStart();
+ }
+// public int exitValue() throws IllegalStateException {
+// Integer i = proCoreServerThread.getExitValue();
+// if (null == i)
+// throw new IllegalStateException("ProCoreServerThread is still running.");
+// return i;
+// }
+// public boolean ignoreProtocolVersion() {
+// File wd = builder.directory();
+// if (null == wd)
+// return false;
+// Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);
+// if (Files.exists(ignore))
+// return true;
+// return false;
+// }
+ public boolean isActive() throws ProCoreException {
+ return manager.isActive();
+ }
+ public boolean tryToStop() throws ProCoreException {
+ return manager.tryToStop();
+ }
+ int getPort() throws ProCoreException {
+ return manager.getPort();
+ }
+ }
+ private static enum OSType {
+ UNIX, WINDOWS, UNKNOWN;
+
+ public static OSType calculate() {
+ String osName = System.getProperty("os.name");
+ assert osName != null;
+ osName = osName.toLowerCase();
+ if (osName.startsWith("windows"))
+ return WINDOWS;
+ if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))
+ return UNIX;
+ return UNKNOWN;
+ }
+ }
+ static public boolean ignoreProtocolVersion(File dbFolder) {
+ if (null == dbFolder)
+ return false;
+ if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))
+ return true;
+ return false;
+ }
+ static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)
+ throws ProCoreException, ServerNotFoundException {
+ try {
+ ProCoreServer proCoreServer;
+ synchronized (workingDirs) {
+ proCoreServer = workingDirs.get(workingDir.getCanonicalPath());
+ if (null != proCoreServer)
+ return proCoreServer;
+ if (!serverDir.isDirectory())
+ throw new ProCoreException("Expected server directory as argument");
+ if (!workingDir.isDirectory())
+ throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());
+ String exeSuffix = Activator.getExeSuffix();
+ File executable = new File(serverDir, "ProCoreServer" + exeSuffix);
+ proCoreServer = new ProCoreServer(executable, workingDir);
+ workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);
+ return proCoreServer;
+ }
+ } catch (IOException e) {
+ throw new ProCoreException("IOException", e);
+ }
+ }
+
+ public static void logVersionInformation() {
+ StringBuilder msg = new StringBuilder(200);
+ String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.
+ msg.append("ProCore version information:");
+ msg.append(nl);
+ msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);
+ try {
+ msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());
+ } catch (IOException e) {
+ msg.append(" folder information not available");
+ }
+ msg.append(nl);
+ msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");
+ String str = msg.toString();
+ Logger.defaultLog(str);
+ }
+ private static String[] getFileLines(final File aFile, int maxLines) {
+ final String[] STRING_EMPTY_ARRAY = new String[0];
+ try {
+ if (!aFile.canRead()) {
+ return STRING_EMPTY_ARRAY;
+ }
+ try (FileInputStream fis = new FileInputStream(aFile);
+ InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");
+ BufferedReader br = new BufferedReader(isr);){
+ String line = null;
+ int count = 0;
+ ArrayList<String> lines = new ArrayList<String>();
+ while ((line = br.readLine()) != null) {
+ if (++count > maxLines)
+ break; // Caller not interested in more lines.
+ lines.add(line);
+ }
+ return lines.toArray(STRING_EMPTY_ARRAY);
+ } catch (FileNotFoundException e) {
+ return STRING_EMPTY_ARRAY;
+ }
+ } catch (Throwable t) {
+ if (DEBUG)
+ t.printStackTrace();
+ Logger.defaultLogError(t);
+ }
+ return new String[0];
+ }
+ private static int parseInt(String s)
+ throws ProCoreException {
+ try {
+ return Integer.parseInt(s);
+ } catch (NumberFormatException e) {
+ String msg = "Could not convert string to number. string=" + s;
+ Logger.defaultLogError(msg);
+ throw new ProCoreException(msg, e);
+ }
+ }
+ private String databaseId = null;
+ private final GuardFile guardFile;
+ private final ProCoreClient proCoreClient;
+ ProCoreClient getProCoreClient() {
+ return proCoreClient;
+ }
+ private final ProCoreProcess proCoreProcess;
+ private ProCoreServer(File exeFile, File workingDir)
+ throws ProCoreException {
+ if (null == exeFile)
+ throw new ProCoreException("Illegal argument: exeFile is null.");
+ if (null == workingDir)
+ throw new ProCoreException("Illegal argument: workingDir is null.");
+ guardFile = new GuardFile(workingDir);
+ proCoreClient = new ProCoreClient(this);
+ proCoreProcess = new ProCoreProcess(exeFile, workingDir);
+ Util.log("ProCoreServer exe: " + exeFile);
+ Util.log("ProCoreServer folder: " + workingDir);
+ logVersionInformation();
+ }
+// //@Override
+ public synchronized String execute(String command) throws ProCoreException, InterruptedException {
+ return proCoreClient.execute(command);
+ }
+ private void startInternal()
+ throws ProCoreException {
+ if (proCoreClient.isConnected())
+ return;
+ Exception exception;
+ try {
+ proCoreProcess.tryToStart();
+ int port = proCoreProcess.getPort();
+ SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
+ proCoreClient.connect(sa);
+ return;
+ } catch (InterruptedException e) {
+ Util.logError("ProCoreProcess start was interrupted.", e);
+ exception = e;
+ } catch (DatabaseLastExitException e) {
+ throw e; // This situation can be analyzed and handled by start handler.
+ } catch (ProCoreException e) {
+ Util.logError("Failed to start ProCoreServer process.", e);
+ exception = e;
+ }
+ boolean isActive = guardFile.isActive(proCoreClient);
+ if (!isActive)
+ throw new ProCoreException("Failed to connect to ProCoreServer.", exception);
+ }
+// private int getWaitCount() throws ProCoreException {
+// if (!proCoreProcess.isActive())
+// return SHORT_SLEEP_COUNT; // Short wait because server process is not running.
+// else
+// return LONG_SLEEP_COUNT;
+// }
+// private void startInternalReinit(int exitStatus)
+// throws ProCoreException {
+// switch (exitStatus) {
+// default: // What has happened here?
+// throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));
+// case -1: // Exit status not available, not sure what has happened here.
+// case 29: // Server died because it could not read exit status from guard file.
+// case 30: // Server died because old exit status was not zero.
+// case 40: // Server died because it received terminating signal.
+// case 42: // Server died because it could not read end status from guard file.
+// case 43: // Server died because end value was not zero.
+// case 44: // Server died because server could not read version from guard file.
+// case 45: // Server died because server did not support guard file version.
+// case 47: // Server died because server could not listen to any port.
+// break;
+// case 32: // Server died becuase database version did not match.
+// throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));
+// case 33: // Server died because database has been corrupted.
+// throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));
+// case 39: // Server died because last exit was not clean.
+// throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));
+// }
+// boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());
+// if (!dead) {
+// StringBuilder sb = new StringBuilder(256);
+// sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");
+// sb.append(getExitMessage(exitStatus));
+// throw new DatabaseStartException(workingDir, sb.toString());
+// }
+// boolean deleted = guardFile.delete();
+// if (!deleted)
+// Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));
+// }
+ //@Override
+ public synchronized boolean isActive()
+ throws ProCoreException {
+ if (!proCoreClient.isConnected())
+ return guardFile.isActive(proCoreClient);
+ proCoreClient.execute("");
+ return true;
+ }
+ public synchronized boolean isAlive()
+ throws ProCoreException {
+ if (proCoreClient.isConnected())
+ return true;
+ if (proCoreProcess.isActive())
+ return true;
+ return guardFile.isAlive();
+ }
+ Client newClient() throws ProCoreException {
+ if (proCoreClient.isConnected())
+ return proCoreClient.newClient();
+ int port = getPort();
+ InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);
+ return proCoreClient.newClient(this, isa);
+ }
+ public synchronized boolean tryToStop()
+ throws ProCoreException {
+ synchronized (proCoreClient) {
+ ProCoreException ex = null;
+ if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {
+ try {
+ proCoreClient.connect(getPort());
+ } catch (InterruptedException e) {
+ }
+ }
+ if (proCoreClient.isConnected()) {
+ String s = "";
+ try {
+ s = proCoreClient.execute(QUIT_CMD);
+ String t = s.replaceAll("\n", "");
+ int n = Integer.parseInt(t);
+ if (n != 1)
+ Util.log("Number of connections after quit command is " + n + ".");
+ proCoreClient.closeClient(0);
+ if (!isLocal() && n > 1)
+ throw new ProCoreException("More than one connection for remote. n=" + n);
+ } catch (NumberFormatException e) {
+ Util.logError("Failed to parse number of connections. s=" + s, e);
+ } catch (ProCoreException e) {
+ ex = e;
+ } finally {
+ proCoreClient.disconnect();
+ }
+ }
+ try {
+ proCoreProcess.tryToStop();
+ } finally {
+ if (proCoreProcess.isActive() && null == ex)
+ ex = new ProCoreException("Failed to stop ProCoreProcess.");
+ }
+ if (null != ex)
+ throw ex;
+ return !proCoreClient.isConnected() && !proCoreProcess.isActive();
+ }
+ }
+ public synchronized boolean isConnected()
+ throws ProCoreException {
+ return proCoreClient.isConnected();
+ }
+ public synchronized boolean isLocal()
+ throws ProCoreException {
+ return proCoreClient.isConnected() && proCoreProcess.isActive();
+ }
+ public synchronized void disconnect()
+ throws ProCoreException {
+ proCoreClient.disconnect();
+ }
+ public synchronized void connect()
+ throws ProCoreException, InterruptedException {
+ int port = getPort();
+ if (0 == port)
+ throw new ProCoreException("Port 0 not supported as connection address.");
+ proCoreClient.connect(port);
+ }
+ private String getExitMessage(int exitValue) {
+ switch (exitValue) {
+ default:
+ return "Unexpected exit value = " + exitValue + ".";
+ case 1:
+ return "Exception was thrown. This indicates problems with program logic.";
+ case 2:
+ return "CallException was thrown. This indicates problems with calls to/from parent server.";
+ case 3:
+ return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";
+ case 4:
+ return "NoSuchElementException was thrown. This indicates problems with program logic.";
+ case 5:
+ return "IllegalArgumentException was thrown. This indicates problems with program logic.";
+ case 6:
+ return "IllegalResourceException was thrown. This indicates problems with server data.";
+ case 7:
+ return "IllegalClusterException was thrown. This indicates problems with server data.";
+ case 8:
+ return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";
+ case 9:
+ return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";
+ case 10:
+ return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";
+ case 11:
+ return "SystemException was thrown. This indicates problems with program logic.";
+ case 12:
+ return "CppException was thrown. This indicates problems with program logic.";
+ case 13:
+ return "UnknownException was thrown. This indicates problems with program logic.";
+ case 14:
+ return "ThrowException was thrown. This indicates problems with program logic.";
+ case 15:
+ return "AssertException was thrown. This indicates problems with program logic.";
+ case 16:
+ return "ParentCheckException was thrown. This indicates problems with the database version of parent.";
+ case 17:
+ return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";
+ case 18:
+ return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";
+ case 19:
+ return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";
+ case 20:
+ return "NoSuchFileException was thrown. This indicates problems with a missing file.";
+ case 21:
+ return "JournalException was thrown. This indicates problems with journal mechanisim.";
+ case 22:
+ return "OutOfSpaceException was thrown. This indicates problems with memory.";
+ case 23:
+ return "ExternalValueException was thrown. This indicates problems with large value handling.";
+ case 24:
+ return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";
+ case 25:
+ return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";
+ case 26:
+ return "ConsoleException was thrown. This indicates that server console was closed without quit command.";
+ case 27:
+ return "ExitException was thrown. This indicates that server did not exit cleanly.";
+ case 28:
+ return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";
+ case 29:
+ return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";
+ case 30:
+ return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";
+ case 31:
+ return "UpdateException was thrown. This indicates that server failed during update.";
+ case 32:
+ return "DatabaseVersionException was thrown. This indicates that server can not use given database.";
+ case 33:
+ return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";
+ case 34:
+ return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";
+ case 35:
+ return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";
+ case 36:
+ return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";
+ case 37:
+ return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";
+ case 38:
+ return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";
+ case 39:
+ return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";
+ case 40:
+ return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";
+ case 41:
+ return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";
+ case 42:
+ return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";
+ case 43:
+ return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";
+ case 44:
+ return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";
+ case 45:
+ return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";
+ case 46:
+ return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";
+ case 47:
+ return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";
+ case 48:
+ return "JournalVersionException was thrown. This indicates that database was written by older version.";
+ case 49:
+ return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";
+ }
+ }
+ synchronized void start()
+ throws ProCoreException {
+ try {
+ startInternal();
+ } catch (DatabaseLastExitException e) {
+ throw e; // This can be analyzed and handled by start handler.
+ } catch (Throwable t) {
+ ProCoreException pe = null;
+ if (t instanceof ProCoreException)
+ pe = (ProCoreException)t;
+ else {
+ String msg = "ProCoreServer startup failed."
+ + " Automatic rcovery handling not implemented."
+ + " Database must be fixed manually.";
+ pe = new ProCoreException(msg, t);
+ }
+ if (null == pe.getDbFolder())
+ pe.setDbFolder(proCoreProcess.builder.directory());
+ }
+ }
+ synchronized void stop()
+ throws ProCoreException, InterruptedException{
+ if (null != databaseId)
+ databaseIds.remove(databaseId);
+ boolean dead = tryToStop();
+ if (!dead)
+ throw new ProCoreException("Failed to stop ProCoreServer.");
+ }
+ public synchronized String getExitMessage()
+ throws ProCoreException {
+ Integer returnValue = guardFile.getExitValue();
+ if (null == returnValue)
+ throw new ProCoreException("Exit message not available.");
+ return getExitMessage(returnValue);
+ }
+ //@Override
+ public synchronized ServerAddress getAddress()
+ throws ProCoreException {
+ return new ServerAddress(LOCALHOST, getPort());
+ }
+ //@Override
+ public synchronized int getPort()
+ throws ProCoreException {
+ int port = 0;
+ if (proCoreClient.isConnected()) {
+ port = proCoreClient.getPort();
+ if (0 != port)
+ return port;
+ }
+ if (proCoreProcess.isActive()) {
+ port = proCoreProcess.getPort();
+ if (0 != port)
+ return port;
+ }
+ return guardFile.getPort();
+ }
+ public synchronized TailFile.Data getTailData()
+ throws ProCoreException {
+ Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);
+ return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
+ }
+// public synchronized TailFile.Data getTailDataFromHead()
+// throws ProCoreException {
+// Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);
+// return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
+// }
+ public static class GuardFile {
+ public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;
+ private static final int VERSION_LINE = 1;
+ private static final int PID_LINE = 2;
+ private static final int PORT_LINE = 3;
+ private static final int EXIT_LINE = 4;
+ private static final int END_LINE = 5;
+ private final File guardFile;
+ GuardFile(File workingDir) {
+ guardFile = new File(workingDir, GUARD_FILE);
+ }
+ int getPort() throws ProCoreException {
+ String[] lines = getFileLines(guardFile, END_LINE);
+ if (lines.length < PORT_LINE)
+ throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());
+ return parseInt(lines[PORT_LINE-1]);
+ }
+ private void checkVersion(String[] lines) throws ProCoreException {
+ if (lines.length < VERSION_LINE)
+ throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());
+ int version = parseInt(lines[VERSION_LINE-1]);
+ if (1 != version)
+ throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);
+ }
+ boolean delete() {
+ if (guardFile.exists())
+ return guardFile.delete();
+ else
+ return true; // Guard file is deleted i.e. does not exit.
+ }
+ void deleteLog() {
+ if (!guardFile.exists())
+ return;
+ try {
+ Files.delete(guardFile.toPath());
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ Integer getExitValue() {
+ String lines[] = getFileLines(guardFile, END_LINE);
+ try {
+ checkVersion(lines);
+ if (lines.length >= EXIT_LINE)
+ return parseInt(lines[EXIT_LINE-1]);
+ else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))
+ return -1; // Server has died without writing exit status.
+ return null; // Server might be running.
+ } catch (Exception e) {
+ Logger.defaultLogError("Could not get ProCoreServer exit value.", e);
+ return null; // Server might be running.
+ }
+ }
+// void stopProCoreServer() throws ProCoreException {
+// if (!guardFile.exists())
+// throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");
+// String lines[] = new String[0];
+// for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {
+// lines = getFileLines(guardFile, END_LINE);
+// if (lines.length < 1)
+// throw new ProCoreException("Guard file does not contain lines.");
+// checkVersion(lines);
+// if (lines.length >= END_LINE)
+// return; // Server is dead according to guard file.
+// if (!isRunning(lines[PID_LINE-1]))
+// return; // ProCoreServer is not running.
+// if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.
+// break;
+// }
+// if (lines.length < PORT_LINE)
+// throw new ProCoreException("Guard file does not contain port.");
+// try {
+// int port = parseInt(lines[PORT_LINE-1]);
+// if (port > 0) {
+// ProCoreClient lProCoreClient = new ProCoreClient();
+// SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);
+// lProCoreClient.connect(sa);
+// try {
+// String reply = lProCoreClient.execute(QUIT_CMD);
+// int nConnect = Integer.parseInt(reply);
+// if (nConnect > 1)
+// throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());
+// } finally {
+// lProCoreClient.disconnect();
+// }
+// }
+// } catch (Throwable t) {
+// throw new ProCoreException("Failed to stop procore server cleanly.", t);
+// }
+// // Managed to send quit command to server, let's wait and see if it has any effect.
+// boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);
+// if (!dead)
+// throw new ProCoreException("Unable to stop server.");
+// }
+ boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {
+ if (proCoreClient.isConnected())
+ throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");
+ String lines[] = getFileLines(guardFile, END_LINE);
+ if (lines.length != PORT_LINE)
+ return false;
+ checkVersion(lines);
+ try {
+ int port = parseInt(lines[PORT_LINE-1]);
+ if (port > 0) {
+ try {
+ // Checking if core is responsive.
+ // This should be supported regardless of version.
+ SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
+ proCoreClient.connect(sa);
+ proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.
+ return true;
+ } catch (ProCoreException e) {
+ } finally { // We must disconnect or check protocol version.
+ proCoreClient.disconnect();
+ }
+ }
+ } catch (Throwable t) {
+ // Intentionally empty.
+ }
+ return false;
+ }
+ boolean isAlive() throws ProCoreException {
+ String lines[] = getFileLines(guardFile, END_LINE);
+ if (lines.length < 1)
+ return false; // No lines, assuming server is dead.
+ checkVersion(lines);
+ if (lines.length < PID_LINE)
+ return false; // No process id, assuming server is dead.
+ else if (lines.length >= END_LINE)
+ return false; // End status available, assuming server is dead.
+ else if (lines.length >= EXIT_LINE)
+ return false; // Exit status available, assuming server is dying.
+ else if (isRunning(lines[PID_LINE-1]))
+ return true; // Process with given id running. Server might be alive.
+ else
+ return false;// Process with given id not running.
+ }
+ }
+ public static class TailFile {
+ public static class Data {
+ Data(long version, long nextChangeSetId, long nextFreeId) {
+ this.version = version;
+ this.nextChangeSetId = nextChangeSetId;
+ this.nextFreeId = nextFreeId;
+ }
+ long version;
+ public long nextChangeSetId;
+ long nextFreeId;
+ }
+ public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {
+ if (file.exists())
+ throw new TailException("Tail file exists and thus cannot be created. file=" + file);
+ FileOutputStream fos;
+ try {
+ fos = new FileOutputStream(file);
+ } catch (FileNotFoundException e) {
+ throw new TailException("Tail file cannot be created. file=" + file);
+ }
+ PrintStream ps = new PrintStream(fos);
+ try {
+ ps.println(1); // file version
+ ps.println(nextChangeSetId); // next revision
+ ps.println(nextFreeId); // next free cluster
+ ps.println(dbId); // database id
+ } finally {
+ ps.close();
+ }
+ }
+ public static Data readTailFile(File file) throws ProCoreException {
+ String lines[] = getFileLines(file, 3);
+ if (lines.length < 3)
+ throw new TailReadException("Could not read data from " + file);
+ long ver = Long.parseLong(lines[0]);
+ long cs = Long.parseLong(lines[1]);
+ long id = Long.parseLong(lines[2]);
+ return new Data(ver, cs, id);
+ }
+ }
+}
+class SessionAddress {
+ private final InetSocketAddress socketAddress;
+ private final boolean ignoreProtocolVersion;
+ public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {
+ assert(null != socketAddress);
+ this.socketAddress = socketAddress;
+ this.ignoreProtocolVersion = ignoreProtocolVersion;
+ }
+ public InetSocketAddress getSocketAddress() {
+ return socketAddress;
+ }
+ public boolean ignoreProtocolVersion() {
+ return ignoreProtocolVersion;
+ }
+ @Override
+ public String toString() {
+ return socketAddress.toString();
+ }
+}
+class PacketQueue {
+ private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();
+ PacketQueue() {
+ }
+ void addFirst(Packet packet) {
+ packets.add(packet);
+ }
+ Packet takeFirst() throws InterruptedException {
+ return packets.take();
+ }
+}
+class Header {
+ Header(int[] ints) {
+ lastTokenIn = ints[0];
+ token = ints[1];
+ messageNumber = ints[2];
+ inflateSize = ints[3];
+ deflateSize = ints[4];
+ }
+ int lastTokenIn;
+ int token;
+ int messageNumber;
+ int inflateSize;
+ int deflateSize;
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(80);
+ sb.append("lastTokenIn=");
+ sb.append(lastTokenIn);
+ sb.append(" token=");
+ sb.append(token);
+ sb.append(" messageNumber=");
+ sb.append(messageNumber);
+ sb.append(" inflateSize=");
+ sb.append(inflateSize);
+ sb.append(" deflateSize=");
+ sb.append(deflateSize);
+ return sb.toString();
+ }
+}
+class Packet {
+ Header header;
+ ByteBuffer bytes;
+ Packet(int[] header, ByteBuffer bytes) {
+ this.header = new Header(header);
+ this.bytes = bytes;
+ }
+}
+class ConnectionThread {
+ private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
+ private static final int HEADER_N = 5;
+ private static final int HEADER_SIZE = HEADER_N * 4;
+ private static final int DEFLATE = 4;
+ private static final int INCREMENT_SIZE = 8192;
+ private static int instanceCount = 0;
+ class Manager implements Runnable {
+ private final int instance;
+ private volatile boolean carryOn = true;
+ private volatile int lastTokenIn = 0;
+ private volatile MethodQueue methodQueue = new MethodQueue();
+ private volatile CountDownLatch running = new CountDownLatch(1);
+ private final EventHandler defaultEventHandler = new DefaultEventHandler();
+// private EventHandler eventHandler = defaultEventHandler;
+ Manager() {
+ instance = ++instanceCount;
+ }
+ @Override
+ public String toString() {
+ return "Connection " + instance;
+ }
+ InetSocketAddress getAddress() {
+ return connection.getAddress();
+ }
+ boolean isConnected() {
+ return connection.isConnected();
+ }
+ void reconnect() throws ProCoreException, InterruptedException {
+ InetSocketAddress a = connection.getAddress();
+ if (connection.addressNotInitialized() || 0 == a.getPort())
+ throw new ProCoreException("Address not ok. address=" + connection.getAddress());
+ connect(a);
+ }
+ void connect(int port) throws ProCoreException, InterruptedException {
+ InetSocketAddress a = connection.getAddress();
+ if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))
+ a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);
+ connect(a);
+ }
+ void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
+ if (connection.isConnected()) {
+ if (connection.equalsAddress(address))
+ return;
+ else
+ throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);
+ }
+ connection.init4Connection(address);
+ connectionManager.connect(connection);
+// compression = new Compression();
+ if (thread.isAlive())
+ return;
+ thread.start();
+ running.await();
+ }
+ void call(Method method) throws ProCoreException, InterruptedException, IOException {
+// eventHandler = method.getEventHandler(eventHandler);
+ connection.sendMethod(methodQueue, method, lastTokenIn);
+ }
+ void disconnect() {
+ methodQueue.close();
+ connection.disconnect();
+ }
+ @Override
+ public void run() { // MARK: CT.Manger.run
+ try {
+ running.countDown();
+ while (carryOn) {
+ Packet packet = null;
+ try {
+ packet = inputQueue.takeFirst();
+ } catch (InterruptedException e) {
+ Util.trace("Wait for input packet interrupted. this=" + this);
+ continue;
+ }
+ if (null == packet)
+ break;
+ handlePacket(packet);
+ }
+ if (carryOn)
+ Util.trace("Thread " + thread.getName() + " stopped with null packet.");
+ else
+ Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");
+ } catch (Throwable t) {
+ Util.logError("Thread " + thread.getName() + " stopped to exception:", t);
+ } finally {
+ disconnect();
+ running.countDown();
+ }
+ }
+ private void handlePacket(Packet packet) throws ProCoreException {
+ if (packet.header.token > lastTokenIn) {
+ lastTokenIn = packet.header.token;
+ if (DEBUG)
+ Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);
+ }
+ Method method = methodQueue.remove(packet.header.token);
+ if (null == method) {
+ try {
+ switch (packet.header.messageNumber) {
+ default: {
+ Util.logError("Missing method. token=" + packet.header.token);
+ } return;
+ case MessageNumber.ChangeSetUpdateRequest: {
+ defaultEventHandler.on(new ChangeSetUpdateEvent(packet));
+ } break;
+ }
+ } catch (Throwable t) {
+ Util.logError("Event handler failed:", t);
+ }
+ return;
+ }
+// else
+// eventHandler = method.getEventHandler(defaultEventHandler);
+ switch (packet.header.messageNumber) {
+ default: {
+ String s = "Illegal message number " + packet.header.messageNumber;
+ method.gotException(s);
+ } break;
+ case MessageNumber.ChangeSetUpdateRequest:
+ method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));
+ methodQueue.add(method); // Event caused by method. Actual method not yet handled.
+ break;
+ case MessageNumber.NullResponse:
+ //Util.log("Null(ack) message. method=" + method + " packet=" + packet);
+ break;
+ case MessageNumber.ExceptionResponse:
+ method.gotException(packet);
+ break; // MARK: handlePacket
+ case MessageNumber.AAAResponse:
+ case MessageNumber.AskTransactionResponse:
+ case MessageNumber.CancelCommitResponse:
+ case MessageNumber.CloseClientSessionResponse:
+ case MessageNumber.EchoResponse:
+ case MessageNumber.EndTransactionResponse:
+ case MessageNumber.ExecuteResponse:
+ case MessageNumber.GetChangeSetContextResponse:
+ case MessageNumber.GetChangeSetDataResponse:
+ case MessageNumber.GetChangeSetsResponse:
+ case MessageNumber.GetClusterNewResponse:
+ case MessageNumber.GetRefreshResponse:
+ case MessageNumber.GetResourceSegmentResponse:
+ case MessageNumber.GetServerInfoResponse:
+ case MessageNumber.OpenClientSessionResponse:
+ case MessageNumber.RefreshResponse:
+ case MessageNumber.ReserveIdsResponse:
+ case MessageNumber.UndoResponse:
+ case MessageNumber.ReconnectResponse:
+ case MessageNumber.GetRefresh2Response:
+ case MessageNumber.GetClusterChangesResponse:
+ case MessageNumber.GetServerInfo2Response:
+ case MessageNumber.ListClustersResponse: {
+ method.gotPacket(packet);
+ } break;
+ }
+ }
+ }
+ private final ConnectionManager connectionManager;
+ private final PacketQueue inputQueue = new PacketQueue();
+ private final Connection connection = new Connection(this);
+ private final Manager manager = new Manager();
+ private Thread thread = new Thread(manager, manager.toString());
+ private int[] header = null;
+ private int[] INTS = new int[HEADER_N];
+ private int remaining = HEADER_SIZE;
+ private int deflateSize;
+ ConnectionThread(ConnectionManager connectionManager) {
+ this.connectionManager = connectionManager;
+ }
+ ByteBuffer getInputBuffer() {
+ return getInputBuffer(INCREMENT_SIZE);
+ }
+ ByteBuffer getInputBuffer(int sizeAtLeast) {
+ ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));
+ bb.order(ByteOrder.LITTLE_ENDIAN);
+ return bb;
+ }
+ @Override
+ public String toString() {
+ return "header=" + header + " remaining=" + remaining;
+ }
+ private String toString(ByteBuffer buffer, int nRead) {
+ return this + " nRead=" + nRead + " buffer=" + buffer;
+ }
+ ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0
+ throws IOException { // MARK: handleInput
+ int position = buffer.position();
+ if (null == header) {
+ if (position < HEADER_SIZE)
+ return buffer;
+ else { // Got header.
+ buffer.position(0);
+ IntBuffer ib = buffer.asIntBuffer();
+ ib.get(INTS);
+ ib = null;
+ header = INTS;
+ deflateSize = header[DEFLATE];
+ if (deflateSize < 0)
+ throw new IOException("Illegal deflate size=" + deflateSize);
+ remaining += deflateSize;
+ if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.
+ ByteBuffer bb = getInputBuffer(remaining);
+ buffer.position(0);
+ buffer.limit(position);
+ bb.put(buffer);
+ return bb;
+ }
+ buffer.position(position);
+ }
+ }
+ if (position < remaining)
+ return buffer; // Need more data.
+ else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.
+ ByteBuffer bb = getInputBuffer(remaining);
+ buffer.position(0);
+ buffer.limit(remaining);
+ bb.put(buffer);
+ inputQueue.addFirst(new Packet(header, bb));
+ ByteBuffer b2 = getInputBuffer(buffer.limit());
+ buffer.position(remaining);
+ buffer.limit(position);
+ b2.put(buffer);
+ header = null;
+ remaining = HEADER_SIZE;
+ return handleInput(b2, b2.position());
+ } else if (position != remaining)
+ throw new IOException("Assertion error. this=" + toString(buffer, nRead));
+ // Got exactly one packet.
+ inputQueue.addFirst(new Packet(header, buffer));
+ header = null;
+ remaining = HEADER_SIZE;
+ return getInputBuffer(INCREMENT_SIZE);
+ }
+ InetSocketAddress getAddress() {
+ return manager.getAddress();
+ }
+ boolean isConnected() {
+ return manager.isConnected();
+ }
+ void reconnect() throws ProCoreException, InterruptedException {
+ manager.reconnect();
+ }
+ void connect(int port) throws ProCoreException, InterruptedException {
+ manager.connect(port);
+ }
+ void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
+ manager.connect(address);
+ }
+ void disconnect() {
+ manager.disconnect();
+ }
+ Connection call(Method m) throws ProCoreException, InterruptedException, IOException {
+ manager.call(m);
+ return connection;
+ }
+}
+class Client {
+ private final ProCoreServer proCoreServer;
+ private final ProCoreClient proCoreClient;
+ private final ConnectionThread connectionThread;
+ private boolean closed = false;
+ Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {
+ this.proCoreServer = proCoreServer;
+ this.proCoreClient = proCoreClient;
+ this.connectionThread = connectionThread;
+ }
+ void open() throws ProCoreException, InterruptedException {
+ synchronized (proCoreClient) {
+ if (!connectionThread.isConnected())
+ connectionThread.connect(proCoreServer.getPort());
+ }
+ closed = false;
+ }
+ boolean isOpen() {
+ synchronized (proCoreClient) {
+ return connectionThread.isConnected();
+ }
+ }
+ boolean isClosed() {
+ synchronized (proCoreClient) {
+ return closed || !connectionThread.isConnected();
+ }
+ }
+ void close() {
+ proCoreClient.deleteClient(this);
+ closed = true;
+ }
+ String execute(String command) throws ProCoreException {
+ return proCoreClient.execute(command);
+ }
+ void call(Method m) throws ProCoreException {
+ proCoreClient.call(m);
+ }
+}
+class ProCoreClient {
+ public static final String LOCALHOST = "127.0.0.1";
+ class Remote {
+ private final ConnectionManager connectionManager = ConnectionManager.getInstance();
+ private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);
+ Remote() {
+ }
+ }
+ private final Remote remote = new Remote();
+ private final Set<Client> clients = new HashSet<Client>();
+ private final ProCoreServer proCoreServer;
+ public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {
+ this.proCoreServer = proCoreServer;
+ }
+ synchronized public int getPort() {
+ return remote.connectionThread.getAddress().getPort();
+ }
+ synchronized public boolean isConnected() {
+ return remote.connectionThread.isConnected();
+ }
+ synchronized public Client newClient() throws ProCoreException {
+ return newClient(proCoreServer, remote.connectionThread.getAddress());
+ }
+ synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {
+ if (!remote.connectionThread.isConnected())
+ try {
+ remote.connectionThread.connect(address);
+ } catch (InterruptedException e) {
+ if (!remote.connectionThread.isConnected()) {
+ remote.connectionThread.disconnect();
+ throw new ProCoreException("Failed to create new client.");
+ }
+ }
+ else if (!address.equals(remote.connectionThread.getAddress()))
+ throw new ProCoreException("Illegal newClient call. Already connected to different address.");
+ Client client = new Client(proCoreServer, this, remote.connectionThread);
+ clients.add(client);
+ return client;
+ }
+ synchronized int deleteClient(Client client) {
+ clients.remove(client);
+ return clients.size();
+ }
+ void closeClient(int aSessionId) throws ProCoreException {
+ try {
+ CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);
+ Method m = new Method(f, null, null);
+ Connection c = remote.connectionThread.call(m);
+ m.waitForReply(c);
+ } catch (InterruptedException e) {
+ throw new ProCoreException("Thread interrupted during method call.", e);
+ } catch (IOException e) {
+ throw new ProCoreException("IOException during method call.", e);
+ } catch (Throwable e) {
+ throw new ProCoreException("Throwable during method call.", e);
+ }
+ }
+ synchronized public String execute(String command) throws ProCoreException {
+ try {
+ ExecuteFunction f = new ExecuteFunction(command);
+ Method m = new Method(f, null, null);
+ Connection c = remote.connectionThread.call(m);
+ m.waitForReply(c);
+ return f.out;
+ } catch (InterruptedException e) {
+ throw new ProCoreException("Thread interrupted during method call.", e);
+ } catch (IOException e) {
+ throw new ProCoreException("IOException during method call.", e);
+ } catch (Throwable e) {
+ throw new ProCoreException("Throwable during method call.", e);
+ }
+ }
+ private void recall(Method method, IOException e) throws ProCoreException {// :)
+ if (remote.connectionThread.isConnected())
+ throw new ProCoreException("IOException during method call.", e);
+ try {
+ remote.connectionThread.reconnect();
+ Connection c = remote.connectionThread.call(method);
+ method.waitForReply(c);
+ } catch (Throwable t) {
+ throw new ProCoreException("IOException during method call. Recall failed.", e);
+ }
+ }
+ synchronized public void call(Method method) throws ProCoreException {
+ try {
+ Connection c = remote.connectionThread.call(method);
+ method.waitForReply(c);
+ } catch (InterruptedException e) {
+ throw new ProCoreException("Thread interrupted during method call.", e);
+ } catch (IOException e) {
+ recall(method, e);
+ } catch (ProCoreException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new ProCoreException("Throwable during method call.", e);
+ }
+ }
+ synchronized public void connect(int port) throws ProCoreException, InterruptedException {
+ remote.connectionThread.connect(port);
+ }
+ synchronized public void connect(SessionAddress sessionAddress)
+ throws ProCoreException, InterruptedException {
+ remote.connectionThread.connect(sessionAddress.getSocketAddress());
+ }
+ synchronized public void disconnect() throws ProCoreException {
+ remote.connectionThread.disconnect();
+ }
+}
+abstract class Event {
+ final String name;
+ final Packet packet;
+ Event(String name, Packet packet) {
+ this.name = name;
+ this.packet = packet;
+ }
+ int getToken() {
+ return packet.header.token;
+ }
+ void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {
+ event.deserialize(event.getRequestNumber(), getDataBuffer());
+ }
+ private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {
+ this.packet.bytes.position(20);
+ return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);
+ }
+ @Override public String toString() {
+ return name + " token=" + packet.header.token;
+ }
+}
+class ChangeSetUpdateEvent extends Event {
+ ChangeSetUpdateEvent(Packet packet) {
+ super("ChangeSetUpdateEvent", packet);
+ }
+}
+abstract class EventHandler {
+ abstract void on(Event event);
+}
+class DefaultEventHandler extends EventHandler {
+ void on(Event event) {
+ Util.log("Missing event handler. event=" + event);
+ }
+}