--- /dev/null
+/*******************************************************************************\r
+\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.db.server.internal;\r
+\r
+import java.io.BufferedReader;\r
+import java.io.BufferedWriter;\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.FileNotFoundException;\r
+import java.io.FileOutputStream;\r
+import java.io.IOException;\r
+import java.io.InputStreamReader;\r
+import java.io.OutputStreamWriter;\r
+import java.io.PrintStream;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.ByteOrder;\r
+import java.nio.IntBuffer;\r
+import java.nio.file.Files;\r
+import java.nio.file.Path;\r
+import java.util.ArrayList;\r
+import java.util.HashMap;\r
+import java.util.HashSet;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.CountDownLatch;\r
+import java.util.concurrent.LinkedBlockingQueue;\r
+\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.server.DatabaseLastExitException;\r
+import org.simantics.db.server.GuardFileVersionException;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.server.ServerNotFoundException;\r
+import org.simantics.db.server.protocol.CloseClientSessionFunction;\r
+import org.simantics.db.server.protocol.ExecuteFunction;\r
+import org.simantics.db.server.protocol.MessageNumber;\r
+\r
+public class ProCoreServer {\r
+ private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();\r
+ private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();\r
+ private static final String LOCALHOST = ProCoreClient.LOCALHOST;\r
+ public static final int VERSION_MAJOR = 1;\r
+ public static final int VERSION_MINOR = 4; // 2015-05-05\r
+ public static final String BRANCH_DIR = "procore.headClusters.procore";\r
+ public static final String TAIL_DIR = "procore.tailClusters.procore";\r
+ public static final String TAIL_FILE = "procore.tail.procore";\r
+ public static final String CONFIG_FILE = "procore.config.procore";\r
+ public static final String GUARD_FILE = "procore.guard.procore";\r
+ public static final String JOURNAL_FILE = "procore.journal.procore";\r
+ public static final String LOG_FILE = "procore.log.procore";\r
+ public static final String DCS_FILE = "procore.dumpChangeSets.procore";\r
+ public static final String RECOVERY_NEEDED_FILE = "recovery.needed";\r
+ public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";\r
+ public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";\r
+ public static final String PAGE_FILE_PATTERN = "procore.page*.procore";\r
+ public static final String DATA_PREFIX = "ClusterData.";\r
+ public static final String DATA_PATTERN = DATA_PREFIX + "*";\r
+ public static final String INDEX_PREFIX = "ClusterIndex.";\r
+ public static final String INDEX_PATTERN = INDEX_PREFIX + "*";\r
+ public static final String VALUE_PREFIX = "ExternalValue.";\r
+ public static final String VALUE_PATTERN = VALUE_PREFIX + "*";\r
+ public static final String VALUE_SUFFIX = ".procore";\r
+ public static final String DELETED_PREFIX = "ClusterDeleted.";\r
+ public static final String DELETED_PATTERN = DELETED_PREFIX + "*";\r
+ private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;\r
+ public static final String QUIT_CMD = "quit";\r
+// private static final int SHORT_SLEEP_COUNT = 1;\r
+// private static final int LONG_SLEEP_COUNT = 10;\r
+// static public void stopProCoreServer(File workingDir) throws ProCoreException {\r
+// GuardFile guardFile = new GuardFile(workingDir);\r
+// guardFile.stopProCoreServer();\r
+// }\r
+ private static boolean isRunning(String aPid) throws ProCoreException {\r
+ boolean isRunning = false;\r
+ try {\r
+ switch (OSType.calculate()) {\r
+ case WINDOWS: {\r
+ Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");\r
+ BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
+ String line = input.readLine();\r
+ if (line.matches(".*ProCoreServer.exe.*"))\r
+ isRunning = true;\r
+ input.close();\r
+ break;\r
+ }\r
+ case UNIX: {\r
+ Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");\r
+ BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
+ String line = input.readLine();\r
+ if (line.matches(".*ProCoreServer.*"))\r
+ isRunning = true;\r
+ input.close();\r
+ break;\r
+ }\r
+ default:\r
+ }\r
+ } catch (Exception e) {\r
+ throw new ProCoreException("Could not get list of running processes.", e);\r
+ }\r
+ return isRunning;\r
+ }\r
+ static class ProCoreProcess {\r
+ private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };\r
+ class Manager implements Runnable {\r
+ private State state = State.Created;\r
+ private CountDownLatch serverPrompted;\r
+ private LinkedBlockingQueue<String> reply;\r
+ private volatile String command;\r
+ private volatile Integer exitValue;\r
+ private volatile boolean carryOn;\r
+ private volatile int port;\r
+ private final File dbFolder; // Just for exception i.e. error reporting.\r
+ Manager(File dbFolder) {\r
+ this.dbFolder = dbFolder;\r
+ init();\r
+ }\r
+ private void init() {\r
+ serverPrompted = new CountDownLatch(1);\r
+ reply = new LinkedBlockingQueue<String>();\r
+ exitValue = null;\r
+ carryOn = true;\r
+ port = 0;\r
+ state = State.Initialized;\r
+ }\r
+ int getPort() {\r
+ return port;\r
+ }\r
+ void tryToStart() throws ProCoreException, InterruptedException {\r
+ if (thread.isAlive())\r
+ return;\r
+ thread = new Thread(manager, manager.initAndGetName());\r
+ thread.start();\r
+ serverPrompted.await();\r
+ if (!state.equals(State.Prompted))\r
+ throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
+ String reply = giveCommand("print port");\r
+ try {\r
+ port = Integer.parseInt(reply);\r
+ } catch (NumberFormatException e) {\r
+ if (null != exitValue && 0 != exitValue)\r
+ throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
+ if (reply.equals("End of server thread."))\r
+ throw new ProCoreException("Server did not start (did not prompt). state=" + state);\r
+ else\r
+ throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);\r
+ }\r
+ state = State.GotPort;\r
+ }\r
+ String giveCommand(String command) throws ProCoreException, InterruptedException {\r
+ if (!thread.isAlive())\r
+ throw new ProCoreException("Server thread not alive.");\r
+ if (null != this.command)\r
+ throw new ProCoreException("Old command pending.");\r
+ this.command = command;\r
+ return reply.take();\r
+ }\r
+ private String initAndGetName() {\r
+ init();\r
+ return "ProCoreProcess " + ++instanceCount + " " + builder.directory();\r
+ }\r
+ boolean isActive() throws ProCoreException {\r
+ if (null == exitValue && thread.isAlive())\r
+ return true;\r
+ else\r
+ return false;\r
+ }\r
+ boolean tryToStop() throws ProCoreException {\r
+ if (!thread.isAlive())\r
+ return true;\r
+ carryOn = false;\r
+ thread.interrupt();\r
+ try {\r
+ thread.join(10000); // milliseconds\r
+ } catch (InterruptedException e) {\r
+ }\r
+ return !thread.isAlive();\r
+ }\r
+ @Override\r
+ public void run() {\r
+ state = State.ThreadStarted;\r
+ Util.log("ProCoreServer thread started.");\r
+ try {\r
+ Util.trace("ProCoreProcessManager start.");\r
+ Process process = builder.start();\r
+ exitValue = exitValue(process);\r
+ if (null != exitValue)\r
+ Util.logError("Server process did not start.");\r
+ else {\r
+ state = State.ProcessStarted;\r
+ runProcessStarted(process); // Process start\r
+ }\r
+ } catch (IOException e) {\r
+ Util.logError("Failed during process watch.");\r
+ } finally {\r
+ reply.add("End of server thread.");\r
+ Util.trace("ProCoreServerThread stop.");\r
+ state = State.ThreadStopped;\r
+ serverPrompted.countDown();\r
+ }\r
+ }\r
+ private void runProcessStarted(Process process) throws IOException {\r
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));\r
+ final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)\r
+ {\r
+ do {\r
+ try {\r
+ if (reader.ready())\r
+ handleLine(reader.readLine(), reader, writer);\r
+ else if (null != command) {\r
+ writer.write(command);\r
+ command = null;\r
+ writer.newLine();\r
+ writer.flush();\r
+ String line = getReply(reader);\r
+ reply.add(line);\r
+ } else\r
+ Thread.sleep(100); // milliseconds\r
+ } catch (InterruptedException e) {\r
+ Util.trace("ProCoreServerThread interrupted.");\r
+ }\r
+ exitValue = exitValue(process);\r
+ if (null == exitValue && !carryOn)\r
+ try {\r
+ try {\r
+ writer.write(QUIT_CMD);\r
+ writer.newLine();\r
+ writer.flush();\r
+ } catch (Throwable t) {\r
+ Util.logError("Wait for process death was interrupted.", t);\r
+ } finally {\r
+ }\r
+ exitValue = process.waitFor();\r
+ } catch (InterruptedException e) {\r
+ Util.logError("Wait for process death was interrupted.", e);\r
+ carryOn = true;\r
+ continue;\r
+ }\r
+ } while (null == exitValue);\r
+ } // End of resource try.\r
+ }\r
+ private String getReply(BufferedReader reader) throws IOException {\r
+ StringBuilder sb = new StringBuilder();\r
+ while (true) {\r
+ String line = reader.readLine();\r
+ if (null == line || line.startsWith("Waiting for input?"))\r
+ break;\r
+ sb.append(line);\r
+ }\r
+ return sb.toString();\r
+ }\r
+ private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {\r
+ if (DEBUG)\r
+ Util.showDebug(line);\r
+ if (line.startsWith("Main end.")) {\r
+ carryOn = false; // Last line from server.\r
+ } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {\r
+ state = State.Prompted;\r
+ serverPrompted.countDown(); // Server has been started and is waiting for console input.\r
+ } else\r
+ Util.trace("From server. line=" + line);\r
+ }\r
+ private Integer exitValue(Process process) {\r
+ try {\r
+ return process.exitValue();\r
+ } catch (IllegalThreadStateException e) {\r
+ return null; // Still alive.\r
+ }\r
+ }\r
+ }\r
+ private static int instanceCount; // Keeps count of thread instances.\r
+ private final ProcessBuilder builder;\r
+ private final Manager manager;\r
+ private Thread thread;\r
+ ProCoreProcess(File exeFile, File workingDir) {\r
+ String[] args = new String[0];\r
+ String[] cmd = new String[1 + args.length];\r
+ cmd[0] = exeFile.getAbsolutePath().toString();\r
+ System.arraycopy(args, 0, cmd, 1, args.length);\r
+ builder = new ProcessBuilder(cmd);\r
+ builder.redirectErrorStream(true);\r
+ builder.directory(workingDir);\r
+ manager = new Manager(workingDir);\r
+ thread = new Thread(manager, manager.initAndGetName());\r
+ }\r
+ public void tryToStart() throws ProCoreException, InterruptedException {\r
+ manager.tryToStart();\r
+ }\r
+// public int exitValue() throws IllegalStateException {\r
+// Integer i = proCoreServerThread.getExitValue();\r
+// if (null == i)\r
+// throw new IllegalStateException("ProCoreServerThread is still running.");\r
+// return i;\r
+// }\r
+// public boolean ignoreProtocolVersion() {\r
+// File wd = builder.directory();\r
+// if (null == wd)\r
+// return false;\r
+// Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);\r
+// if (Files.exists(ignore))\r
+// return true;\r
+// return false;\r
+// }\r
+ public boolean isActive() throws ProCoreException {\r
+ return manager.isActive();\r
+ }\r
+ public boolean tryToStop() throws ProCoreException {\r
+ return manager.tryToStop();\r
+ }\r
+ int getPort() throws ProCoreException {\r
+ return manager.getPort();\r
+ }\r
+ }\r
+ private static enum OSType {\r
+ UNIX, WINDOWS, UNKNOWN;\r
+\r
+ public static OSType calculate() {\r
+ String osName = System.getProperty("os.name");\r
+ assert osName != null;\r
+ osName = osName.toLowerCase();\r
+ if (osName.startsWith("windows"))\r
+ return WINDOWS;\r
+ if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))\r
+ return UNIX;\r
+ return UNKNOWN;\r
+ }\r
+ }\r
+ static public boolean ignoreProtocolVersion(File dbFolder) {\r
+ if (null == dbFolder)\r
+ return false;\r
+ if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))\r
+ return true;\r
+ return false;\r
+ }\r
+ static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)\r
+ throws ProCoreException, ServerNotFoundException {\r
+ try {\r
+ ProCoreServer proCoreServer;\r
+ synchronized (workingDirs) {\r
+ proCoreServer = workingDirs.get(workingDir.getCanonicalPath());\r
+ if (null != proCoreServer)\r
+ return proCoreServer;\r
+ if (!serverDir.isDirectory())\r
+ throw new ProCoreException("Expected server directory as argument");\r
+ if (!workingDir.isDirectory())\r
+ throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());\r
+ String exeSuffix = Activator.getExeSuffix();\r
+ File executable = new File(serverDir, "ProCoreServer" + exeSuffix);\r
+ proCoreServer = new ProCoreServer(executable, workingDir);\r
+ workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);\r
+ return proCoreServer;\r
+ }\r
+ } catch (IOException e) {\r
+ throw new ProCoreException("IOException", e);\r
+ }\r
+ }\r
+\r
+ public static void logVersionInformation() {\r
+ StringBuilder msg = new StringBuilder(200);\r
+ String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.\r
+ msg.append("ProCore version information:");\r
+ msg.append(nl);\r
+ msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);\r
+ try {\r
+ msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());\r
+ } catch (IOException e) {\r
+ msg.append(" folder information not available");\r
+ }\r
+ msg.append(nl);\r
+ msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");\r
+ String str = msg.toString();\r
+ Logger.defaultLog(str);\r
+ }\r
+ private static String[] getFileLines(final File aFile, int maxLines) {\r
+ final String[] STRING_EMPTY_ARRAY = new String[0];\r
+ try {\r
+ if (!aFile.canRead()) {\r
+ return STRING_EMPTY_ARRAY;\r
+ }\r
+ try (FileInputStream fis = new FileInputStream(aFile);\r
+ InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");\r
+ BufferedReader br = new BufferedReader(isr);){\r
+ String line = null;\r
+ int count = 0;\r
+ ArrayList<String> lines = new ArrayList<String>();\r
+ while ((line = br.readLine()) != null) {\r
+ if (++count > maxLines)\r
+ break; // Caller not interested in more lines.\r
+ lines.add(line);\r
+ }\r
+ return lines.toArray(STRING_EMPTY_ARRAY);\r
+ } catch (FileNotFoundException e) {\r
+ return STRING_EMPTY_ARRAY;\r
+ }\r
+ } catch (Throwable t) {\r
+ if (DEBUG)\r
+ t.printStackTrace();\r
+ Logger.defaultLogError(t);\r
+ }\r
+ return new String[0];\r
+ }\r
+ private static int parseInt(String s)\r
+ throws ProCoreException {\r
+ try {\r
+ return Integer.parseInt(s);\r
+ } catch (NumberFormatException e) {\r
+ String msg = "Could not convert string to number. string=" + s;\r
+ Logger.defaultLogError(msg);\r
+ throw new ProCoreException(msg, e);\r
+ }\r
+ }\r
+ private String databaseId = null;\r
+ private final GuardFile guardFile;\r
+ private final ProCoreClient proCoreClient;\r
+ ProCoreClient getProCoreClient() {\r
+ return proCoreClient;\r
+ }\r
+ private final ProCoreProcess proCoreProcess;\r
+ private ProCoreServer(File exeFile, File workingDir)\r
+ throws ProCoreException {\r
+ if (null == exeFile)\r
+ throw new ProCoreException("Illegal argument: exeFile is null.");\r
+ if (null == workingDir)\r
+ throw new ProCoreException("Illegal argument: workingDir is null.");\r
+ guardFile = new GuardFile(workingDir);\r
+ proCoreClient = new ProCoreClient(this);\r
+ proCoreProcess = new ProCoreProcess(exeFile, workingDir);\r
+ Util.log("ProCoreServer exe: " + exeFile);\r
+ Util.log("ProCoreServer folder: " + workingDir);\r
+ logVersionInformation();\r
+ }\r
+// //@Override\r
+ public synchronized String execute(String command) throws ProCoreException, InterruptedException {\r
+ return proCoreClient.execute(command);\r
+ }\r
+ private void startInternal()\r
+ throws ProCoreException {\r
+ if (proCoreClient.isConnected())\r
+ return;\r
+ Exception exception;\r
+ try {\r
+ proCoreProcess.tryToStart();\r
+ int port = proCoreProcess.getPort();\r
+ SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
+ proCoreClient.connect(sa);\r
+ return;\r
+ } catch (InterruptedException e) {\r
+ Util.logError("ProCoreProcess start was interrupted.", e);\r
+ exception = e;\r
+ } catch (DatabaseLastExitException e) {\r
+ throw e; // This situation can be analyzed and handled by start handler.\r
+ } catch (ProCoreException e) {\r
+ Util.logError("Failed to start ProCoreServer process.", e);\r
+ exception = e;\r
+ }\r
+ boolean isActive = guardFile.isActive(proCoreClient);\r
+ if (!isActive)\r
+ throw new ProCoreException("Failed to connect to ProCoreServer.", exception);\r
+ }\r
+// private int getWaitCount() throws ProCoreException {\r
+// if (!proCoreProcess.isActive())\r
+// return SHORT_SLEEP_COUNT; // Short wait because server process is not running.\r
+// else\r
+// return LONG_SLEEP_COUNT;\r
+// }\r
+// private void startInternalReinit(int exitStatus)\r
+// throws ProCoreException {\r
+// switch (exitStatus) {\r
+// default: // What has happened here?\r
+// throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));\r
+// case -1: // Exit status not available, not sure what has happened here.\r
+// case 29: // Server died because it could not read exit status from guard file.\r
+// case 30: // Server died because old exit status was not zero.\r
+// case 40: // Server died because it received terminating signal.\r
+// case 42: // Server died because it could not read end status from guard file.\r
+// case 43: // Server died because end value was not zero.\r
+// case 44: // Server died because server could not read version from guard file.\r
+// case 45: // Server died because server did not support guard file version.\r
+// case 47: // Server died because server could not listen to any port.\r
+// break;\r
+// case 32: // Server died becuase database version did not match.\r
+// throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));\r
+// case 33: // Server died because database has been corrupted.\r
+// throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));\r
+// case 39: // Server died because last exit was not clean.\r
+// throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));\r
+// }\r
+// boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());\r
+// if (!dead) {\r
+// StringBuilder sb = new StringBuilder(256);\r
+// sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");\r
+// sb.append(getExitMessage(exitStatus));\r
+// throw new DatabaseStartException(workingDir, sb.toString());\r
+// }\r
+// boolean deleted = guardFile.delete();\r
+// if (!deleted)\r
+// Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));\r
+// }\r
+ //@Override\r
+ public synchronized boolean isActive()\r
+ throws ProCoreException {\r
+ if (!proCoreClient.isConnected())\r
+ return guardFile.isActive(proCoreClient);\r
+ proCoreClient.execute("");\r
+ return true;\r
+ }\r
+ public synchronized boolean isAlive()\r
+ throws ProCoreException {\r
+ if (proCoreClient.isConnected())\r
+ return true;\r
+ if (proCoreProcess.isActive())\r
+ return true;\r
+ return guardFile.isAlive();\r
+ }\r
+ Client newClient() throws ProCoreException {\r
+ if (proCoreClient.isConnected())\r
+ return proCoreClient.newClient();\r
+ int port = getPort();\r
+ InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);\r
+ return proCoreClient.newClient(this, isa);\r
+ }\r
+ public synchronized boolean tryToStop()\r
+ throws ProCoreException {\r
+ synchronized (proCoreClient) {\r
+ ProCoreException ex = null;\r
+ if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {\r
+ try {\r
+ proCoreClient.connect(getPort());\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+ if (proCoreClient.isConnected()) {\r
+ String s = "";\r
+ try {\r
+ s = proCoreClient.execute(QUIT_CMD);\r
+ String t = s.replaceAll("\n", "");\r
+ int n = Integer.parseInt(t);\r
+ if (n != 1)\r
+ Util.log("Number of connections after quit command is " + n + ".");\r
+ proCoreClient.closeClient(0);\r
+ if (!isLocal() && n > 1)\r
+ throw new ProCoreException("More than one connection for remote. n=" + n);\r
+ } catch (NumberFormatException e) {\r
+ Util.logError("Failed to parse number of connections. s=" + s, e);\r
+ } catch (ProCoreException e) {\r
+ ex = e;\r
+ } finally {\r
+ proCoreClient.disconnect();\r
+ }\r
+ }\r
+ try {\r
+ proCoreProcess.tryToStop();\r
+ } finally {\r
+ if (proCoreProcess.isActive() && null == ex)\r
+ ex = new ProCoreException("Failed to stop ProCoreProcess.");\r
+ }\r
+ if (null != ex)\r
+ throw ex;\r
+ return !proCoreClient.isConnected() && !proCoreProcess.isActive();\r
+ }\r
+ }\r
+ public synchronized boolean isConnected()\r
+ throws ProCoreException {\r
+ return proCoreClient.isConnected();\r
+ }\r
+ public synchronized boolean isLocal()\r
+ throws ProCoreException {\r
+ return proCoreClient.isConnected() && proCoreProcess.isActive();\r
+ }\r
+ public synchronized void disconnect()\r
+ throws ProCoreException {\r
+ proCoreClient.disconnect();\r
+ }\r
+ public synchronized void connect()\r
+ throws ProCoreException, InterruptedException {\r
+ int port = getPort();\r
+ if (0 == port)\r
+ throw new ProCoreException("Port 0 not supported as connection address.");\r
+ proCoreClient.connect(port);\r
+ }\r
+ private String getExitMessage(int exitValue) {\r
+ switch (exitValue) {\r
+ default:\r
+ return "Unexpected exit value = " + exitValue + ".";\r
+ case 1:\r
+ return "Exception was thrown. This indicates problems with program logic.";\r
+ case 2:\r
+ return "CallException was thrown. This indicates problems with calls to/from parent server.";\r
+ case 3:\r
+ return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";\r
+ case 4:\r
+ return "NoSuchElementException was thrown. This indicates problems with program logic.";\r
+ case 5:\r
+ return "IllegalArgumentException was thrown. This indicates problems with program logic.";\r
+ case 6:\r
+ return "IllegalResourceException was thrown. This indicates problems with server data.";\r
+ case 7:\r
+ return "IllegalClusterException was thrown. This indicates problems with server data.";\r
+ case 8:\r
+ return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";\r
+ case 9:\r
+ return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";\r
+ case 10:\r
+ return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";\r
+ case 11:\r
+ return "SystemException was thrown. This indicates problems with program logic.";\r
+ case 12:\r
+ return "CppException was thrown. This indicates problems with program logic.";\r
+ case 13:\r
+ return "UnknownException was thrown. This indicates problems with program logic.";\r
+ case 14:\r
+ return "ThrowException was thrown. This indicates problems with program logic.";\r
+ case 15:\r
+ return "AssertException was thrown. This indicates problems with program logic.";\r
+ case 16:\r
+ return "ParentCheckException was thrown. This indicates problems with the database version of parent.";\r
+ case 17:\r
+ return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";\r
+ case 18:\r
+ return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";\r
+ case 19:\r
+ return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";\r
+ case 20:\r
+ return "NoSuchFileException was thrown. This indicates problems with a missing file.";\r
+ case 21:\r
+ return "JournalException was thrown. This indicates problems with journal mechanisim.";\r
+ case 22:\r
+ return "OutOfSpaceException was thrown. This indicates problems with memory.";\r
+ case 23:\r
+ return "ExternalValueException was thrown. This indicates problems with large value handling.";\r
+ case 24:\r
+ return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";\r
+ case 25:\r
+ return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";\r
+ case 26:\r
+ return "ConsoleException was thrown. This indicates that server console was closed without quit command.";\r
+ case 27:\r
+ return "ExitException was thrown. This indicates that server did not exit cleanly.";\r
+ case 28:\r
+ return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";\r
+ case 29:\r
+ return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";\r
+ case 30:\r
+ return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";\r
+ case 31:\r
+ return "UpdateException was thrown. This indicates that server failed during update.";\r
+ case 32:\r
+ return "DatabaseVersionException was thrown. This indicates that server can not use given database.";\r
+ case 33:\r
+ return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";\r
+ case 34:\r
+ return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";\r
+ case 35:\r
+ return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";\r
+ case 36:\r
+ return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";\r
+ case 37:\r
+ return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";\r
+ case 38:\r
+ return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";\r
+ case 39:\r
+ return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";\r
+ case 40:\r
+ return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";\r
+ case 41:\r
+ return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";\r
+ case 42:\r
+ return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";\r
+ case 43:\r
+ return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";\r
+ case 44:\r
+ return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";\r
+ case 45:\r
+ return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";\r
+ case 46:\r
+ return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";\r
+ case 47:\r
+ return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";\r
+ case 48:\r
+ return "JournalVersionException was thrown. This indicates that database was written by older version.";\r
+ case 49:\r
+ return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";\r
+ }\r
+ }\r
+ synchronized void start()\r
+ throws ProCoreException {\r
+ try {\r
+ startInternal();\r
+ } catch (DatabaseLastExitException e) {\r
+ throw e; // This can be analyzed and handled by start handler.\r
+ } catch (Throwable t) {\r
+ ProCoreException pe = null;\r
+ if (t instanceof ProCoreException)\r
+ pe = (ProCoreException)t;\r
+ else {\r
+ String msg = "ProCoreServer startup failed."\r
+ + " Automatic rcovery handling not implemented."\r
+ + " Database must be fixed manually.";\r
+ pe = new ProCoreException(msg, t);\r
+ }\r
+ if (null == pe.getDbFolder())\r
+ pe.setDbFolder(proCoreProcess.builder.directory());\r
+ }\r
+ }\r
+ synchronized void stop()\r
+ throws ProCoreException, InterruptedException{\r
+ if (null != databaseId)\r
+ databaseIds.remove(databaseId);\r
+ boolean dead = tryToStop();\r
+ if (!dead)\r
+ throw new ProCoreException("Failed to stop ProCoreServer.");\r
+ }\r
+ public synchronized String getExitMessage()\r
+ throws ProCoreException {\r
+ Integer returnValue = guardFile.getExitValue();\r
+ if (null == returnValue)\r
+ throw new ProCoreException("Exit message not available.");\r
+ return getExitMessage(returnValue);\r
+ }\r
+ //@Override\r
+ public synchronized ServerAddress getAddress()\r
+ throws ProCoreException {\r
+ return new ServerAddress(LOCALHOST, getPort());\r
+ }\r
+ //@Override\r
+ public synchronized int getPort()\r
+ throws ProCoreException {\r
+ int port = 0;\r
+ if (proCoreClient.isConnected()) {\r
+ port = proCoreClient.getPort();\r
+ if (0 != port)\r
+ return port;\r
+ }\r
+ if (proCoreProcess.isActive()) {\r
+ port = proCoreProcess.getPort();\r
+ if (0 != port)\r
+ return port;\r
+ }\r
+ return guardFile.getPort();\r
+ }\r
+ public synchronized TailFile.Data getTailData()\r
+ throws ProCoreException {\r
+ Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);\r
+ return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
+ }\r
+// public synchronized TailFile.Data getTailDataFromHead()\r
+// throws ProCoreException {\r
+// Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);\r
+// return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
+// }\r
+ public static class GuardFile {\r
+ public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;\r
+ private static final int VERSION_LINE = 1;\r
+ private static final int PID_LINE = 2;\r
+ private static final int PORT_LINE = 3;\r
+ private static final int EXIT_LINE = 4;\r
+ private static final int END_LINE = 5;\r
+ private final File guardFile;\r
+ GuardFile(File workingDir) {\r
+ guardFile = new File(workingDir, GUARD_FILE);\r
+ }\r
+ int getPort() throws ProCoreException {\r
+ String[] lines = getFileLines(guardFile, END_LINE);\r
+ if (lines.length < PORT_LINE)\r
+ throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());\r
+ return parseInt(lines[PORT_LINE-1]);\r
+ }\r
+ private void checkVersion(String[] lines) throws ProCoreException {\r
+ if (lines.length < VERSION_LINE)\r
+ throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());\r
+ int version = parseInt(lines[VERSION_LINE-1]);\r
+ if (1 != version)\r
+ throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);\r
+ }\r
+ boolean delete() {\r
+ if (guardFile.exists())\r
+ return guardFile.delete();\r
+ else\r
+ return true; // Guard file is deleted i.e. does not exit.\r
+ }\r
+ void deleteLog() {\r
+ if (!guardFile.exists())\r
+ return;\r
+ try {\r
+ Files.delete(guardFile.toPath());\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+ Integer getExitValue() {\r
+ String lines[] = getFileLines(guardFile, END_LINE);\r
+ try {\r
+ checkVersion(lines);\r
+ if (lines.length >= EXIT_LINE)\r
+ return parseInt(lines[EXIT_LINE-1]);\r
+ else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))\r
+ return -1; // Server has died without writing exit status.\r
+ return null; // Server might be running.\r
+ } catch (Exception e) {\r
+ Logger.defaultLogError("Could not get ProCoreServer exit value.", e);\r
+ return null; // Server might be running.\r
+ }\r
+ }\r
+// void stopProCoreServer() throws ProCoreException {\r
+// if (!guardFile.exists())\r
+// throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");\r
+// String lines[] = new String[0];\r
+// for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {\r
+// lines = getFileLines(guardFile, END_LINE);\r
+// if (lines.length < 1)\r
+// throw new ProCoreException("Guard file does not contain lines.");\r
+// checkVersion(lines);\r
+// if (lines.length >= END_LINE)\r
+// return; // Server is dead according to guard file.\r
+// if (!isRunning(lines[PID_LINE-1]))\r
+// return; // ProCoreServer is not running.\r
+// if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.\r
+// break;\r
+// }\r
+// if (lines.length < PORT_LINE)\r
+// throw new ProCoreException("Guard file does not contain port.");\r
+// try {\r
+// int port = parseInt(lines[PORT_LINE-1]);\r
+// if (port > 0) {\r
+// ProCoreClient lProCoreClient = new ProCoreClient();\r
+// SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);\r
+// lProCoreClient.connect(sa);\r
+// try {\r
+// String reply = lProCoreClient.execute(QUIT_CMD);\r
+// int nConnect = Integer.parseInt(reply);\r
+// if (nConnect > 1)\r
+// throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());\r
+// } finally {\r
+// lProCoreClient.disconnect();\r
+// }\r
+// }\r
+// } catch (Throwable t) {\r
+// throw new ProCoreException("Failed to stop procore server cleanly.", t);\r
+// }\r
+// // Managed to send quit command to server, let's wait and see if it has any effect.\r
+// boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);\r
+// if (!dead)\r
+// throw new ProCoreException("Unable to stop server.");\r
+// }\r
+ boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {\r
+ if (proCoreClient.isConnected())\r
+ throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");\r
+ String lines[] = getFileLines(guardFile, END_LINE);\r
+ if (lines.length != PORT_LINE)\r
+ return false;\r
+ checkVersion(lines);\r
+ try {\r
+ int port = parseInt(lines[PORT_LINE-1]);\r
+ if (port > 0) {\r
+ try {\r
+ // Checking if core is responsive.\r
+ // This should be supported regardless of version.\r
+ SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
+ proCoreClient.connect(sa);\r
+ proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.\r
+ return true;\r
+ } catch (ProCoreException e) {\r
+ } finally { // We must disconnect or check protocol version.\r
+ proCoreClient.disconnect();\r
+ }\r
+ }\r
+ } catch (Throwable t) {\r
+ // Intentionally empty.\r
+ }\r
+ return false;\r
+ }\r
+ boolean isAlive() throws ProCoreException {\r
+ String lines[] = getFileLines(guardFile, END_LINE);\r
+ if (lines.length < 1)\r
+ return false; // No lines, assuming server is dead.\r
+ checkVersion(lines);\r
+ if (lines.length < PID_LINE)\r
+ return false; // No process id, assuming server is dead.\r
+ else if (lines.length >= END_LINE)\r
+ return false; // End status available, assuming server is dead.\r
+ else if (lines.length >= EXIT_LINE)\r
+ return false; // Exit status available, assuming server is dying.\r
+ else if (isRunning(lines[PID_LINE-1]))\r
+ return true; // Process with given id running. Server might be alive.\r
+ else\r
+ return false;// Process with given id not running.\r
+ }\r
+ }\r
+ public static class TailFile {\r
+ public static class Data {\r
+ Data(long version, long nextChangeSetId, long nextFreeId) {\r
+ this.version = version;\r
+ this.nextChangeSetId = nextChangeSetId;\r
+ this.nextFreeId = nextFreeId;\r
+ }\r
+ long version;\r
+ public long nextChangeSetId;\r
+ long nextFreeId;\r
+ }\r
+ public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {\r
+ if (file.exists())\r
+ throw new TailException("Tail file exists and thus cannot be created. file=" + file);\r
+ FileOutputStream fos;\r
+ try {\r
+ fos = new FileOutputStream(file);\r
+ } catch (FileNotFoundException e) {\r
+ throw new TailException("Tail file cannot be created. file=" + file);\r
+ }\r
+ PrintStream ps = new PrintStream(fos);\r
+ try {\r
+ ps.println(1); // file version\r
+ ps.println(nextChangeSetId); // next revision\r
+ ps.println(nextFreeId); // next free cluster\r
+ ps.println(dbId); // database id\r
+ } finally {\r
+ ps.close();\r
+ }\r
+ }\r
+ public static Data readTailFile(File file) throws ProCoreException {\r
+ String lines[] = getFileLines(file, 3);\r
+ if (lines.length < 3)\r
+ throw new TailReadException("Could not read data from " + file);\r
+ long ver = Long.parseLong(lines[0]);\r
+ long cs = Long.parseLong(lines[1]);\r
+ long id = Long.parseLong(lines[2]);\r
+ return new Data(ver, cs, id);\r
+ }\r
+ }\r
+}\r
+class SessionAddress {\r
+ private final InetSocketAddress socketAddress;\r
+ private final boolean ignoreProtocolVersion;\r
+ public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {\r
+ assert(null != socketAddress);\r
+ this.socketAddress = socketAddress;\r
+ this.ignoreProtocolVersion = ignoreProtocolVersion;\r
+ }\r
+ public InetSocketAddress getSocketAddress() {\r
+ return socketAddress;\r
+ }\r
+ public boolean ignoreProtocolVersion() {\r
+ return ignoreProtocolVersion;\r
+ }\r
+ @Override\r
+ public String toString() {\r
+ return socketAddress.toString();\r
+ }\r
+}\r
+class PacketQueue {\r
+ private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();\r
+ PacketQueue() {\r
+ }\r
+ void addFirst(Packet packet) {\r
+ packets.add(packet);\r
+ }\r
+ Packet takeFirst() throws InterruptedException {\r
+ return packets.take();\r
+ }\r
+}\r
+class Header {\r
+ Header(int[] ints) {\r
+ lastTokenIn = ints[0];\r
+ token = ints[1];\r
+ messageNumber = ints[2];\r
+ inflateSize = ints[3];\r
+ deflateSize = ints[4];\r
+ }\r
+ int lastTokenIn;\r
+ int token;\r
+ int messageNumber;\r
+ int inflateSize;\r
+ int deflateSize;\r
+ @Override\r
+ public String toString() {\r
+ StringBuilder sb = new StringBuilder(80);\r
+ sb.append("lastTokenIn=");\r
+ sb.append(lastTokenIn);\r
+ sb.append(" token=");\r
+ sb.append(token);\r
+ sb.append(" messageNumber=");\r
+ sb.append(messageNumber);\r
+ sb.append(" inflateSize=");\r
+ sb.append(inflateSize);\r
+ sb.append(" deflateSize=");\r
+ sb.append(deflateSize);\r
+ return sb.toString();\r
+ }\r
+}\r
+class Packet {\r
+ Header header;\r
+ ByteBuffer bytes;\r
+ Packet(int[] header, ByteBuffer bytes) {\r
+ this.header = new Header(header);\r
+ this.bytes = bytes;\r
+ }\r
+}\r
+class ConnectionThread {\r
+ private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;\r
+ private static final int HEADER_N = 5;\r
+ private static final int HEADER_SIZE = HEADER_N * 4;\r
+ private static final int DEFLATE = 4;\r
+ private static final int INCREMENT_SIZE = 8192;\r
+ private static int instanceCount = 0;\r
+ class Manager implements Runnable {\r
+ private final int instance;\r
+ private volatile boolean carryOn = true;\r
+ private volatile int lastTokenIn = 0;\r
+ private volatile MethodQueue methodQueue = new MethodQueue();\r
+ private volatile CountDownLatch running = new CountDownLatch(1);\r
+ private final EventHandler defaultEventHandler = new DefaultEventHandler();\r
+// private EventHandler eventHandler = defaultEventHandler;\r
+ Manager() {\r
+ instance = ++instanceCount;\r
+ }\r
+ @Override\r
+ public String toString() {\r
+ return "Connection " + instance;\r
+ }\r
+ InetSocketAddress getAddress() {\r
+ return connection.getAddress();\r
+ }\r
+ boolean isConnected() {\r
+ return connection.isConnected();\r
+ }\r
+ void reconnect() throws ProCoreException, InterruptedException {\r
+ InetSocketAddress a = connection.getAddress();\r
+ if (connection.addressNotInitialized() || 0 == a.getPort())\r
+ throw new ProCoreException("Address not ok. address=" + connection.getAddress());\r
+ connect(a);\r
+ }\r
+ void connect(int port) throws ProCoreException, InterruptedException {\r
+ InetSocketAddress a = connection.getAddress();\r
+ if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))\r
+ a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);\r
+ connect(a);\r
+ }\r
+ void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {\r
+ if (connection.isConnected()) {\r
+ if (connection.equalsAddress(address))\r
+ return;\r
+ else\r
+ throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);\r
+ }\r
+ connection.init4Connection(address);\r
+ connectionManager.connect(connection);\r
+// compression = new Compression();\r
+ if (thread.isAlive())\r
+ return;\r
+ thread.start();\r
+ running.await();\r
+ }\r
+ void call(Method method) throws ProCoreException, InterruptedException, IOException {\r
+// eventHandler = method.getEventHandler(eventHandler);\r
+ connection.sendMethod(methodQueue, method, lastTokenIn);\r
+ }\r
+ void disconnect() {\r
+ methodQueue.close();\r
+ connection.disconnect();\r
+ }\r
+ @Override\r
+ public void run() { // MARK: CT.Manger.run\r
+ try {\r
+ running.countDown();\r
+ while (carryOn) {\r
+ Packet packet = null;\r
+ try {\r
+ packet = inputQueue.takeFirst();\r
+ } catch (InterruptedException e) {\r
+ Util.trace("Wait for input packet interrupted. this=" + this);\r
+ continue;\r
+ }\r
+ if (null == packet)\r
+ break;\r
+ handlePacket(packet);\r
+ }\r
+ if (carryOn)\r
+ Util.trace("Thread " + thread.getName() + " stopped with null packet.");\r
+ else\r
+ Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");\r
+ } catch (Throwable t) {\r
+ Util.logError("Thread " + thread.getName() + " stopped to exception:", t);\r
+ } finally {\r
+ disconnect();\r
+ running.countDown();\r
+ }\r
+ }\r
+ private void handlePacket(Packet packet) throws ProCoreException {\r
+ if (packet.header.token > lastTokenIn) {\r
+ lastTokenIn = packet.header.token;\r
+ if (DEBUG)\r
+ Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);\r
+ }\r
+ Method method = methodQueue.remove(packet.header.token);\r
+ if (null == method) {\r
+ try {\r
+ switch (packet.header.messageNumber) {\r
+ default: {\r
+ Util.logError("Missing method. token=" + packet.header.token);\r
+ } return;\r
+ case MessageNumber.ChangeSetUpdateRequest: {\r
+ defaultEventHandler.on(new ChangeSetUpdateEvent(packet));\r
+ } break;\r
+ }\r
+ } catch (Throwable t) {\r
+ Util.logError("Event handler failed:", t);\r
+ }\r
+ return;\r
+ }\r
+// else\r
+// eventHandler = method.getEventHandler(defaultEventHandler);\r
+ switch (packet.header.messageNumber) {\r
+ default: {\r
+ String s = "Illegal message number " + packet.header.messageNumber;\r
+ method.gotException(s);\r
+ } break;\r
+ case MessageNumber.ChangeSetUpdateRequest:\r
+ method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));\r
+ methodQueue.add(method); // Event caused by method. Actual method not yet handled.\r
+ break;\r
+ case MessageNumber.NullResponse:\r
+ //Util.log("Null(ack) message. method=" + method + " packet=" + packet);\r
+ break;\r
+ case MessageNumber.ExceptionResponse:\r
+ method.gotException(packet);\r
+ break; // MARK: handlePacket\r
+ case MessageNumber.AAAResponse:\r
+ case MessageNumber.AskTransactionResponse:\r
+ case MessageNumber.CancelCommitResponse:\r
+ case MessageNumber.CloseClientSessionResponse:\r
+ case MessageNumber.EchoResponse:\r
+ case MessageNumber.EndTransactionResponse:\r
+ case MessageNumber.ExecuteResponse:\r
+ case MessageNumber.GetChangeSetContextResponse:\r
+ case MessageNumber.GetChangeSetDataResponse:\r
+ case MessageNumber.GetChangeSetsResponse:\r
+ case MessageNumber.GetClusterNewResponse:\r
+ case MessageNumber.GetRefreshResponse:\r
+ case MessageNumber.GetResourceSegmentResponse:\r
+ case MessageNumber.GetServerInfoResponse:\r
+ case MessageNumber.OpenClientSessionResponse:\r
+ case MessageNumber.RefreshResponse:\r
+ case MessageNumber.ReserveIdsResponse:\r
+ case MessageNumber.UndoResponse:\r
+ case MessageNumber.ReconnectResponse:\r
+ case MessageNumber.GetRefresh2Response:\r
+ case MessageNumber.GetClusterChangesResponse:\r
+ case MessageNumber.GetServerInfo2Response:\r
+ case MessageNumber.ListClustersResponse: {\r
+ method.gotPacket(packet);\r
+ } break;\r
+ }\r
+ }\r
+ }\r
+ private final ConnectionManager connectionManager;\r
+ private final PacketQueue inputQueue = new PacketQueue();\r
+ private final Connection connection = new Connection(this);\r
+ private final Manager manager = new Manager();\r
+ private Thread thread = new Thread(manager, manager.toString());\r
+ private int[] header = null;\r
+ private int[] INTS = new int[HEADER_N];\r
+ private int remaining = HEADER_SIZE;\r
+ private int deflateSize;\r
+ ConnectionThread(ConnectionManager connectionManager) {\r
+ this.connectionManager = connectionManager;\r
+ }\r
+ ByteBuffer getInputBuffer() {\r
+ return getInputBuffer(INCREMENT_SIZE);\r
+ }\r
+ ByteBuffer getInputBuffer(int sizeAtLeast) {\r
+ ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));\r
+ bb.order(ByteOrder.LITTLE_ENDIAN);\r
+ return bb;\r
+ }\r
+ @Override\r
+ public String toString() {\r
+ return "header=" + header + " remaining=" + remaining;\r
+ }\r
+ private String toString(ByteBuffer buffer, int nRead) {\r
+ return this + " nRead=" + nRead + " buffer=" + buffer;\r
+ }\r
+ ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0\r
+ throws IOException { // MARK: handleInput\r
+ int position = buffer.position();\r
+ if (null == header) {\r
+ if (position < HEADER_SIZE)\r
+ return buffer;\r
+ else { // Got header.\r
+ buffer.position(0);\r
+ IntBuffer ib = buffer.asIntBuffer();\r
+ ib.get(INTS);\r
+ ib = null;\r
+ header = INTS;\r
+ deflateSize = header[DEFLATE];\r
+ if (deflateSize < 0)\r
+ throw new IOException("Illegal deflate size=" + deflateSize);\r
+ remaining += deflateSize;\r
+ if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.\r
+ ByteBuffer bb = getInputBuffer(remaining);\r
+ buffer.position(0);\r
+ buffer.limit(position);\r
+ bb.put(buffer);\r
+ return bb;\r
+ }\r
+ buffer.position(position);\r
+ }\r
+ }\r
+ if (position < remaining)\r
+ return buffer; // Need more data.\r
+ else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.\r
+ ByteBuffer bb = getInputBuffer(remaining);\r
+ buffer.position(0);\r
+ buffer.limit(remaining);\r
+ bb.put(buffer);\r
+ inputQueue.addFirst(new Packet(header, bb));\r
+ ByteBuffer b2 = getInputBuffer(buffer.limit());\r
+ buffer.position(remaining);\r
+ buffer.limit(position);\r
+ b2.put(buffer);\r
+ header = null;\r
+ remaining = HEADER_SIZE;\r
+ return handleInput(b2, b2.position());\r
+ } else if (position != remaining)\r
+ throw new IOException("Assertion error. this=" + toString(buffer, nRead));\r
+ // Got exactly one packet.\r
+ inputQueue.addFirst(new Packet(header, buffer));\r
+ header = null;\r
+ remaining = HEADER_SIZE;\r
+ return getInputBuffer(INCREMENT_SIZE);\r
+ }\r
+ InetSocketAddress getAddress() {\r
+ return manager.getAddress();\r
+ }\r
+ boolean isConnected() {\r
+ return manager.isConnected();\r
+ }\r
+ void reconnect() throws ProCoreException, InterruptedException {\r
+ manager.reconnect();\r
+ }\r
+ void connect(int port) throws ProCoreException, InterruptedException {\r
+ manager.connect(port);\r
+ }\r
+ void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {\r
+ manager.connect(address);\r
+ }\r
+ void disconnect() {\r
+ manager.disconnect();\r
+ }\r
+ Connection call(Method m) throws ProCoreException, InterruptedException, IOException {\r
+ manager.call(m);\r
+ return connection;\r
+ }\r
+}\r
+class Client {\r
+ private final ProCoreServer proCoreServer;\r
+ private final ProCoreClient proCoreClient;\r
+ private final ConnectionThread connectionThread;\r
+ private boolean closed = false;\r
+ Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {\r
+ this.proCoreServer = proCoreServer;\r
+ this.proCoreClient = proCoreClient;\r
+ this.connectionThread = connectionThread;\r
+ }\r
+ void open() throws ProCoreException, InterruptedException {\r
+ synchronized (proCoreClient) {\r
+ if (!connectionThread.isConnected())\r
+ connectionThread.connect(proCoreServer.getPort());\r
+ }\r
+ closed = false;\r
+ }\r
+ boolean isOpen() {\r
+ synchronized (proCoreClient) {\r
+ return connectionThread.isConnected();\r
+ }\r
+ }\r
+ boolean isClosed() {\r
+ synchronized (proCoreClient) {\r
+ return closed || !connectionThread.isConnected();\r
+ }\r
+ }\r
+ void close() {\r
+ proCoreClient.deleteClient(this);\r
+ closed = true;\r
+ }\r
+ String execute(String command) throws ProCoreException {\r
+ return proCoreClient.execute(command);\r
+ }\r
+ void call(Method m) throws ProCoreException {\r
+ proCoreClient.call(m);\r
+ }\r
+}\r
+class ProCoreClient {\r
+ public static final String LOCALHOST = "127.0.0.1";\r
+ class Remote {\r
+ private final ConnectionManager connectionManager = ConnectionManager.getInstance();\r
+ private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);\r
+ Remote() {\r
+ }\r
+ }\r
+ private final Remote remote = new Remote();\r
+ private final Set<Client> clients = new HashSet<Client>();\r
+ private final ProCoreServer proCoreServer;\r
+ public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {\r
+ this.proCoreServer = proCoreServer;\r
+ }\r
+ synchronized public int getPort() {\r
+ return remote.connectionThread.getAddress().getPort();\r
+ }\r
+ synchronized public boolean isConnected() {\r
+ return remote.connectionThread.isConnected();\r
+ }\r
+ synchronized public Client newClient() throws ProCoreException {\r
+ return newClient(proCoreServer, remote.connectionThread.getAddress());\r
+ }\r
+ synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {\r
+ if (!remote.connectionThread.isConnected())\r
+ try {\r
+ remote.connectionThread.connect(address);\r
+ } catch (InterruptedException e) {\r
+ if (!remote.connectionThread.isConnected()) {\r
+ remote.connectionThread.disconnect();\r
+ throw new ProCoreException("Failed to create new client.");\r
+ }\r
+ }\r
+ else if (!address.equals(remote.connectionThread.getAddress()))\r
+ throw new ProCoreException("Illegal newClient call. Already connected to different address.");\r
+ Client client = new Client(proCoreServer, this, remote.connectionThread);\r
+ clients.add(client);\r
+ return client;\r
+ }\r
+ synchronized int deleteClient(Client client) {\r
+ clients.remove(client);\r
+ return clients.size();\r
+ }\r
+ void closeClient(int aSessionId) throws ProCoreException {\r
+ try {\r
+ CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);\r
+ Method m = new Method(f, null, null);\r
+ Connection c = remote.connectionThread.call(m);\r
+ m.waitForReply(c);\r
+ } catch (InterruptedException e) {\r
+ throw new ProCoreException("Thread interrupted during method call.", e);\r
+ } catch (IOException e) {\r
+ throw new ProCoreException("IOException during method call.", e);\r
+ } catch (Throwable e) {\r
+ throw new ProCoreException("Throwable during method call.", e);\r
+ }\r
+ }\r
+ synchronized public String execute(String command) throws ProCoreException {\r
+ try {\r
+ ExecuteFunction f = new ExecuteFunction(command);\r
+ Method m = new Method(f, null, null);\r
+ Connection c = remote.connectionThread.call(m);\r
+ m.waitForReply(c);\r
+ return f.out;\r
+ } catch (InterruptedException e) {\r
+ throw new ProCoreException("Thread interrupted during method call.", e);\r
+ } catch (IOException e) {\r
+ throw new ProCoreException("IOException during method call.", e);\r
+ } catch (Throwable e) {\r
+ throw new ProCoreException("Throwable during method call.", e);\r
+ }\r
+ }\r
+ private void recall(Method method, IOException e) throws ProCoreException {// :)\r
+ if (remote.connectionThread.isConnected())\r
+ throw new ProCoreException("IOException during method call.", e);\r
+ try {\r
+ remote.connectionThread.reconnect();\r
+ Connection c = remote.connectionThread.call(method);\r
+ method.waitForReply(c);\r
+ } catch (Throwable t) {\r
+ throw new ProCoreException("IOException during method call. Recall failed.", e);\r
+ }\r
+ }\r
+ synchronized public void call(Method method) throws ProCoreException {\r
+ try {\r
+ Connection c = remote.connectionThread.call(method);\r
+ method.waitForReply(c);\r
+ } catch (InterruptedException e) {\r
+ throw new ProCoreException("Thread interrupted during method call.", e);\r
+ } catch (IOException e) {\r
+ recall(method, e);\r
+ } catch (ProCoreException e) {\r
+ throw e;\r
+ } catch (Throwable e) {\r
+ throw new ProCoreException("Throwable during method call.", e);\r
+ }\r
+ }\r
+ synchronized public void connect(int port) throws ProCoreException, InterruptedException {\r
+ remote.connectionThread.connect(port);\r
+ }\r
+ synchronized public void connect(SessionAddress sessionAddress)\r
+ throws ProCoreException, InterruptedException {\r
+ remote.connectionThread.connect(sessionAddress.getSocketAddress());\r
+ }\r
+ synchronized public void disconnect() throws ProCoreException {\r
+ remote.connectionThread.disconnect();\r
+ }\r
+}\r
+abstract class Event {\r
+ final String name;\r
+ final Packet packet;\r
+ Event(String name, Packet packet) {\r
+ this.name = name;\r
+ this.packet = packet;\r
+ }\r
+ int getToken() {\r
+ return packet.header.token;\r
+ }\r
+ void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {\r
+ event.deserialize(event.getRequestNumber(), getDataBuffer());\r
+ }\r
+ private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {\r
+ this.packet.bytes.position(20);\r
+ return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);\r
+ }\r
+ @Override public String toString() {\r
+ return name + " token=" + packet.header.token;\r
+ }\r
+}\r
+class ChangeSetUpdateEvent extends Event {\r
+ ChangeSetUpdateEvent(Packet packet) {\r
+ super("ChangeSetUpdateEvent", packet);\r
+ }\r
+}\r
+abstract class EventHandler {\r
+ abstract void on(Event event);\r
+}\r
+class DefaultEventHandler extends EventHandler {\r
+ void on(Event event) {\r
+ Util.log("Missing event handler. event=" + event);\r
+ }\r
+}\r