1 /*******************************************************************************
3 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
5 * All rights reserved. This program and the accompanying materials
6 * are made available under the terms of the Eclipse Public License v1.0
7 * which accompanies this distribution, and is available at
8 * http://www.eclipse.org/legal/epl-v10.html
11 * VTT Technical Research Centre of Finland - initial API and implementation
12 *******************************************************************************/
13 package org.simantics.db.server.internal;
15 import java.io.BufferedReader;
16 import java.io.BufferedWriter;
18 import java.io.FileInputStream;
19 import java.io.FileNotFoundException;
20 import java.io.FileOutputStream;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.io.OutputStreamWriter;
24 import java.io.PrintStream;
25 import java.net.InetSocketAddress;
26 import java.nio.ByteBuffer;
27 import java.nio.ByteOrder;
28 import java.nio.IntBuffer;
29 import java.nio.file.Files;
30 import java.nio.file.Path;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.HashSet;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.LinkedBlockingQueue;
39 import org.simantics.db.common.utils.Logger;
40 import org.simantics.db.server.DatabaseLastExitException;
41 import org.simantics.db.server.GuardFileVersionException;
42 import org.simantics.db.server.ProCoreException;
43 import org.simantics.db.server.ServerNotFoundException;
44 import org.simantics.db.server.protocol.CloseClientSessionFunction;
45 import org.simantics.db.server.protocol.ExecuteFunction;
46 import org.simantics.db.server.protocol.MessageNumber;
47 import org.slf4j.LoggerFactory;
49 public class ProCoreServer {
// Server instances keyed by the canonical path of their working directory (see getProCoreServer).
50 private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();
// Server instances keyed by database id; stop() removes the entry of its own databaseId.
51 private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();
// Host name used for every socket address built in this class.
52 private static final String LOCALHOST = ProCoreClient.LOCALHOST;
// Server version, reported by logVersionInformation().
53 public static final int VERSION_MAJOR = 1;
54 public static final int VERSION_MINOR = 4; // 2015-05-05
// File and folder names used by ProCore. In this class TAIL_DIR/TAIL_FILE and GUARD_FILE
// are resolved against the working directory of the server.
55 public static final String BRANCH_DIR = "procore.headClusters.procore";
56 public static final String TAIL_DIR = "procore.tailClusters.procore";
57 public static final String TAIL_FILE = "procore.tail.procore";
58 public static final String CONFIG_FILE = "procore.config.procore";
59 public static final String GUARD_FILE = "procore.guard.procore";
60 public static final String JOURNAL_FILE = "procore.journal.procore";
61 public static final String LOG_FILE = "procore.log.procore";
62 public static final String DCS_FILE = "procore.dumpChangeSets.procore";
// Marker file names; PROTOCOL_IGNORED_FILE is the one tested by ignoreProtocolVersion(File).
63 public static final String RECOVERY_NEEDED_FILE = "recovery.needed";
64 public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";
65 public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";
// Name prefixes, suffix and glob-style patterns built from them.
66 public static final String PAGE_FILE_PATTERN = "procore.page*.procore";
67 public static final String DATA_PREFIX = "ClusterData.";
68 public static final String DATA_PATTERN = DATA_PREFIX + "*";
69 public static final String INDEX_PREFIX = "ClusterIndex.";
70 public static final String INDEX_PATTERN = INDEX_PREFIX + "*";
71 public static final String VALUE_PREFIX = "ExternalValue.";
72 public static final String VALUE_PATTERN = VALUE_PREFIX + "*";
73 public static final String VALUE_SUFFIX = ".procore";
74 public static final String DELETED_PREFIX = "ClusterDeleted.";
75 public static final String DELETED_PATTERN = DELETED_PREFIX + "*";
// Debug switch taken from DebugPolicy.
76 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
// Command given to the server (through its console or a client connection) when stopping it.
77 public static final String QUIT_CMD = "quit";
78 // private static final int SHORT_SLEEP_COUNT = 1;
79 // private static final int LONG_SLEEP_COUNT = 10;
80 // static public void stopProCoreServer(File workingDir) throws ProCoreException {
81 // GuardFile guardFile = new GuardFile(workingDir);
82 // guardFile.stopProCoreServer();
/**
 * Checks whether the process with the given pid looks like a ProCoreServer process.
 * <p>
 * Runs an operating system specific process listing command for the pid and
 * matches the first line of its output against the server executable name.
 *
 * @param aPid process id; it is concatenated into the command line as is
 * @return presumably true when the output line names ProCoreServer (TODO confirm);
 *         the local flag starts out as false
 * @throws ProCoreException if anything fails while querying the process list
 */
84 private static boolean isRunning(String aPid) throws ProCoreException {
85 boolean isRunning = false;
// The listing command depends on the operating system type.
87 switch (OSType.calculate()) {
// Windows: tasklist filtered by pid, CSV output without header line.
89 Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");
90 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
91 String line = input.readLine();
// NOTE(review): readLine() returns null when the command prints nothing; matches() would
// then throw a NullPointerException, which presumably ends up in the catch below - confirm.
92 if (line.matches(".*ProCoreServer.exe.*"))
// Unix-like: ps prints only the command name of the given pid.
98 Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");
99 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
100 String line = input.readLine();
101 if (line.matches(".*ProCoreServer.*"))
// Any failure is reported to the caller as a ProCoreException with the original cause.
108 } catch (Exception e) {
109 throw new ProCoreException("Could not get list of running processes.", e);
/**
 * Wraps the external ProCoreServer executable. Holds the ProcessBuilder for the
 * executable and a {@link Manager} that runs the process on its own thread and
 * talks to it through the process console streams.
 */
113 static class ProCoreProcess {
// Life cycle states recorded by the manager.
114 private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };
/**
 * Runnable that starts the server process, watches its output and passes
 * console commands to it.
 */
115 class Manager implements Runnable {
116 private State state = State.Created;
// Counted down when the server prompts for input and also when the manager thread ends.
117 private CountDownLatch serverPrompted;
// Queue of reply texts; the manager thread adds an end-of-thread sentinel to it when it stops.
118 private LinkedBlockingQueue<String> reply;
// Command waiting to be written to the server console; null when nothing is pending.
119 private volatile String command;
// Exit value of the server process; null while the process has no exit value.
120 private volatile Integer exitValue;
// Set to false when the server prints its last line ("Main end.").
121 private volatile boolean carryOn;
// Port number parsed from the reply to the "print port" command.
122 private volatile int port;
123 private final File dbFolder; // Just for exception i.e. error reporting.
124 Manager(File dbFolder) {
125 this.dbFolder = dbFolder;
// Creates a fresh latch and reply queue and moves to state Initialized.
128 private void init() {
129 serverPrompted = new CountDownLatch(1);
130 reply = new LinkedBlockingQueue<String>();
134 state = State.Initialized;
/**
 * Creates a new thread for the manager, waits until the server has prompted
 * and then asks the server for its port.
 *
 * @throws ProCoreException if the server did not prompt or the port reply could not be parsed
 * @throws InterruptedException if interrupted while waiting for the prompt
 */
139 void tryToStart() throws ProCoreException, InterruptedException {
140 if (thread.isAlive())
142 thread = new Thread(manager, manager.initAndGetName());
// Blocks until the latch is released; the state tells whether the server really prompted.
144 serverPrompted.await();
145 if (!state.equals(State.Prompted))
146 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
147 String reply = giveCommand("print port");
149 port = Integer.parseInt(reply);
150 } catch (NumberFormatException e) {
// The reply was not a number. A known non-zero exit value means the process has died.
151 if (null != exitValue && 0 != exitValue)
152 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
// This text is the sentinel the manager thread adds to the reply queue when it ends.
153 if (reply.equals("End of server thread."))
154 throw new ProCoreException("Server did not start (did not prompt). state=" + state);
156 throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);
158 state = State.GotPort;
/**
 * Hands a console command over to the manager thread.
 *
 * @param command text to be written to the server console
 * @throws ProCoreException if the manager thread is not alive or an earlier command is still pending
 */
160 String giveCommand(String command) throws ProCoreException, InterruptedException {
161 if (!thread.isAlive())
162 throw new ProCoreException("Server thread not alive.");
163 if (null != this.command)
164 throw new ProCoreException("Old command pending.");
165 this.command = command;
// Returns the thread name: a running instance counter followed by the working directory.
168 private String initAndGetName() {
170 return "ProCoreProcess " + ++instanceCount + " " + builder.directory();
172 boolean isActive() throws ProCoreException {
// No exit value known and the manager thread is still running.
173 if (null == exitValue && thread.isAlive())
/**
 * Waits at most ten seconds for the manager thread to finish.
 *
 * @return true if the manager thread is not alive afterwards
 */
178 boolean tryToStop() throws ProCoreException {
179 if (!thread.isAlive())
184 thread.join(10000); // milliseconds
185 } catch (InterruptedException e) {
187 return !thread.isAlive();
// Manager thread body: start the process and, if it is running, serve its console.
191 state = State.ThreadStarted;
192 Util.log("ProCoreServer thread started.");
194 Util.trace("ProCoreProcessManager start.");
195 Process process = builder.start();
// A non-null exit value right after start means the process has already terminated.
196 exitValue = exitValue(process);
197 if (null != exitValue)
198 Util.logError("Server process did not start.");
200 state = State.ProcessStarted;
201 runProcessStarted(process); // Process start
203 } catch (IOException e) {
204 Util.logError("Failed during process watch.");
// End of thread: publish the sentinel reply and release anyone waiting for the prompt.
206 reply.add("End of server thread.");
207 Util.trace("ProCoreServerThread stop.");
208 state = State.ThreadStopped;
209 serverPrompted.countDown();
/**
 * Serves the console of a started process: handles output lines, writes a
 * pending command and polls the exit value until the process has exited.
 */
212 private void runProcessStarted(Process process) throws IOException {
// Reader and writer are attached to the standard output and input of the server process.
213 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
214 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)
219 handleLine(reader.readLine(), reader, writer);
220 else if (null != command) {
221 writer.write(command);
225 String line = getReply(reader);
228 Thread.sleep(100); // milliseconds
229 } catch (InterruptedException e) {
230 Util.trace("ProCoreServerThread interrupted.");
232 exitValue = exitValue(process);
// Process still alive although the server has printed its last line.
233 if (null == exitValue && !carryOn)
236 writer.write(QUIT_CMD);
239 } catch (Throwable t) {
240 Util.logError("Wait for process death was interrupted.", t);
243 exitValue = process.waitFor();
244 } catch (InterruptedException e) {
245 Util.logError("Wait for process death was interrupted.", e);
// Loop as long as the process has no exit value.
249 } while (null == exitValue);
250 } // End of resource try.
// Reads server output up to end of stream or the next "Waiting for input?" prompt line.
252 private String getReply(BufferedReader reader) throws IOException {
253 StringBuilder sb = new StringBuilder();
255 String line = reader.readLine();
256 if (null == line || line.startsWith("Waiting for input?"))
260 return sb.toString();
// Interprets one output line of the server: last line marker or first input prompt.
262 private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {
264 Util.showDebug(line);
265 if (line.startsWith("Main end.")) {
266 carryOn = false; // Last line from server.
// Only the first prompt counts, i.e. while the latch has not been released yet.
267 } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {
268 state = State.Prompted;
269 serverPrompted.countDown(); // Server has been started and is waiting for console input.
271 Util.trace("From server. line=" + line);
// Returns the exit value of the process, or null if the process has not exited yet.
273 private Integer exitValue(Process process) {
275 return process.exitValue();
276 } catch (IllegalThreadStateException e) {
277 return null; // Still alive.
281 private static int instanceCount; // Keeps count of thread instances.
282 private final ProcessBuilder builder;
283 private final Manager manager;
284 private Thread thread;
/**
 * Prepares the process builder and the manager; the process itself is not started here.
 *
 * @param exeFile server executable; its absolute path is the whole command line
 * @param workingDir working directory of the server process
 */
285 ProCoreProcess(File exeFile, File workingDir) {
// No extra arguments are passed to the executable.
286 String[] args = new String[0];
287 String[] cmd = new String[1 + args.length];
288 cmd[0] = exeFile.getAbsolutePath().toString();
289 System.arraycopy(args, 0, cmd, 1, args.length);
290 builder = new ProcessBuilder(cmd);
// Standard error of the server is merged into its standard output.
291 builder.redirectErrorStream(true);
292 builder.directory(workingDir);
293 manager = new Manager(workingDir);
294 thread = new Thread(manager, manager.initAndGetName());
// The methods below delegate to the manager.
296 public void tryToStart() throws ProCoreException, InterruptedException {
297 manager.tryToStart();
299 // public int exitValue() throws IllegalStateException {
300 // Integer i = proCoreServerThread.getExitValue();
302 // throw new IllegalStateException("ProCoreServerThread is still running.");
305 // public boolean ignoreProtocolVersion() {
306 // File wd = builder.directory();
309 // Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);
310 // if (Files.exists(ignore))
314 public boolean isActive() throws ProCoreException {
315 return manager.isActive();
317 public boolean tryToStop() throws ProCoreException {
318 return manager.tryToStop();
320 int getPort() throws ProCoreException {
321 return manager.getPort();
/**
 * Coarse operating system classification derived from the "os.name" system property.
 */
324 private static enum OSType {
325 UNIX, WINDOWS, UNKNOWN;
// Classifies the running platform by the lower-cased prefix of "os.name".
327 public static OSType calculate() {
328 String osName = System.getProperty("os.name");
329 assert osName != null;
330 osName = osName.toLowerCase();
// Windows family.
331 if (osName.startsWith("windows"))
// Presumably the platforms reported as UNIX - confirm.
333 if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))
/**
 * Tests for the marker file PROTOCOL_IGNORED_FILE in the given database folder.
 *
 * @param dbFolder database folder, may be null
 * @return presumably true when the marker file exists (TODO confirm)
 */
338 static public boolean ignoreProtocolVersion(File dbFolder) {
339 if (null == dbFolder)
341 if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))
/**
 * Returns the server instance for the given working directory, creating and
 * registering a new one if none is registered yet.
 *
 * @param serverDir directory containing the ProCoreServer executable
 * @param workingDir database working directory; its canonical path is the registry key
 * @return the registered or newly created instance
 * @throws ProCoreException if an argument is not a directory or an IOException occurs
 */
345 static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)
346 throws ProCoreException, ServerNotFoundException {
348 ProCoreServer proCoreServer;
// Lookup and registration happen under the lock of the registry map.
349 synchronized (workingDirs) {
350 proCoreServer = workingDirs.get(workingDir.getCanonicalPath());
351 if (null != proCoreServer)
352 return proCoreServer;
353 if (!serverDir.isDirectory())
354 throw new ProCoreException("Expected server directory as argument");
355 if (!workingDir.isDirectory())
356 throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());
// The executable name gets the suffix provided by the Activator.
357 String exeSuffix = Activator.getExeSuffix();
358 File executable = new File(serverDir, "ProCoreServer" + exeSuffix);
359 proCoreServer = new ProCoreServer(executable, workingDir);
360 workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);
361 return proCoreServer;
363 } catch (IOException e) {
364 throw new ProCoreException("IOException", e);
/**
 * Logs server version, protocol version, server folder and source revision
 * as a single info message.
 */
368 public static void logVersionInformation() {
369 StringBuilder msg = new StringBuilder(200);
370 String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.
371 msg.append("ProCore version information:");
373 msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);
375 msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());
// The folder is optional information; a failure to get it is noted in the message.
376 } catch (IOException e) {
377 msg.append(" folder information not available");
380 msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");
381 String str = msg.toString();
382 LoggerFactory.getLogger(ProCoreServer.class).info(str);
/**
 * Reads lines from the beginning of a text file, decoded as US-ASCII.
 *
 * @param aFile file to read
 * @param maxLines reading stops once more than this many lines have been seen
 * @return the lines read; an empty array if the file cannot be read, is not found
 *         or another error occurs (such an error is logged)
 */
384 private static String[] getFileLines(final File aFile, int maxLines) {
385 final String[] STRING_EMPTY_ARRAY = new String[0];
387 if (!aFile.canRead()) {
388 return STRING_EMPTY_ARRAY;
// All three resources are closed automatically when the block is left.
390 try (FileInputStream fis = new FileInputStream(aFile);
391 InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");
392 BufferedReader br = new BufferedReader(isr);){
395 ArrayList<String> lines = new ArrayList<String>();
396 while ((line = br.readLine()) != null) {
397 if (++count > maxLines)
398 break; // Caller not interested in more lines.
401 return lines.toArray(STRING_EMPTY_ARRAY);
402 } catch (FileNotFoundException e) {
403 return STRING_EMPTY_ARRAY;
// Anything else is logged and reported to the caller as "no lines".
405 } catch (Throwable t) {
408 Logger.defaultLogError(t);
410 return new String[0];
/**
 * Parses a decimal integer, turning a malformed input into a ProCoreException.
 *
 * @param s text to parse
 * @return the parsed value
 * @throws ProCoreException if s is not a valid integer; the failure is also logged
 */
412 private static int parseInt(String s)
413 throws ProCoreException {
415 return Integer.parseInt(s);
416 } catch (NumberFormatException e) {
417 String msg = "Could not convert string to number. string=" + s;
418 Logger.defaultLogError(msg);
419 throw new ProCoreException(msg, e);
// Key of this instance in the databaseIds registry; null when there is nothing to remove in stop().
422 private String databaseId = null;
// Guard file in the working directory; used when neither client nor process can answer.
423 private final GuardFile guardFile;
// Client used for talking to the server.
424 private final ProCoreClient proCoreClient;
// Returns the client owned by this server object.
425 ProCoreClient getProCoreClient() {
426 return proCoreClient;
// Wrapper of the local server process.
428 private final ProCoreProcess proCoreProcess;
/**
 * Creates the guard file accessor, the client and the process wrapper for one
 * working directory and logs executable, folder and version information.
 * Instances are obtained through getProCoreServer(File, File).
 *
 * @param exeFile server executable
 * @param workingDir database working directory
 * @throws ProCoreException if an argument is null
 */
429 private ProCoreServer(File exeFile, File workingDir)
430 throws ProCoreException {
432 throw new ProCoreException("Illegal argument: exeFile is null.");
433 if (null == workingDir)
434 throw new ProCoreException("Illegal argument: workingDir is null.");
435 guardFile = new GuardFile(workingDir);
436 proCoreClient = new ProCoreClient(this);
437 proCoreProcess = new ProCoreProcess(exeFile, workingDir);
438 Util.log("ProCoreServer exe: " + exeFile);
439 Util.log("ProCoreServer folder: " + workingDir);
440 logVersionInformation();
/**
 * Passes a command to the client and returns the client's result.
 *
 * @param command command text
 * @return whatever the client returns for the command
 */
443 public synchronized String execute(String command) throws ProCoreException, InterruptedException {
444 return proCoreClient.execute(command);
/**
 * Starts the local server process and connects the client to the port the
 * process reports. If that fails, the guard file is consulted.
 *
 * @throws ProCoreException if no connection to a server could be established;
 *         a DatabaseLastExitException is passed on unchanged
 */
446 private void startInternal()
447 throws ProCoreException {
448 if (proCoreClient.isConnected())
452 proCoreProcess.tryToStart();
453 int port = proCoreProcess.getPort();
454 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
455 proCoreClient.connect(sa);
457 } catch (InterruptedException e) {
458 Util.logError("ProCoreProcess start was interrupted.", e);
460 } catch (DatabaseLastExitException e) {
461 throw e; // This situation can be analyzed and handled by start handler.
462 } catch (ProCoreException e) {
463 Util.logError("Failed to start ProCoreServer process.", e);
// Fall back to the guard file to find out whether a server is answering.
466 boolean isActive = guardFile.isActive(proCoreClient);
468 throw new ProCoreException("Failed to connect to ProCoreServer.", exception);
470 // private int getWaitCount() throws ProCoreException {
471 // if (!proCoreProcess.isActive())
472 // return SHORT_SLEEP_COUNT; // Short wait because server process is not running.
474 // return LONG_SLEEP_COUNT;
476 // private void startInternalReinit(int exitStatus)
477 // throws ProCoreException {
478 // switch (exitStatus) {
479 // default: // What has happened here?
480 // throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));
481 // case -1: // Exit status not available, not sure what has happened here.
482 // case 29: // Server died because it could not read exit status from guard file.
483 // case 30: // Server died because old exit status was not zero.
484 // case 40: // Server died because it received terminating signal.
485 // case 42: // Server died because it could not read end status from guard file.
486 // case 43: // Server died because end value was not zero.
487 // case 44: // Server died because server could not read version from guard file.
488 // case 45: // Server died because server did not support guard file version.
489 // case 47: // Server died because server could not listen to any port.
491 // case 32: // Server died because database version did not match.
492 // throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));
493 // case 33: // Server died because database has been corrupted.
494 // throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));
495 // case 39: // Server died because last exit was not clean.
496 // throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));
498 // boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());
500 // StringBuilder sb = new StringBuilder(256);
501 // sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");
502 // sb.append(getExitMessage(exitStatus));
503 // throw new DatabaseStartException(workingDir, sb.toString());
505 // boolean deleted = guardFile.delete();
507 // Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));
/**
 * Tells whether a server is answering. Without a client connection the guard
 * file is consulted; with a connection an empty command is executed as a probe.
 */
510 public synchronized boolean isActive()
511 throws ProCoreException {
512 if (!proCoreClient.isConnected())
513 return guardFile.isActive(proCoreClient);
// Probe the connected server with an empty command.
514 proCoreClient.execute("");
/**
 * Tells whether the server seems to be alive. Checks the client connection
 * first, then the local process and finally the guard file.
 */
517 public synchronized boolean isAlive()
518 throws ProCoreException {
519 if (proCoreClient.isConnected())
521 if (proCoreProcess.isActive())
// Neither connection nor local process: decide by the guard file.
523 return guardFile.isAlive();
/**
 * Creates a new client. A connected main client creates it itself; otherwise
 * the new client is created for the address given by LOCALHOST and getPort().
 */
525 Client newClient() throws ProCoreException {
526 if (proCoreClient.isConnected())
527 return proCoreClient.newClient();
528 int port = getPort();
529 InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);
530 return proCoreClient.newClient(this, isa);
/**
 * Tries to stop the server: gives the quit command through the client,
 * disconnects the client and stops the local process wrapper.
 *
 * @return true if afterwards the client is not connected and the process is not active
 * @throws ProCoreException if a remote server reports more than one connection
 */
532 public synchronized boolean tryToStop()
533 throws ProCoreException {
534 synchronized (proCoreClient) {
535 ProCoreException ex = null;
// Neither connected nor running locally: connect to the port from getPort() first.
536 if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {
538 proCoreClient.connect(getPort());
539 } catch (InterruptedException e) {
542 if (proCoreClient.isConnected()) {
// The reply to the quit command is parsed as the number of connections.
545 s = proCoreClient.execute(QUIT_CMD);
546 String t = s.replaceAll("\n", "");
547 int n = Integer.parseInt(t);
549 Util.log("Number of connections after quit command is " + n + ".");
550 proCoreClient.closeClient(0);
551 if (!isLocal() && n > 1)
552 throw new ProCoreException("More than one connection for remote. n=" + n);
553 } catch (NumberFormatException e) {
554 Util.logError("Failed to parse number of connections. s=" + s, e);
555 } catch (ProCoreException e) {
558 proCoreClient.disconnect();
562 proCoreProcess.tryToStop();
// Record a failure only if no earlier exception has been remembered.
564 if (proCoreProcess.isActive() && null == ex)
565 ex = new ProCoreException("Failed to stop ProCoreProcess.");
569 return !proCoreClient.isConnected() && !proCoreProcess.isActive();
/**
 * @return true if the client is connected
 */
572 public synchronized boolean isConnected()
573 throws ProCoreException {
574 return proCoreClient.isConnected();
/**
 * @return true if the client is connected and the local server process is active
 */
576 public synchronized boolean isLocal()
577 throws ProCoreException {
578 return proCoreClient.isConnected() && proCoreProcess.isActive();
/**
 * Disconnects the client.
 */
580 public synchronized void disconnect()
581 throws ProCoreException {
582 proCoreClient.disconnect();
/**
 * Connects the client to the port returned by getPort().
 *
 * @throws ProCoreException with message "Port 0 not supported as connection address."
 *         when the port is rejected
 */
584 public synchronized void connect()
585 throws ProCoreException, InterruptedException {
586 int port = getPort();
588 throw new ProCoreException("Port 0 not supported as connection address.");
589 proCoreClient.connect(port);
/**
 * Translates a server exit value into a human readable message. Each message
 * names the server side exception and what it indicates.
 *
 * @param exitValue exit value of the server
 * @return message for the exit value; values without a dedicated message
 *         presumably get the "Unexpected exit value" text (TODO confirm)
 */
591 private String getExitMessage(int exitValue) {
594 return "Unexpected exit value = " + exitValue + ".";
596 return "Exception was thrown. This indicates problems with program logic.";
598 return "CallException was thrown. This indicates problems with calls to/from parent server.";
600 return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";
602 return "NoSuchElementException was thrown. This indicates problems with program logic.";
604 return "IllegalArgumentException was thrown. This indicates problems with program logic.";
606 return "IllegalResourceException was thrown. This indicates problems with server data.";
608 return "IllegalClusterException was thrown. This indicates problems with server data.";
610 return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";
612 return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";
614 return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";
616 return "SystemException was thrown. This indicates problems with program logic.";
618 return "CppException was thrown. This indicates problems with program logic.";
620 return "UnknownException was thrown. This indicates problems with program logic.";
622 return "ThrowException was thrown. This indicates problems with program logic.";
624 return "AssertException was thrown. This indicates problems with program logic.";
626 return "ParentCheckException was thrown. This indicates problems with the database version of parent.";
628 return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";
630 return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";
632 return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";
634 return "NoSuchFileException was thrown. This indicates problems with a missing file.";
636 return "JournalException was thrown. This indicates problems with journal mechanisim.";
638 return "OutOfSpaceException was thrown. This indicates problems with memory.";
640 return "ExternalValueException was thrown. This indicates problems with large value handling.";
642 return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";
644 return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";
646 return "ConsoleException was thrown. This indicates that server console was closed without quit command.";
648 return "ExitException was thrown. This indicates that server did not exit cleanly.";
650 return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";
652 return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";
654 return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";
656 return "UpdateException was thrown. This indicates that server failed during update.";
658 return "DatabaseVersionException was thrown. This indicates that server can not use given database.";
660 return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";
662 return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";
664 return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";
666 return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";
668 return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";
670 return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";
672 return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";
674 return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";
676 return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";
678 return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";
680 return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";
682 return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";
684 return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";
686 return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";
688 return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";
690 return "JournalVersionException was thrown. This indicates that database was written by older version.";
692 return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";
/**
 * Starts the server and normalizes failures: a DatabaseLastExitException is
 * passed on as is, any other throwable ends up as a ProCoreException that
 * carries the database folder.
 */
695 synchronized void start()
696 throws ProCoreException {
699 } catch (DatabaseLastExitException e) {
700 throw e; // This can be analyzed and handled by start handler.
701 } catch (Throwable t) {
// Reuse a ProCoreException as is; wrap everything else with a general message.
702 ProCoreException pe = null;
703 if (t instanceof ProCoreException)
704 pe = (ProCoreException)t;
706 String msg = "ProCoreServer startup failed."
707 + " Automatic rcovery handling not implemented."
708 + " Database must be fixed manually.";
709 pe = new ProCoreException(msg, t);
// Make sure the exception tells which database folder it concerns.
711 if (null == pe.getDbFolder())
712 pe.setDbFolder(proCoreProcess.builder.directory());
/**
 * Removes this server from the database id registry and tries to stop it.
 *
 * @throws ProCoreException with message "Failed to stop ProCoreServer." when stopping fails
 */
715 synchronized void stop()
716 throws ProCoreException, InterruptedException{
717 if (null != databaseId)
718 databaseIds.remove(databaseId);
719 boolean dead = tryToStop();
721 throw new ProCoreException("Failed to stop ProCoreServer.");
/**
 * Returns the message for the exit value recorded in the guard file.
 *
 * @return human readable exit message
 * @throws ProCoreException if the guard file gives no exit value
 */
723 public synchronized String getExitMessage()
724 throws ProCoreException {
725 Integer returnValue = guardFile.getExitValue();
726 if (null == returnValue)
727 throw new ProCoreException("Exit message not available.");
728 return getExitMessage(returnValue);
/**
 * @return server address made of LOCALHOST and the port returned by getPort()
 */
731 public synchronized ServerAddress getAddress()
732 throws ProCoreException {
733 return new ServerAddress(LOCALHOST, getPort());
/**
 * Determines the server port. Sources are tried in this order: the connected
 * client, the active local process and finally the guard file.
 */
736 public synchronized int getPort()
737 throws ProCoreException {
739 if (proCoreClient.isConnected()) {
740 port = proCoreClient.getPort();
744 if (proCoreProcess.isActive()) {
745 port = proCoreProcess.getPort();
// Last resort: the port written into the guard file.
749 return guardFile.getPort();
/**
 * Reads the tail file TAIL_FILE from the folder TAIL_DIR below the working
 * directory of the server process.
 *
 * @return data read from the tail file
 */
751 public synchronized TailFile.Data getTailData()
752 throws ProCoreException {
753 Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);
754 return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
756 // public synchronized TailFile.Data getTailDataFromHead()
757 // throws ProCoreException {
758 // Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);
759 // return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
761 public static class GuardFile {
762 public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;
763 private static final int VERSION_LINE = 1;
764 private static final int PID_LINE = 2;
765 private static final int PORT_LINE = 3;
766 private static final int EXIT_LINE = 4;
767 private static final int END_LINE = 5;
768 private final File guardFile;
769 GuardFile(File workingDir) {
770 guardFile = new File(workingDir, GUARD_FILE);
772 int getPort() throws ProCoreException {
773 String[] lines = getFileLines(guardFile, END_LINE);
774 if (lines.length < PORT_LINE)
775 throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());
776 return parseInt(lines[PORT_LINE-1]);
778 private void checkVersion(String[] lines) throws ProCoreException {
779 if (lines.length < VERSION_LINE)
780 throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());
781 int version = parseInt(lines[VERSION_LINE-1]);
783 throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);
786 if (guardFile.exists())
787 return guardFile.delete();
789 return true; // Guard file is deleted i.e. does not exit.
792 if (!guardFile.exists())
795 Files.delete(guardFile.toPath());
796 } catch (Throwable t) {
800 Integer getExitValue() {
801 String lines[] = getFileLines(guardFile, END_LINE);
804 if (lines.length >= EXIT_LINE)
805 return parseInt(lines[EXIT_LINE-1]);
806 else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))
807 return -1; // Server has died without writing exit status.
808 return null; // Server might be running.
809 } catch (Exception e) {
810 Logger.defaultLogError("Could not get ProCoreServer exit value.", e);
811 return null; // Server might be running.
814 // void stopProCoreServer() throws ProCoreException {
815 // if (!guardFile.exists())
816 // throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");
817 // String lines[] = new String[0];
818 // for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {
819 // lines = getFileLines(guardFile, END_LINE);
820 // if (lines.length < 1)
821 // throw new ProCoreException("Guard file does not contain lines.");
822 // checkVersion(lines);
823 // if (lines.length >= END_LINE)
824 // return; // Server is dead according to guard file.
825 // if (!isRunning(lines[PID_LINE-1]))
826 // return; // ProCoreServer is not running.
827 // if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.
830 // if (lines.length < PORT_LINE)
831 // throw new ProCoreException("Guard file does not contain port.");
833 // int port = parseInt(lines[PORT_LINE-1]);
835 // ProCoreClient lProCoreClient = new ProCoreClient();
836 // SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);
837 // lProCoreClient.connect(sa);
839 // String reply = lProCoreClient.execute(QUIT_CMD);
840 // int nConnect = Integer.parseInt(reply);
842 // throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());
844 // lProCoreClient.disconnect();
847 // } catch (Throwable t) {
848 // throw new ProCoreException("Failed to stop procore server cleanly.", t);
850 // // Managed to send quit command to server, let's wait and see if it has any effect.
851 // boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);
853 // throw new ProCoreException("Unable to stop server.");
// Probes whether the server described by the guard file accepts a connection.
// Requires a client that is not yet connected, reads the guard file, parses the
// port from line PORT_LINE, connects to LOCALHOST on that port, sends an empty
// command and disconnects again in the finally clause.
// NOTE(review): this listing omits some original lines (the inner numbering
// jumps 859->863, 863->866, 870->872 and 874->877), so the try openers, the
// return statements and the closing braces are not visible here.
855 boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {
// Guard: the probe needs a client that is not connected.
856 if (proCoreClient.isConnected())
857 throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");
// Read the guard file; END_LINE is handed to getFileLines as its second argument.
858 String lines[] = getFileLines(guardFile, END_LINE);
// The line count is compared with PORT_LINE; the statement executed on a
// mismatch is on an omitted line.
859 if (lines.length != PORT_LINE)
// PORT_LINE is 1-based, hence the -1 when indexing the array.
863 int port = parseInt(lines[PORT_LINE-1]);
866 // Checking if core is responsive.
867 // This should be supported regardless of version.
// The second constructor argument (true) is the ignoreProtocolVersion flag of SessionAddress.
868 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
869 proCoreClient.connect(sa);
870 proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.
// A ProCoreException from the probe is caught here; no handling is visible in this listing.
872 } catch (ProCoreException e) {
873 } finally { // We must disconnect or check protocol version.
874 proCoreClient.disconnect();
// Outer catch-all for any other failure.
877 } catch (Throwable t) {
878 // Intentionally empty.
// Estimates from the guard file whether the server process may still be alive.
// Returns false when the file has no lines, has fewer than PID_LINE lines, or
// has at least EXIT_LINE or END_LINE lines; returns true when isRunning accepts
// the pid text taken from line PID_LINE (1-based).
// NOTE(review): inner lines 886 and 895 are missing from this listing; the last
// "return false" presumably belongs to an else on the omitted line 895 - confirm.
882 boolean isAlive() throws ProCoreException {
883 String lines[] = getFileLines(guardFile, END_LINE);
884 if (lines.length < 1)
885 return false; // No lines, assuming server is dead.
887 if (lines.length < PID_LINE)
888 return false; // No process id, assuming server is dead.
889 else if (lines.length >= END_LINE)
890 return false; // End status available, assuming server is dead.
891 else if (lines.length >= EXIT_LINE)
892 return false; // Exit status available, assuming server is dying.
893 else if (isRunning(lines[PID_LINE-1]))
894 return true; // Process with given id running. Server might be alive.
896 return false;// Process with given id not running.
// Helpers for the small text "tail file". createTailFile prints four values,
// one per line: file version, next change set id, next free id and database id.
// readTailFile parses the first three of them back into a Data object.
// NOTE(review): this listing skips original lines (the inner numbering has gaps
// such as 904->907, 910->912, 913->915, 917->919 and 924->929), so some fields,
// guards, try openers and closing braces are not shown.
899 public static class TailFile {
// Value holder for the three numbers stored in a tail file.
900 public static class Data {
901 Data(long version, long nextChangeSetId, long nextFreeId) {
902 this.version = version;
903 this.nextChangeSetId = nextChangeSetId;
904 this.nextFreeId = nextFreeId;
907 public long nextChangeSetId;
// Writes a new tail file. The exception on inner line 912 reports that the file
// already exists; its guarding condition is on an omitted line.
910 public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {
912 throw new TailException("Tail file exists and thus cannot be created. file=" + file);
913 FileOutputStream fos;
915 fos = new FileOutputStream(file);
916 } catch (FileNotFoundException e) {
// NOTE(review): the caught FileNotFoundException is not passed on as the cause here.
917 throw new TailException("Tail file cannot be created. file=" + file);
919 PrintStream ps = new PrintStream(fos);
// The file version written is the constant 1.
921 ps.println(1); // file version
922 ps.println(nextChangeSetId); // next revision
923 ps.println(nextFreeId); // next free cluster
924 ps.println(dbId); // database id
// NOTE(review): closing of ps/fos is not visible in this listing - confirm it happens on the omitted lines.
// Reads the first three lines of a tail file as version, next change set id and
// next free id. Fewer than three lines raises TailReadException.
// NOTE(review): Long.parseLong can throw NumberFormatException, which is not
// converted to a ProCoreException in the visible lines.
929 public static Data readTailFile(File file) throws ProCoreException {
930 String lines[] = getFileLines(file, 3);
931 if (lines.length < 3)
932 throw new TailReadException("Could not read data from " + file);
933 long ver = Long.parseLong(lines[0]);
934 long cs = Long.parseLong(lines[1]);
935 long id = Long.parseLong(lines[2]);
936 return new Data(ver, cs, id);
940 class SessionAddress {
941 private final InetSocketAddress socketAddress;
942 private final boolean ignoreProtocolVersion;
943 public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {
944 assert(null != socketAddress);
945 this.socketAddress = socketAddress;
946 this.ignoreProtocolVersion = ignoreProtocolVersion;
948 public InetSocketAddress getSocketAddress() {
949 return socketAddress;
951 public boolean ignoreProtocolVersion() {
952 return ignoreProtocolVersion;
955 public String toString() {
956 return socketAddress.toString();
// Queue of received packets backed by a LinkedBlockingQueue.
// NOTE(review): the class header and the body of addFirst are on lines this
// listing omits; the enclosing type is presumably the PacketQueue used for
// ConnectionThread.inputQueue - confirm.
960 private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();
// Presumably enqueues the packet; the body is not shown in this listing.
963 void addFirst(Packet packet) {
// Removes and returns the head of the queue; LinkedBlockingQueue.take() waits
// until an element is available and may be interrupted.
966 Packet takeFirst() throws InterruptedException {
967 return packets.take();
// Packet header fields copied from an int array: index 0 = lastTokenIn,
// 2 = messageNumber, 3 = inflateSize, 4 = deflateSize.
// NOTE(review): the class and constructor headers and the assignment from
// ints[1] (presumably token) are on lines this listing omits.
972 lastTokenIn = ints[0];
974 messageNumber = ints[2];
975 inflateSize = ints[3];
976 deflateSize = ints[4];
// Renders the header fields as "name=value" pairs separated by spaces.
984 public String toString() {
985 StringBuilder sb = new StringBuilder(80);
986 sb.append("lastTokenIn=");
987 sb.append(lastTokenIn);
988 sb.append(" token=");
// (the append of the token value is presumably on the omitted inner line 989)
990 sb.append(" messageNumber=");
991 sb.append(messageNumber);
992 sb.append(" inflateSize=");
993 sb.append(inflateSize);
994 sb.append(" deflateSize=");
995 sb.append(deflateSize);
996 return sb.toString();
// Builds a packet from a raw header int array and its byte buffer; the ints are
// wrapped in a Header object.
// NOTE(review): the handling of the bytes argument is on lines this listing omits.
1002 Packet(int[] header, ByteBuffer bytes) {
1003 this.header = new Header(header);
// Per-connection state: handleInput assembles incoming bytes into packets and
// puts them on inputQueue; the Manager runnable takes them from there and
// dispatches them.
1007 class ConnectionThread {
1008 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
// Number of ints in a packet header.
1009 private static final int HEADER_N = 5;
// Header size in bytes (HEADER_N ints of 4 bytes each).
1010 private static final int HEADER_SIZE = HEADER_N * 4;
// Index of the deflate size within the header int array.
1011 private static final int DEFLATE = 4;
// Minimum capacity of the buffers handed out by getInputBuffer.
1012 private static final int INCREMENT_SIZE = 8192;
// Counter used to number Manager instances; it is pre-incremented without
// synchronization where Manager.instance is assigned.
1013 private static int instanceCount = 0;
// Runnable that serves one connection: run() takes packets from inputQueue and
// hands them to handlePacket, which matches each packet to a queued Method by
// its token.
// NOTE(review): many original lines are missing from this listing (the inner
// numbering has gaps), e.g. the constructor header, try openers, loop headers,
// breaks, default labels and closing braces; the comments below describe the
// visible lines only.
1014 class Manager implements Runnable {
// Sequence number of this manager, used in toString.
1015 private final int instance;
// Presumably the run-loop flag; run() traces "stopped with false carryOn" on one exit path.
1016 private volatile boolean carryOn = true;
// Largest packet token seen so far (updated in handlePacket, passed to sendMethod).
1017 private volatile int lastTokenIn = 0;
// Methods waiting for a reply, looked up by packet token.
1018 private volatile MethodQueue methodQueue = new MethodQueue();
// NOTE(review): the latch starts at 1 but countDown() is visible twice in run()
// (inner lines 1073 and 1094) - check the omitted lines for a re-initialisation.
1019 private volatile CountDownLatch running = new CountDownLatch(1);
// Fallback handler for events that arrive without a matching method.
1020 private final EventHandler defaultEventHandler = new DefaultEventHandler();
1021 // private EventHandler eventHandler = defaultEventHandler;
// NOTE(review): ++ on the static counter is not atomic.
1023 instance = ++instanceCount;
// Also used as the name of the worker thread (see ConnectionThread.thread).
1026 public String toString() {
1027 return "Connection " + instance;
1029 InetSocketAddress getAddress() {
1030 return connection.getAddress();
1032 boolean isConnected() {
1033 return connection.isConnected();
// Rejects an uninitialized address or port 0; the reconnect action itself is on omitted lines.
1035 void reconnect() throws ProCoreException, InterruptedException {
1036 InetSocketAddress a = connection.getAddress();
1037 if (connection.addressNotInitialized() || 0 == a.getPort())
1038 throw new ProCoreException("Address not ok. address=" + connection.getAddress());
// Builds a LOCALHOST address for the given port when the stored address is
// unusable or names a different non-zero port; the call that follows is on omitted lines.
1041 void connect(int port) throws ProCoreException, InterruptedException {
1042 InetSocketAddress a = connection.getAddress();
1043 if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))
1044 a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);
// Connects to the given address. When already connected, the address is compared
// with the current one; a different address raises ProCoreException (the
// statement for an equal address is on an omitted line).
1047 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
1048 if (connection.isConnected()) {
1049 if (connection.equalsAddress(address))
1052 throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);
1054 connection.init4Connection(address);
1055 connectionManager.connect(connection);
1056 // compression = new Compression();
// The statements depending on the worker thread's liveness are on omitted lines.
1057 if (thread.isAlive())
// Sends the method over the connection together with the queue and the last seen token.
1062 void call(Method method) throws ProCoreException, InterruptedException, IOException {
1063 // eventHandler = method.getEventHandler(eventHandler);
1064 connection.sendMethod(methodQueue, method, lastTokenIn);
// Closes the method queue and disconnects; the enclosing method header
// (presumably disconnect) is on an omitted line.
1067 methodQueue.close();
1068 connection.disconnect();
// Worker body: takes packets from inputQueue and passes them to handlePacket.
// Exit reasons are traced; any Throwable is logged via Util.logError.
1071 public void run() { // MARK: CT.Manger.run
1073 running.countDown();
1075 Packet packet = null;
1077 packet = inputQueue.takeFirst();
1078 } catch (InterruptedException e) {
1079 Util.trace("Wait for input packet interrupted. this=" + this);
1084 handlePacket(packet);
1087 Util.trace("Thread " + thread.getName() + " stopped with null packet.");
1089 Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");
1090 } catch (Throwable t) {
1091 Util.logError("Thread " + thread.getName() + " stopped to exception:", t);
1094 running.countDown();
// Dispatches one packet. lastTokenIn is raised when the packet carries a larger
// token. The waiting Method is removed from methodQueue by token; without a
// method only ChangeSetUpdateRequest is delivered (to the default handler) and a
// missing method is logged. With a method, the message number selects between
// gotException, event delivery plus re-queueing, and gotPacket.
1097 private void handlePacket(Packet packet) throws ProCoreException {
1098 if (packet.header.token > lastTokenIn) {
1099 lastTokenIn = packet.header.token;
1101 Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);
1103 Method method = methodQueue.remove(packet.header.token);
1104 if (null == method) {
1106 switch (packet.header.messageNumber) {
1108 Util.logError("Missing method. token=" + packet.header.token);
1110 case MessageNumber.ChangeSetUpdateRequest: {
1111 defaultEventHandler.on(new ChangeSetUpdateEvent(packet));
// Failures raised while handling the event are logged, not rethrown, in the visible lines.
1114 } catch (Throwable t) {
1115 Util.logError("Event handler failed:", t);
1120 // eventHandler = method.getEventHandler(defaultEventHandler);
1121 switch (packet.header.messageNumber) {
1123 String s = "Illegal message number " + packet.header.messageNumber;
1124 method.gotException(s);
1126 case MessageNumber.ChangeSetUpdateRequest:
// The method's own handler is preferred; defaultEventHandler is passed as the fallback argument.
1127 method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));
1128 methodQueue.add(method); // Event caused by method. Actual method not yet handled.
1130 case MessageNumber.NullResponse:
1131 //Util.log("Null(ack) message. method=" + method + " packet=" + packet);
1133 case MessageNumber.ExceptionResponse:
1134 method.gotException(packet);
1135 break; // MARK: handlePacket
// All response types listed below are handed to the method unchanged.
1136 case MessageNumber.AAAResponse:
1137 case MessageNumber.AskTransactionResponse:
1138 case MessageNumber.CancelCommitResponse:
1139 case MessageNumber.CloseClientSessionResponse:
1140 case MessageNumber.EchoResponse:
1141 case MessageNumber.EndTransactionResponse:
1142 case MessageNumber.ExecuteResponse:
1143 case MessageNumber.GetChangeSetContextResponse:
1144 case MessageNumber.GetChangeSetDataResponse:
1145 case MessageNumber.GetChangeSetsResponse:
1146 case MessageNumber.GetClusterNewResponse:
1147 case MessageNumber.GetRefreshResponse:
1148 case MessageNumber.GetResourceSegmentResponse:
1149 case MessageNumber.GetServerInfoResponse:
1150 case MessageNumber.OpenClientSessionResponse:
1151 case MessageNumber.RefreshResponse:
1152 case MessageNumber.ReserveIdsResponse:
1153 case MessageNumber.UndoResponse:
1154 case MessageNumber.ReconnectResponse:
1155 case MessageNumber.GetRefresh2Response:
1156 case MessageNumber.GetClusterChangesResponse:
1157 case MessageNumber.GetServerInfo2Response:
1158 case MessageNumber.ListClustersResponse: {
1159 method.gotPacket(packet);
// Instance state of ConnectionThread followed by its constructor.
1164 private final ConnectionManager connectionManager;
// Packets completed by handleInput wait here until Manager.run takes them.
1165 private final PacketQueue inputQueue = new PacketQueue();
1166 private final Connection connection = new Connection(this);
1167 private final Manager manager = new Manager();
// Worker thread running the manager; it is named after manager.toString().
1168 private Thread thread = new Thread(manager, manager.toString());
// Header ints of the packet currently being read; null while no header is available.
1169 private int[] header = null;
1170 private int[] INTS = new int[HEADER_N];
// Byte count that handleInput compares the buffer position with: HEADER_SIZE at
// first, increased by the deflate size once the header is known.
1171 private int remaining = HEADER_SIZE;
1172 private int deflateSize;
// Stores the connection manager later used by Manager.connect.
1173 ConnectionThread(ConnectionManager connectionManager) {
1174 this.connectionManager = connectionManager;
// Convenience overload: requests a buffer of the default INCREMENT_SIZE.
1176 ByteBuffer getInputBuffer() {
1177 return getInputBuffer(INCREMENT_SIZE);
// Allocates a little-endian heap buffer holding at least sizeAtLeast bytes and
// never fewer than INCREMENT_SIZE.
// NOTE(review): the return statement is on a line this listing omits.
1179 ByteBuffer getInputBuffer(int sizeAtLeast) {
1180 ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));
1181 bb.order(ByteOrder.LITTLE_ENDIAN);
1185 public String toString() {
1186 return "header=" + header + " remaining=" + remaining;
// Extends toString() with the number of bytes read and the buffer's own string
// form; used for the "Assertion error" IOException message in handleInput.
1188 private String toString(ByteBuffer buffer, int nRead) {
1189 return this + " nRead=" + nRead + " buffer=" + buffer;
// Consumes newly read bytes and returns the buffer to use for the next read.
// While no header is known it waits for HEADER_SIZE bytes, then reads the
// deflate size from the header and adds it to "remaining". Once the buffer
// position reaches "remaining" a Packet is put on inputQueue; surplus bytes are
// moved to a fresh buffer and processed by a recursive call.
// Throws IOException for a negative deflate size or an inconsistent position.
// NOTE(review): this listing omits several original lines (copying between
// buffers, filling of the header array, resetting of "header", some returns and
// closing braces); the comments below describe the visible lines only.
1191 ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0
1192 throws IOException { // MARK: handleInput
1193 int position = buffer.position();
1194 if (null == header) {
// Fewer bytes than a header: the statement taken here is on an omitted line.
1195 if (position < HEADER_SIZE)
1197 else { // Got header.
1199 IntBuffer ib = buffer.asIntBuffer();
1203 deflateSize = header[DEFLATE];
1204 if (deflateSize < 0)
1205 throw new IOException("Illegal deflate size=" + deflateSize);
// remaining now covers header plus payload.
1206 remaining += deflateSize;
1207 if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.
1208 ByteBuffer bb = getInputBuffer(remaining);
1210 buffer.limit(position);
1214 buffer.position(position);
1217 if (position < remaining)
1218 return buffer; // Need more data.
1219 else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.
1220 ByteBuffer bb = getInputBuffer(remaining);
1222 buffer.limit(remaining);
1224 inputQueue.addFirst(new Packet(header, bb));
// b2 receives the bytes that follow the completed packet (positions remaining..position).
1225 ByteBuffer b2 = getInputBuffer(buffer.limit());
1226 buffer.position(remaining);
1227 buffer.limit(position);
// Start over with a fresh header for the surplus bytes.
1230 remaining = HEADER_SIZE;
1231 return handleInput(b2, b2.position());
1232 } else if (position != remaining)
1233 throw new IOException("Assertion error. this=" + toString(buffer, nRead));
1234 // Got exactly one packet.
1235 inputQueue.addFirst(new Packet(header, buffer));
1237 remaining = HEADER_SIZE;
1238 return getInputBuffer(INCREMENT_SIZE);
// Thin delegates: each visible call below is forwarded to the Manager instance.
// NOTE(review): the method header preceding inner line 1256 (presumably
// disconnect) and the body of call(Method) are on lines this listing omits.
1240 InetSocketAddress getAddress() {
1241 return manager.getAddress();
1243 boolean isConnected() {
1244 return manager.isConnected();
1246 void reconnect() throws ProCoreException, InterruptedException {
1247 manager.reconnect();
1249 void connect(int port) throws ProCoreException, InterruptedException {
1250 manager.connect(port);
1252 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
1253 manager.connect(address);
1256 manager.disconnect();
1258 Connection call(Method m) throws ProCoreException, InterruptedException, IOException {
// Client handle created by ProCoreClient.newClient; it uses the connection
// thread passed in by the ProCoreClient. Connection state checks synchronize on
// the proCoreClient object.
// NOTE(review): the class header and several method headers and closers are on
// lines this listing omits.
1264 private final ProCoreServer proCoreServer;
1265 private final ProCoreClient proCoreClient;
1266 private final ConnectionThread connectionThread;
1267 private boolean closed = false;
1268 Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {
1269 this.proCoreServer = proCoreServer;
1270 this.proCoreClient = proCoreClient;
1271 this.connectionThread = connectionThread;
// Connects to the server's port when the connection thread is not connected yet.
1273 void open() throws ProCoreException, InterruptedException {
1274 synchronized (proCoreClient) {
1275 if (!connectionThread.isConnected())
1276 connectionThread.connect(proCoreServer.getPort());
// Reports the connection state (the method header is on an omitted line).
1281 synchronized (proCoreClient) {
1282 return connectionThread.isConnected();
// Closed when flagged as closed or when the connection is gone.
1285 boolean isClosed() {
1286 synchronized (proCoreClient) {
1287 return closed || !connectionThread.isConnected();
// Unregisters this client from its ProCoreClient (the enclosing method header,
// presumably close, is on an omitted line).
1291 proCoreClient.deleteClient(this);
// Both methods below forward to the ProCoreClient.
1294 String execute(String command) throws ProCoreException {
1295 return proCoreClient.execute(command);
1297 void call(Method m) throws ProCoreException {
1298 proCoreClient.call(m);
// Client side entry point to one server connection. Keeps the set of Client
// handles created through newClient and forwards calls to the connection thread
// held by the "remote" member. Most public methods are synchronized on this object.
// NOTE(review): this listing omits original lines (inner numbering gaps), e.g.
// the declaration around the connectionManager/connectionThread fields
// (presumably a nested Remote class, since they are accessed as
// remote.connectionThread), try openers, some returns and closing braces.
1301 class ProCoreClient {
1302 public static final String LOCALHOST = "127.0.0.1";
1304 private final ConnectionManager connectionManager = ConnectionManager.getInstance();
1305 private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);
1309 private final Remote remote = new Remote();
// Client handles handed out by newClient and removed by deleteClient.
1310 private final Set<Client> clients = new HashSet<Client>();
1311 private final ProCoreServer proCoreServer;
1312 public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {
1313 this.proCoreServer = proCoreServer;
// Port of the address currently held by the connection thread.
1315 synchronized public int getPort() {
1316 return remote.connectionThread.getAddress().getPort();
1318 synchronized public boolean isConnected() {
1319 return remote.connectionThread.isConnected();
// Creates a client for the stored server and the connection's current address.
1321 synchronized public Client newClient() throws ProCoreException {
1322 return newClient(proCoreServer, remote.connectionThread.getAddress());
// Connects to the given address when not connected yet; when already connected,
// a different address is rejected. The new Client is registered in "clients".
// NOTE(review): on InterruptedException the interrupt status is not restored and
// the exception is not passed on as the cause of the ProCoreException.
1324 synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {
1325 if (!remote.connectionThread.isConnected())
1327 remote.connectionThread.connect(address);
1328 } catch (InterruptedException e) {
1329 if (!remote.connectionThread.isConnected()) {
1330 remote.connectionThread.disconnect();
1331 throw new ProCoreException("Failed to create new client.");
1334 else if (!address.equals(remote.connectionThread.getAddress()))
1335 throw new ProCoreException("Illegal newClient call. Already connected to different address.");
1336 Client client = new Client(proCoreServer, this, remote.connectionThread);
1337 clients.add(client);
// Removes the client and returns how many clients remain registered.
1340 synchronized int deleteClient(Client client) {
1341 clients.remove(client);
1342 return clients.size();
// Sends a CloseClientSessionFunction for the given session id. Unlike execute
// and call, this method is not declared synchronized.
// Any failure is wrapped in a ProCoreException with the original as cause.
1344 void closeClient(int aSessionId) throws ProCoreException {
1346 CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);
1347 Method m = new Method(f, null, null);
1348 Connection c = remote.connectionThread.call(m);
1350 } catch (InterruptedException e) {
1351 throw new ProCoreException("Thread interrupted during method call.", e);
1352 } catch (IOException e) {
1353 throw new ProCoreException("IOException during method call.", e);
1354 } catch (Throwable e) {
1355 throw new ProCoreException("Throwable during method call.", e);
// Sends an ExecuteFunction carrying the command text; how the reply string is
// obtained is on omitted lines. Failures are wrapped in ProCoreException.
1358 synchronized public String execute(String command) throws ProCoreException {
1360 ExecuteFunction f = new ExecuteFunction(command);
1361 Method m = new Method(f, null, null);
1362 Connection c = remote.connectionThread.call(m);
1365 } catch (InterruptedException e) {
1366 throw new ProCoreException("Thread interrupted during method call.", e);
1367 } catch (IOException e) {
1368 throw new ProCoreException("IOException during method call.", e);
1369 } catch (Throwable e) {
1370 throw new ProCoreException("Throwable during method call.", e);
// Retry helper for a call that failed with an IOException: when the connection
// is still up the IOException is reported as is; otherwise it reconnects, sends
// the method again and waits for the reply.
// NOTE(review): when the retry fails, the original IOException e (not the new
// Throwable t) is attached as the cause.
1373 private void recall(Method method, IOException e) throws ProCoreException {// :)
1374 if (remote.connectionThread.isConnected())
1375 throw new ProCoreException("IOException during method call.", e);
1377 remote.connectionThread.reconnect();
1378 Connection c = remote.connectionThread.call(method);
1379 method.waitForReply(c);
1380 } catch (Throwable t) {
1381 throw new ProCoreException("IOException during method call. Recall failed.", e);
// Sends the method and waits for its reply. The handling inside the IOException
// and ProCoreException catch clauses is on omitted lines.
1384 synchronized public void call(Method method) throws ProCoreException {
1386 Connection c = remote.connectionThread.call(method);
1387 method.waitForReply(c);
1388 } catch (InterruptedException e) {
1389 throw new ProCoreException("Thread interrupted during method call.", e);
1390 } catch (IOException e) {
1392 } catch (ProCoreException e) {
1394 } catch (Throwable e) {
1395 throw new ProCoreException("Throwable during method call.", e);
1398 synchronized public void connect(int port) throws ProCoreException, InterruptedException {
1399 remote.connectionThread.connect(port);
// Only the socket address of the session address is used in the visible lines;
// its ignoreProtocolVersion flag is not consulted here.
1401 synchronized public void connect(SessionAddress sessionAddress)
1402 throws ProCoreException, InterruptedException {
1403 remote.connectionThread.connect(sessionAddress.getSocketAddress());
1405 synchronized public void disconnect() throws ProCoreException {
1406 remote.connectionThread.disconnect();
// Base class for server events: a display name plus the packet that carried the event.
// NOTE(review): the name field, its assignment and the header of the method
// returning the token are on lines this listing omits.
1409 abstract class Event {
1411 final Packet packet;
1412 Event(String name, Packet packet) {
1414 this.packet = packet;
// Token from the packet header (the method header is on an omitted line).
1417 return packet.header.token;
// Fills the given protocol event from this packet's payload, using the event's
// own request number.
1419 void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {
1420 event.deserialize(event.getRequestNumber(), getDataBuffer());
// Positions the packet bytes at offset 20 before wrapping them; 20 presumably is
// the packet header size (ConnectionThread.HEADER_SIZE is 5 ints * 4 bytes) - confirm.
1422 private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {
1423 this.packet.bytes.position(20);
1424 return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);
1426 @Override public String toString() {
1427 return name + " token=" + packet.header.token;
// Event created for packets with message number ChangeSetUpdateRequest; it only
// fixes the event name passed to the Event constructor.
1430 class ChangeSetUpdateEvent extends Event {
1431 ChangeSetUpdateEvent(Packet packet) {
1432 super("ChangeSetUpdateEvent", packet);
// Callback type for server events; implementations receive each event through on().
1435 abstract class EventHandler {
// Called with the event to handle.
1436 abstract void on(Event event);
1438 class DefaultEventHandler extends EventHandler {
1439 void on(Event event) {
1440 Util.log("Missing event handler. event=" + event);