1 /*******************************************************************************
3 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
5 * All rights reserved. This program and the accompanying materials
6 * are made available under the terms of the Eclipse Public License v1.0
7 * which accompanies this distribution, and is available at
8 * http://www.eclipse.org/legal/epl-v10.html
11 * VTT Technical Research Centre of Finland - initial API and implementation
12 *******************************************************************************/
13 package org.simantics.db.server.internal;
15 import java.io.BufferedReader;
16 import java.io.BufferedWriter;
18 import java.io.FileInputStream;
19 import java.io.FileNotFoundException;
20 import java.io.FileOutputStream;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.io.OutputStreamWriter;
24 import java.io.PrintStream;
25 import java.net.InetSocketAddress;
26 import java.nio.ByteBuffer;
27 import java.nio.ByteOrder;
28 import java.nio.IntBuffer;
29 import java.nio.file.Files;
30 import java.nio.file.Path;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.HashSet;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.LinkedBlockingQueue;
39 import org.simantics.db.common.utils.Logger;
40 import org.simantics.db.server.DatabaseLastExitException;
41 import org.simantics.db.server.GuardFileVersionException;
42 import org.simantics.db.server.ProCoreException;
43 import org.simantics.db.server.ServerNotFoundException;
44 import org.simantics.db.server.protocol.CloseClientSessionFunction;
45 import org.simantics.db.server.protocol.ExecuteFunction;
46 import org.simantics.db.server.protocol.MessageNumber;
48 public class ProCoreServer {
49 private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();
50 private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();
51 private static final String LOCALHOST = ProCoreClient.LOCALHOST;
52 public static final int VERSION_MAJOR = 1;
53 public static final int VERSION_MINOR = 4; // 2015-05-05
54 public static final String BRANCH_DIR = "procore.headClusters.procore";
55 public static final String TAIL_DIR = "procore.tailClusters.procore";
56 public static final String TAIL_FILE = "procore.tail.procore";
57 public static final String CONFIG_FILE = "procore.config.procore";
58 public static final String GUARD_FILE = "procore.guard.procore";
59 public static final String JOURNAL_FILE = "procore.journal.procore";
60 public static final String LOG_FILE = "procore.log.procore";
61 public static final String DCS_FILE = "procore.dumpChangeSets.procore";
62 public static final String RECOVERY_NEEDED_FILE = "recovery.needed";
63 public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";
64 public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";
65 public static final String PAGE_FILE_PATTERN = "procore.page*.procore";
66 public static final String DATA_PREFIX = "ClusterData.";
67 public static final String DATA_PATTERN = DATA_PREFIX + "*";
68 public static final String INDEX_PREFIX = "ClusterIndex.";
69 public static final String INDEX_PATTERN = INDEX_PREFIX + "*";
70 public static final String VALUE_PREFIX = "ExternalValue.";
71 public static final String VALUE_PATTERN = VALUE_PREFIX + "*";
72 public static final String VALUE_SUFFIX = ".procore";
73 public static final String DELETED_PREFIX = "ClusterDeleted.";
74 public static final String DELETED_PATTERN = DELETED_PREFIX + "*";
75 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
76 public static final String QUIT_CMD = "quit";
77 // private static final int SHORT_SLEEP_COUNT = 1;
78 // private static final int LONG_SLEEP_COUNT = 10;
79 // static public void stopProCoreServer(File workingDir) throws ProCoreException {
80 // GuardFile guardFile = new GuardFile(workingDir);
81 // guardFile.stopProCoreServer();
83 private static boolean isRunning(String aPid) throws ProCoreException {
84 boolean isRunning = false;
86 switch (OSType.calculate()) {
88 Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");
89 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
90 String line = input.readLine();
91 if (line.matches(".*ProCoreServer.exe.*"))
97 Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");
98 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
99 String line = input.readLine();
100 if (line.matches(".*ProCoreServer.*"))
107 } catch (Exception e) {
108 throw new ProCoreException("Could not get list of running processes.", e);
112 static class ProCoreProcess {
113 private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };
114 class Manager implements Runnable {
115 private State state = State.Created;
116 private CountDownLatch serverPrompted;
117 private LinkedBlockingQueue<String> reply;
118 private volatile String command;
119 private volatile Integer exitValue;
120 private volatile boolean carryOn;
121 private volatile int port;
122 private final File dbFolder; // Just for exception i.e. error reporting.
123 Manager(File dbFolder) {
124 this.dbFolder = dbFolder;
127 private void init() {
128 serverPrompted = new CountDownLatch(1);
129 reply = new LinkedBlockingQueue<String>();
133 state = State.Initialized;
138 void tryToStart() throws ProCoreException, InterruptedException {
139 if (thread.isAlive())
141 thread = new Thread(manager, manager.initAndGetName());
143 serverPrompted.await();
144 if (!state.equals(State.Prompted))
145 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
146 String reply = giveCommand("print port");
148 port = Integer.parseInt(reply);
149 } catch (NumberFormatException e) {
150 if (null != exitValue && 0 != exitValue)
151 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
152 if (reply.equals("End of server thread."))
153 throw new ProCoreException("Server did not start (did not prompt). state=" + state);
155 throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);
157 state = State.GotPort;
159 String giveCommand(String command) throws ProCoreException, InterruptedException {
160 if (!thread.isAlive())
161 throw new ProCoreException("Server thread not alive.");
162 if (null != this.command)
163 throw new ProCoreException("Old command pending.");
164 this.command = command;
167 private String initAndGetName() {
169 return "ProCoreProcess " + ++instanceCount + " " + builder.directory();
171 boolean isActive() throws ProCoreException {
172 if (null == exitValue && thread.isAlive())
177 boolean tryToStop() throws ProCoreException {
178 if (!thread.isAlive())
183 thread.join(10000); // milliseconds
184 } catch (InterruptedException e) {
186 return !thread.isAlive();
190 state = State.ThreadStarted;
191 Util.log("ProCoreServer thread started.");
193 Util.trace("ProCoreProcessManager start.");
194 Process process = builder.start();
195 exitValue = exitValue(process);
196 if (null != exitValue)
197 Util.logError("Server process did not start.");
199 state = State.ProcessStarted;
200 runProcessStarted(process); // Process start
202 } catch (IOException e) {
203 Util.logError("Failed during process watch.");
205 reply.add("End of server thread.");
206 Util.trace("ProCoreServerThread stop.");
207 state = State.ThreadStopped;
208 serverPrompted.countDown();
211 private void runProcessStarted(Process process) throws IOException {
212 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
213 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)
218 handleLine(reader.readLine(), reader, writer);
219 else if (null != command) {
220 writer.write(command);
224 String line = getReply(reader);
227 Thread.sleep(100); // milliseconds
228 } catch (InterruptedException e) {
229 Util.trace("ProCoreServerThread interrupted.");
231 exitValue = exitValue(process);
232 if (null == exitValue && !carryOn)
235 writer.write(QUIT_CMD);
238 } catch (Throwable t) {
239 Util.logError("Wait for process death was interrupted.", t);
242 exitValue = process.waitFor();
243 } catch (InterruptedException e) {
244 Util.logError("Wait for process death was interrupted.", e);
248 } while (null == exitValue);
249 } // End of resource try.
251 private String getReply(BufferedReader reader) throws IOException {
252 StringBuilder sb = new StringBuilder();
254 String line = reader.readLine();
255 if (null == line || line.startsWith("Waiting for input?"))
259 return sb.toString();
261 private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {
263 Util.showDebug(line);
264 if (line.startsWith("Main end.")) {
265 carryOn = false; // Last line from server.
266 } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {
267 state = State.Prompted;
268 serverPrompted.countDown(); // Server has been started and is waiting for console input.
270 Util.trace("From server. line=" + line);
272 private Integer exitValue(Process process) {
274 return process.exitValue();
275 } catch (IllegalThreadStateException e) {
276 return null; // Still alive.
280 private static int instanceCount; // Keeps count of thread instances.
281 private final ProcessBuilder builder;
282 private final Manager manager;
283 private Thread thread;
284 ProCoreProcess(File exeFile, File workingDir) {
285 String[] args = new String[0];
286 String[] cmd = new String[1 + args.length];
287 cmd[0] = exeFile.getAbsolutePath().toString();
288 System.arraycopy(args, 0, cmd, 1, args.length);
289 builder = new ProcessBuilder(cmd);
290 builder.redirectErrorStream(true);
291 builder.directory(workingDir);
292 manager = new Manager(workingDir);
293 thread = new Thread(manager, manager.initAndGetName());
295 public void tryToStart() throws ProCoreException, InterruptedException {
296 manager.tryToStart();
298 // public int exitValue() throws IllegalStateException {
299 // Integer i = proCoreServerThread.getExitValue();
301 // throw new IllegalStateException("ProCoreServerThread is still running.");
304 // public boolean ignoreProtocolVersion() {
305 // File wd = builder.directory();
308 // Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);
309 // if (Files.exists(ignore))
313 public boolean isActive() throws ProCoreException {
314 return manager.isActive();
316 public boolean tryToStop() throws ProCoreException {
317 return manager.tryToStop();
319 int getPort() throws ProCoreException {
320 return manager.getPort();
323 private static enum OSType {
324 UNIX, WINDOWS, UNKNOWN;
326 public static OSType calculate() {
327 String osName = System.getProperty("os.name");
328 assert osName != null;
329 osName = osName.toLowerCase();
330 if (osName.startsWith("windows"))
332 if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))
337 static public boolean ignoreProtocolVersion(File dbFolder) {
338 if (null == dbFolder)
340 if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))
344 static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)
345 throws ProCoreException, ServerNotFoundException {
347 ProCoreServer proCoreServer;
348 synchronized (workingDirs) {
349 proCoreServer = workingDirs.get(workingDir.getCanonicalPath());
350 if (null != proCoreServer)
351 return proCoreServer;
352 if (!serverDir.isDirectory())
353 throw new ProCoreException("Expected server directory as argument");
354 if (!workingDir.isDirectory())
355 throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());
356 String exeSuffix = Activator.getExeSuffix();
357 File executable = new File(serverDir, "ProCoreServer" + exeSuffix);
358 proCoreServer = new ProCoreServer(executable, workingDir);
359 workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);
360 return proCoreServer;
362 } catch (IOException e) {
363 throw new ProCoreException("IOException", e);
367 public static void logVersionInformation() {
368 StringBuilder msg = new StringBuilder(200);
369 String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.
370 msg.append("ProCore version information:");
372 msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);
374 msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());
375 } catch (IOException e) {
376 msg.append(" folder information not available");
379 msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");
380 String str = msg.toString();
381 Logger.defaultLog(str);
383 private static String[] getFileLines(final File aFile, int maxLines) {
384 final String[] STRING_EMPTY_ARRAY = new String[0];
386 if (!aFile.canRead()) {
387 return STRING_EMPTY_ARRAY;
389 try (FileInputStream fis = new FileInputStream(aFile);
390 InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");
391 BufferedReader br = new BufferedReader(isr);){
394 ArrayList<String> lines = new ArrayList<String>();
395 while ((line = br.readLine()) != null) {
396 if (++count > maxLines)
397 break; // Caller not interested in more lines.
400 return lines.toArray(STRING_EMPTY_ARRAY);
401 } catch (FileNotFoundException e) {
402 return STRING_EMPTY_ARRAY;
404 } catch (Throwable t) {
407 Logger.defaultLogError(t);
409 return new String[0];
411 private static int parseInt(String s)
412 throws ProCoreException {
414 return Integer.parseInt(s);
415 } catch (NumberFormatException e) {
416 String msg = "Could not convert string to number. string=" + s;
417 Logger.defaultLogError(msg);
418 throw new ProCoreException(msg, e);
421 private String databaseId = null;
422 private final GuardFile guardFile;
423 private final ProCoreClient proCoreClient;
424 ProCoreClient getProCoreClient() {
425 return proCoreClient;
427 private final ProCoreProcess proCoreProcess;
428 private ProCoreServer(File exeFile, File workingDir)
429 throws ProCoreException {
431 throw new ProCoreException("Illegal argument: exeFile is null.");
432 if (null == workingDir)
433 throw new ProCoreException("Illegal argument: workingDir is null.");
434 guardFile = new GuardFile(workingDir);
435 proCoreClient = new ProCoreClient(this);
436 proCoreProcess = new ProCoreProcess(exeFile, workingDir);
437 Util.log("ProCoreServer exe: " + exeFile);
438 Util.log("ProCoreServer folder: " + workingDir);
439 logVersionInformation();
442 public synchronized String execute(String command) throws ProCoreException, InterruptedException {
443 return proCoreClient.execute(command);
445 private void startInternal()
446 throws ProCoreException {
447 if (proCoreClient.isConnected())
451 proCoreProcess.tryToStart();
452 int port = proCoreProcess.getPort();
453 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
454 proCoreClient.connect(sa);
456 } catch (InterruptedException e) {
457 Util.logError("ProCoreProcess start was interrupted.", e);
459 } catch (DatabaseLastExitException e) {
460 throw e; // This situation can be analyzed and handled by start handler.
461 } catch (ProCoreException e) {
462 Util.logError("Failed to start ProCoreServer process.", e);
465 boolean isActive = guardFile.isActive(proCoreClient);
467 throw new ProCoreException("Failed to connect to ProCoreServer.", exception);
469 // private int getWaitCount() throws ProCoreException {
470 // if (!proCoreProcess.isActive())
471 // return SHORT_SLEEP_COUNT; // Short wait because server process is not running.
473 // return LONG_SLEEP_COUNT;
475 // private void startInternalReinit(int exitStatus)
476 // throws ProCoreException {
477 // switch (exitStatus) {
478 // default: // What has happened here?
479 // throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));
480 // case -1: // Exit status not available, not sure what has happened here.
481 // case 29: // Server died because it could not read exit status from guard file.
482 // case 30: // Server died because old exit status was not zero.
483 // case 40: // Server died because it received terminating signal.
484 // case 42: // Server died because it could not read end status from guard file.
485 // case 43: // Server died because end value was not zero.
486 // case 44: // Server died because server could not read version from guard file.
487 // case 45: // Server died because server did not support guard file version.
488 // case 47: // Server died because server could not listen to any port.
490 // case 32: // Server died becuase database version did not match.
491 // throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));
492 // case 33: // Server died because database has been corrupted.
493 // throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));
494 // case 39: // Server died because last exit was not clean.
495 // throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));
497 // boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());
499 // StringBuilder sb = new StringBuilder(256);
500 // sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");
501 // sb.append(getExitMessage(exitStatus));
502 // throw new DatabaseStartException(workingDir, sb.toString());
504 // boolean deleted = guardFile.delete();
506 // Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));
509 public synchronized boolean isActive()
510 throws ProCoreException {
511 if (!proCoreClient.isConnected())
512 return guardFile.isActive(proCoreClient);
513 proCoreClient.execute("");
516 public synchronized boolean isAlive()
517 throws ProCoreException {
518 if (proCoreClient.isConnected())
520 if (proCoreProcess.isActive())
522 return guardFile.isAlive();
524 Client newClient() throws ProCoreException {
525 if (proCoreClient.isConnected())
526 return proCoreClient.newClient();
527 int port = getPort();
528 InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);
529 return proCoreClient.newClient(this, isa);
531 public synchronized boolean tryToStop()
532 throws ProCoreException {
533 synchronized (proCoreClient) {
534 ProCoreException ex = null;
535 if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {
537 proCoreClient.connect(getPort());
538 } catch (InterruptedException e) {
541 if (proCoreClient.isConnected()) {
544 s = proCoreClient.execute(QUIT_CMD);
545 String t = s.replaceAll("\n", "");
546 int n = Integer.parseInt(t);
548 Util.log("Number of connections after quit command is " + n + ".");
549 proCoreClient.closeClient(0);
550 if (!isLocal() && n > 1)
551 throw new ProCoreException("More than one connection for remote. n=" + n);
552 } catch (NumberFormatException e) {
553 Util.logError("Failed to parse number of connections. s=" + s, e);
554 } catch (ProCoreException e) {
557 proCoreClient.disconnect();
561 proCoreProcess.tryToStop();
563 if (proCoreProcess.isActive() && null == ex)
564 ex = new ProCoreException("Failed to stop ProCoreProcess.");
568 return !proCoreClient.isConnected() && !proCoreProcess.isActive();
571 public synchronized boolean isConnected()
572 throws ProCoreException {
573 return proCoreClient.isConnected();
575 public synchronized boolean isLocal()
576 throws ProCoreException {
577 return proCoreClient.isConnected() && proCoreProcess.isActive();
579 public synchronized void disconnect()
580 throws ProCoreException {
581 proCoreClient.disconnect();
583 public synchronized void connect()
584 throws ProCoreException, InterruptedException {
585 int port = getPort();
587 throw new ProCoreException("Port 0 not supported as connection address.");
588 proCoreClient.connect(port);
590 private String getExitMessage(int exitValue) {
593 return "Unexpected exit value = " + exitValue + ".";
595 return "Exception was thrown. This indicates problems with program logic.";
597 return "CallException was thrown. This indicates problems with calls to/from parent server.";
599 return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";
601 return "NoSuchElementException was thrown. This indicates problems with program logic.";
603 return "IllegalArgumentException was thrown. This indicates problems with program logic.";
605 return "IllegalResourceException was thrown. This indicates problems with server data.";
607 return "IllegalClusterException was thrown. This indicates problems with server data.";
609 return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";
611 return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";
613 return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";
615 return "SystemException was thrown. This indicates problems with program logic.";
617 return "CppException was thrown. This indicates problems with program logic.";
619 return "UnknownException was thrown. This indicates problems with program logic.";
621 return "ThrowException was thrown. This indicates problems with program logic.";
623 return "AssertException was thrown. This indicates problems with program logic.";
625 return "ParentCheckException was thrown. This indicates problems with the database version of parent.";
627 return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";
629 return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";
631 return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";
633 return "NoSuchFileException was thrown. This indicates problems with a missing file.";
635 return "JournalException was thrown. This indicates problems with journal mechanisim.";
637 return "OutOfSpaceException was thrown. This indicates problems with memory.";
639 return "ExternalValueException was thrown. This indicates problems with large value handling.";
641 return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";
643 return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";
645 return "ConsoleException was thrown. This indicates that server console was closed without quit command.";
647 return "ExitException was thrown. This indicates that server did not exit cleanly.";
649 return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";
651 return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";
653 return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";
655 return "UpdateException was thrown. This indicates that server failed during update.";
657 return "DatabaseVersionException was thrown. This indicates that server can not use given database.";
659 return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";
661 return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";
663 return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";
665 return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";
667 return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";
669 return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";
671 return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";
673 return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";
675 return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";
677 return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";
679 return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";
681 return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";
683 return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";
685 return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";
687 return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";
689 return "JournalVersionException was thrown. This indicates that database was written by older version.";
691 return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";
694 synchronized void start()
695 throws ProCoreException {
698 } catch (DatabaseLastExitException e) {
699 throw e; // This can be analyzed and handled by start handler.
700 } catch (Throwable t) {
701 ProCoreException pe = null;
702 if (t instanceof ProCoreException)
703 pe = (ProCoreException)t;
705 String msg = "ProCoreServer startup failed."
706 + " Automatic rcovery handling not implemented."
707 + " Database must be fixed manually.";
708 pe = new ProCoreException(msg, t);
710 if (null == pe.getDbFolder())
711 pe.setDbFolder(proCoreProcess.builder.directory());
714 synchronized void stop()
715 throws ProCoreException, InterruptedException{
716 if (null != databaseId)
717 databaseIds.remove(databaseId);
718 boolean dead = tryToStop();
720 throw new ProCoreException("Failed to stop ProCoreServer.");
722 public synchronized String getExitMessage()
723 throws ProCoreException {
724 Integer returnValue = guardFile.getExitValue();
725 if (null == returnValue)
726 throw new ProCoreException("Exit message not available.");
727 return getExitMessage(returnValue);
730 public synchronized ServerAddress getAddress()
731 throws ProCoreException {
732 return new ServerAddress(LOCALHOST, getPort());
735 public synchronized int getPort()
736 throws ProCoreException {
738 if (proCoreClient.isConnected()) {
739 port = proCoreClient.getPort();
743 if (proCoreProcess.isActive()) {
744 port = proCoreProcess.getPort();
748 return guardFile.getPort();
750 public synchronized TailFile.Data getTailData()
751 throws ProCoreException {
752 Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);
753 return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
755 // public synchronized TailFile.Data getTailDataFromHead()
756 // throws ProCoreException {
757 // Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);
758 // return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
760 public static class GuardFile {
761 public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;
762 private static final int VERSION_LINE = 1;
763 private static final int PID_LINE = 2;
764 private static final int PORT_LINE = 3;
765 private static final int EXIT_LINE = 4;
766 private static final int END_LINE = 5;
767 private final File guardFile;
768 GuardFile(File workingDir) {
769 guardFile = new File(workingDir, GUARD_FILE);
771 int getPort() throws ProCoreException {
772 String[] lines = getFileLines(guardFile, END_LINE);
773 if (lines.length < PORT_LINE)
774 throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());
775 return parseInt(lines[PORT_LINE-1]);
777 private void checkVersion(String[] lines) throws ProCoreException {
778 if (lines.length < VERSION_LINE)
779 throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());
780 int version = parseInt(lines[VERSION_LINE-1]);
782 throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);
785 if (guardFile.exists())
786 return guardFile.delete();
788 return true; // Guard file is deleted i.e. does not exit.
791 if (!guardFile.exists())
794 Files.delete(guardFile.toPath());
795 } catch (Throwable t) {
799 Integer getExitValue() {
800 String lines[] = getFileLines(guardFile, END_LINE);
803 if (lines.length >= EXIT_LINE)
804 return parseInt(lines[EXIT_LINE-1]);
805 else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))
806 return -1; // Server has died without writing exit status.
807 return null; // Server might be running.
808 } catch (Exception e) {
809 Logger.defaultLogError("Could not get ProCoreServer exit value.", e);
810 return null; // Server might be running.
813 // void stopProCoreServer() throws ProCoreException {
814 // if (!guardFile.exists())
815 // throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");
816 // String lines[] = new String[0];
817 // for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {
818 // lines = getFileLines(guardFile, END_LINE);
819 // if (lines.length < 1)
820 // throw new ProCoreException("Guard file does not contain lines.");
821 // checkVersion(lines);
822 // if (lines.length >= END_LINE)
823 // return; // Server is dead according to guard file.
824 // if (!isRunning(lines[PID_LINE-1]))
825 // return; // ProCoreServer is not running.
826 // if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.
829 // if (lines.length < PORT_LINE)
830 // throw new ProCoreException("Guard file does not contain port.");
832 // int port = parseInt(lines[PORT_LINE-1]);
834 // ProCoreClient lProCoreClient = new ProCoreClient();
835 // SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);
836 // lProCoreClient.connect(sa);
838 // String reply = lProCoreClient.execute(QUIT_CMD);
839 // int nConnect = Integer.parseInt(reply);
841 // throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());
843 // lProCoreClient.disconnect();
846 // } catch (Throwable t) {
847 // throw new ProCoreException("Failed to stop procore server cleanly.", t);
849 // // Managed to send quit command to server, let's wait and see if it has any effect.
850 // boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);
852 // throw new ProCoreException("Unable to stop server.");
854 boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {
855 if (proCoreClient.isConnected())
856 throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");
857 String lines[] = getFileLines(guardFile, END_LINE);
858 if (lines.length != PORT_LINE)
862 int port = parseInt(lines[PORT_LINE-1]);
865 // Checking if core is responsive.
866 // This should be supported regardless of version.
867 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
868 proCoreClient.connect(sa);
869 proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.
871 } catch (ProCoreException e) {
872 } finally { // We must disconnect or check protocol version.
873 proCoreClient.disconnect();
876 } catch (Throwable t) {
877 // Intentionally empty.
881 boolean isAlive() throws ProCoreException {
882 String lines[] = getFileLines(guardFile, END_LINE);
883 if (lines.length < 1)
884 return false; // No lines, assuming server is dead.
886 if (lines.length < PID_LINE)
887 return false; // No process id, assuming server is dead.
888 else if (lines.length >= END_LINE)
889 return false; // End status available, assuming server is dead.
890 else if (lines.length >= EXIT_LINE)
891 return false; // Exit status available, assuming server is dying.
892 else if (isRunning(lines[PID_LINE-1]))
893 return true; // Process with given id running. Server might be alive.
895 return false;// Process with given id not running.
898 public static class TailFile {
899 public static class Data {
900 Data(long version, long nextChangeSetId, long nextFreeId) {
901 this.version = version;
902 this.nextChangeSetId = nextChangeSetId;
903 this.nextFreeId = nextFreeId;
906 public long nextChangeSetId;
909 public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {
911 throw new TailException("Tail file exists and thus cannot be created. file=" + file);
912 FileOutputStream fos;
914 fos = new FileOutputStream(file);
915 } catch (FileNotFoundException e) {
916 throw new TailException("Tail file cannot be created. file=" + file);
918 PrintStream ps = new PrintStream(fos);
920 ps.println(1); // file version
921 ps.println(nextChangeSetId); // next revision
922 ps.println(nextFreeId); // next free cluster
923 ps.println(dbId); // database id
928 public static Data readTailFile(File file) throws ProCoreException {
929 String lines[] = getFileLines(file, 3);
930 if (lines.length < 3)
931 throw new TailReadException("Could not read data from " + file);
932 long ver = Long.parseLong(lines[0]);
933 long cs = Long.parseLong(lines[1]);
934 long id = Long.parseLong(lines[2]);
935 return new Data(ver, cs, id);
939 class SessionAddress {
940 private final InetSocketAddress socketAddress;
941 private final boolean ignoreProtocolVersion;
942 public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {
943 assert(null != socketAddress);
944 this.socketAddress = socketAddress;
945 this.ignoreProtocolVersion = ignoreProtocolVersion;
947 public InetSocketAddress getSocketAddress() {
948 return socketAddress;
950 public boolean ignoreProtocolVersion() {
951 return ignoreProtocolVersion;
954 public String toString() {
955 return socketAddress.toString();
959 private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();
962 void addFirst(Packet packet) {
965 Packet takeFirst() throws InterruptedException {
966 return packets.take();
971 lastTokenIn = ints[0];
973 messageNumber = ints[2];
974 inflateSize = ints[3];
975 deflateSize = ints[4];
983 public String toString() {
984 StringBuilder sb = new StringBuilder(80);
985 sb.append("lastTokenIn=");
986 sb.append(lastTokenIn);
987 sb.append(" token=");
989 sb.append(" messageNumber=");
990 sb.append(messageNumber);
991 sb.append(" inflateSize=");
992 sb.append(inflateSize);
993 sb.append(" deflateSize=");
994 sb.append(deflateSize);
995 return sb.toString();
1001 Packet(int[] header, ByteBuffer bytes) {
1002 this.header = new Header(header);
1006 class ConnectionThread {
1007 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
1008 private static final int HEADER_N = 5;
1009 private static final int HEADER_SIZE = HEADER_N * 4;
1010 private static final int DEFLATE = 4;
1011 private static final int INCREMENT_SIZE = 8192;
1012 private static int instanceCount = 0;
1013 class Manager implements Runnable {
1014 private final int instance;
1015 private volatile boolean carryOn = true;
1016 private volatile int lastTokenIn = 0;
1017 private volatile MethodQueue methodQueue = new MethodQueue();
1018 private volatile CountDownLatch running = new CountDownLatch(1);
1019 private final EventHandler defaultEventHandler = new DefaultEventHandler();
1020 // private EventHandler eventHandler = defaultEventHandler;
1022 instance = ++instanceCount;
1025 public String toString() {
1026 return "Connection " + instance;
1028 InetSocketAddress getAddress() {
1029 return connection.getAddress();
1031 boolean isConnected() {
1032 return connection.isConnected();
1034 void reconnect() throws ProCoreException, InterruptedException {
1035 InetSocketAddress a = connection.getAddress();
1036 if (connection.addressNotInitialized() || 0 == a.getPort())
1037 throw new ProCoreException("Address not ok. address=" + connection.getAddress());
1040 void connect(int port) throws ProCoreException, InterruptedException {
1041 InetSocketAddress a = connection.getAddress();
1042 if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))
1043 a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);
1046 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
1047 if (connection.isConnected()) {
1048 if (connection.equalsAddress(address))
1051 throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);
1053 connection.init4Connection(address);
1054 connectionManager.connect(connection);
1055 // compression = new Compression();
1056 if (thread.isAlive())
1061 void call(Method method) throws ProCoreException, InterruptedException, IOException {
1062 // eventHandler = method.getEventHandler(eventHandler);
1063 connection.sendMethod(methodQueue, method, lastTokenIn);
1066 methodQueue.close();
1067 connection.disconnect();
1070 public void run() { // MARK: CT.Manger.run
1072 running.countDown();
1074 Packet packet = null;
1076 packet = inputQueue.takeFirst();
1077 } catch (InterruptedException e) {
1078 Util.trace("Wait for input packet interrupted. this=" + this);
1083 handlePacket(packet);
1086 Util.trace("Thread " + thread.getName() + " stopped with null packet.");
1088 Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");
1089 } catch (Throwable t) {
1090 Util.logError("Thread " + thread.getName() + " stopped to exception:", t);
1093 running.countDown();
1096 private void handlePacket(Packet packet) throws ProCoreException {
1097 if (packet.header.token > lastTokenIn) {
1098 lastTokenIn = packet.header.token;
1100 Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);
1102 Method method = methodQueue.remove(packet.header.token);
1103 if (null == method) {
1105 switch (packet.header.messageNumber) {
1107 Util.logError("Missing method. token=" + packet.header.token);
1109 case MessageNumber.ChangeSetUpdateRequest: {
1110 defaultEventHandler.on(new ChangeSetUpdateEvent(packet));
1113 } catch (Throwable t) {
1114 Util.logError("Event handler failed:", t);
1119 // eventHandler = method.getEventHandler(defaultEventHandler);
1120 switch (packet.header.messageNumber) {
1122 String s = "Illegal message number " + packet.header.messageNumber;
1123 method.gotException(s);
1125 case MessageNumber.ChangeSetUpdateRequest:
1126 method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));
1127 methodQueue.add(method); // Event caused by method. Actual method not yet handled.
1129 case MessageNumber.NullResponse:
1130 //Util.log("Null(ack) message. method=" + method + " packet=" + packet);
1132 case MessageNumber.ExceptionResponse:
1133 method.gotException(packet);
1134 break; // MARK: handlePacket
1135 case MessageNumber.AAAResponse:
1136 case MessageNumber.AskTransactionResponse:
1137 case MessageNumber.CancelCommitResponse:
1138 case MessageNumber.CloseClientSessionResponse:
1139 case MessageNumber.EchoResponse:
1140 case MessageNumber.EndTransactionResponse:
1141 case MessageNumber.ExecuteResponse:
1142 case MessageNumber.GetChangeSetContextResponse:
1143 case MessageNumber.GetChangeSetDataResponse:
1144 case MessageNumber.GetChangeSetsResponse:
1145 case MessageNumber.GetClusterNewResponse:
1146 case MessageNumber.GetRefreshResponse:
1147 case MessageNumber.GetResourceSegmentResponse:
1148 case MessageNumber.GetServerInfoResponse:
1149 case MessageNumber.OpenClientSessionResponse:
1150 case MessageNumber.RefreshResponse:
1151 case MessageNumber.ReserveIdsResponse:
1152 case MessageNumber.UndoResponse:
1153 case MessageNumber.ReconnectResponse:
1154 case MessageNumber.GetRefresh2Response:
1155 case MessageNumber.GetClusterChangesResponse:
1156 case MessageNumber.GetServerInfo2Response:
1157 case MessageNumber.ListClustersResponse: {
1158 method.gotPacket(packet);
1163 private final ConnectionManager connectionManager;
1164 private final PacketQueue inputQueue = new PacketQueue();
1165 private final Connection connection = new Connection(this);
1166 private final Manager manager = new Manager();
1167 private Thread thread = new Thread(manager, manager.toString());
1168 private int[] header = null;
1169 private int[] INTS = new int[HEADER_N];
1170 private int remaining = HEADER_SIZE;
1171 private int deflateSize;
1172 ConnectionThread(ConnectionManager connectionManager) {
1173 this.connectionManager = connectionManager;
1175 ByteBuffer getInputBuffer() {
1176 return getInputBuffer(INCREMENT_SIZE);
1178 ByteBuffer getInputBuffer(int sizeAtLeast) {
1179 ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));
1180 bb.order(ByteOrder.LITTLE_ENDIAN);
1184 public String toString() {
1185 return "header=" + header + " remaining=" + remaining;
1187 private String toString(ByteBuffer buffer, int nRead) {
1188 return this + " nRead=" + nRead + " buffer=" + buffer;
1190 ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0
1191 throws IOException { // MARK: handleInput
1192 int position = buffer.position();
1193 if (null == header) {
1194 if (position < HEADER_SIZE)
1196 else { // Got header.
1198 IntBuffer ib = buffer.asIntBuffer();
1202 deflateSize = header[DEFLATE];
1203 if (deflateSize < 0)
1204 throw new IOException("Illegal deflate size=" + deflateSize);
1205 remaining += deflateSize;
1206 if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.
1207 ByteBuffer bb = getInputBuffer(remaining);
1209 buffer.limit(position);
1213 buffer.position(position);
1216 if (position < remaining)
1217 return buffer; // Need more data.
1218 else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.
1219 ByteBuffer bb = getInputBuffer(remaining);
1221 buffer.limit(remaining);
1223 inputQueue.addFirst(new Packet(header, bb));
1224 ByteBuffer b2 = getInputBuffer(buffer.limit());
1225 buffer.position(remaining);
1226 buffer.limit(position);
1229 remaining = HEADER_SIZE;
1230 return handleInput(b2, b2.position());
1231 } else if (position != remaining)
1232 throw new IOException("Assertion error. this=" + toString(buffer, nRead));
1233 // Got exactly one packet.
1234 inputQueue.addFirst(new Packet(header, buffer));
1236 remaining = HEADER_SIZE;
1237 return getInputBuffer(INCREMENT_SIZE);
1239 InetSocketAddress getAddress() {
1240 return manager.getAddress();
1242 boolean isConnected() {
1243 return manager.isConnected();
1245 void reconnect() throws ProCoreException, InterruptedException {
1246 manager.reconnect();
1248 void connect(int port) throws ProCoreException, InterruptedException {
1249 manager.connect(port);
1251 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
1252 manager.connect(address);
1255 manager.disconnect();
1257 Connection call(Method m) throws ProCoreException, InterruptedException, IOException {
1263 private final ProCoreServer proCoreServer;
1264 private final ProCoreClient proCoreClient;
1265 private final ConnectionThread connectionThread;
1266 private boolean closed = false;
1267 Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {
1268 this.proCoreServer = proCoreServer;
1269 this.proCoreClient = proCoreClient;
1270 this.connectionThread = connectionThread;
1272 void open() throws ProCoreException, InterruptedException {
1273 synchronized (proCoreClient) {
1274 if (!connectionThread.isConnected())
1275 connectionThread.connect(proCoreServer.getPort());
1280 synchronized (proCoreClient) {
1281 return connectionThread.isConnected();
1284 boolean isClosed() {
1285 synchronized (proCoreClient) {
1286 return closed || !connectionThread.isConnected();
1290 proCoreClient.deleteClient(this);
1293 String execute(String command) throws ProCoreException {
1294 return proCoreClient.execute(command);
1296 void call(Method m) throws ProCoreException {
1297 proCoreClient.call(m);
1300 class ProCoreClient {
1301 public static final String LOCALHOST = "127.0.0.1";
1303 private final ConnectionManager connectionManager = ConnectionManager.getInstance();
1304 private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);
1308 private final Remote remote = new Remote();
1309 private final Set<Client> clients = new HashSet<Client>();
1310 private final ProCoreServer proCoreServer;
1311 public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {
1312 this.proCoreServer = proCoreServer;
1314 synchronized public int getPort() {
1315 return remote.connectionThread.getAddress().getPort();
1317 synchronized public boolean isConnected() {
1318 return remote.connectionThread.isConnected();
1320 synchronized public Client newClient() throws ProCoreException {
1321 return newClient(proCoreServer, remote.connectionThread.getAddress());
1323 synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {
1324 if (!remote.connectionThread.isConnected())
1326 remote.connectionThread.connect(address);
1327 } catch (InterruptedException e) {
1328 if (!remote.connectionThread.isConnected()) {
1329 remote.connectionThread.disconnect();
1330 throw new ProCoreException("Failed to create new client.");
1333 else if (!address.equals(remote.connectionThread.getAddress()))
1334 throw new ProCoreException("Illegal newClient call. Already connected to different address.");
1335 Client client = new Client(proCoreServer, this, remote.connectionThread);
1336 clients.add(client);
1339 synchronized int deleteClient(Client client) {
1340 clients.remove(client);
1341 return clients.size();
1343 void closeClient(int aSessionId) throws ProCoreException {
1345 CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);
1346 Method m = new Method(f, null, null);
1347 Connection c = remote.connectionThread.call(m);
1349 } catch (InterruptedException e) {
1350 throw new ProCoreException("Thread interrupted during method call.", e);
1351 } catch (IOException e) {
1352 throw new ProCoreException("IOException during method call.", e);
1353 } catch (Throwable e) {
1354 throw new ProCoreException("Throwable during method call.", e);
1357 synchronized public String execute(String command) throws ProCoreException {
1359 ExecuteFunction f = new ExecuteFunction(command);
1360 Method m = new Method(f, null, null);
1361 Connection c = remote.connectionThread.call(m);
1364 } catch (InterruptedException e) {
1365 throw new ProCoreException("Thread interrupted during method call.", e);
1366 } catch (IOException e) {
1367 throw new ProCoreException("IOException during method call.", e);
1368 } catch (Throwable e) {
1369 throw new ProCoreException("Throwable during method call.", e);
1372 private void recall(Method method, IOException e) throws ProCoreException {// :)
1373 if (remote.connectionThread.isConnected())
1374 throw new ProCoreException("IOException during method call.", e);
1376 remote.connectionThread.reconnect();
1377 Connection c = remote.connectionThread.call(method);
1378 method.waitForReply(c);
1379 } catch (Throwable t) {
1380 throw new ProCoreException("IOException during method call. Recall failed.", e);
1383 synchronized public void call(Method method) throws ProCoreException {
1385 Connection c = remote.connectionThread.call(method);
1386 method.waitForReply(c);
1387 } catch (InterruptedException e) {
1388 throw new ProCoreException("Thread interrupted during method call.", e);
1389 } catch (IOException e) {
1391 } catch (ProCoreException e) {
1393 } catch (Throwable e) {
1394 throw new ProCoreException("Throwable during method call.", e);
1397 synchronized public void connect(int port) throws ProCoreException, InterruptedException {
1398 remote.connectionThread.connect(port);
1400 synchronized public void connect(SessionAddress sessionAddress)
1401 throws ProCoreException, InterruptedException {
1402 remote.connectionThread.connect(sessionAddress.getSocketAddress());
1404 synchronized public void disconnect() throws ProCoreException {
1405 remote.connectionThread.disconnect();
1408 abstract class Event {
1410 final Packet packet;
1411 Event(String name, Packet packet) {
1413 this.packet = packet;
1416 return packet.header.token;
1418 void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {
1419 event.deserialize(event.getRequestNumber(), getDataBuffer());
1421 private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {
1422 this.packet.bytes.position(20);
1423 return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);
1425 @Override public String toString() {
1426 return name + " token=" + packet.header.token;
1429 class ChangeSetUpdateEvent extends Event {
1430 ChangeSetUpdateEvent(Packet packet) {
1431 super("ChangeSetUpdateEvent", packet);
1434 abstract class EventHandler {
1435 abstract void on(Event event);
1437 class DefaultEventHandler extends EventHandler {
1438 void on(Event event) {
1439 Util.log("Missing event handler. event=" + event);