1 /*******************************************************************************
\r
3 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
4 * in Industry THTH ry.
\r
5 * All rights reserved. This program and the accompanying materials
\r
6 * are made available under the terms of the Eclipse Public License v1.0
\r
7 * which accompanies this distribution, and is available at
\r
8 * http://www.eclipse.org/legal/epl-v10.html
\r
11 * VTT Technical Research Centre of Finland - initial API and implementation
\r
12 *******************************************************************************/
\r
13 package org.simantics.db.server.internal;
\r
15 import java.io.BufferedReader;
\r
16 import java.io.BufferedWriter;
\r
17 import java.io.File;
\r
18 import java.io.FileInputStream;
\r
19 import java.io.FileNotFoundException;
\r
20 import java.io.FileOutputStream;
\r
21 import java.io.IOException;
\r
22 import java.io.InputStreamReader;
\r
23 import java.io.OutputStreamWriter;
\r
24 import java.io.PrintStream;
\r
25 import java.net.InetSocketAddress;
\r
26 import java.nio.ByteBuffer;
\r
27 import java.nio.ByteOrder;
\r
28 import java.nio.IntBuffer;
\r
29 import java.nio.file.Files;
\r
30 import java.nio.file.Path;
\r
31 import java.util.ArrayList;
\r
32 import java.util.HashMap;
\r
33 import java.util.HashSet;
\r
34 import java.util.Map;
\r
35 import java.util.Set;
\r
36 import java.util.concurrent.CountDownLatch;
\r
37 import java.util.concurrent.LinkedBlockingQueue;
\r
39 import org.simantics.db.common.utils.Logger;
\r
40 import org.simantics.db.server.DatabaseLastExitException;
\r
41 import org.simantics.db.server.GuardFileVersionException;
\r
42 import org.simantics.db.server.ProCoreException;
\r
43 import org.simantics.db.server.ServerNotFoundException;
\r
44 import org.simantics.db.server.protocol.CloseClientSessionFunction;
\r
45 import org.simantics.db.server.protocol.ExecuteFunction;
\r
46 import org.simantics.db.server.protocol.MessageNumber;
\r
48 public class ProCoreServer {
\r
49 private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();
\r
50 private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();
\r
51 private static final String LOCALHOST = ProCoreClient.LOCALHOST;
\r
52 public static final int VERSION_MAJOR = 1;
\r
53 public static final int VERSION_MINOR = 4; // 2015-05-05
\r
54 public static final String BRANCH_DIR = "procore.headClusters.procore";
\r
55 public static final String TAIL_DIR = "procore.tailClusters.procore";
\r
56 public static final String TAIL_FILE = "procore.tail.procore";
\r
57 public static final String CONFIG_FILE = "procore.config.procore";
\r
58 public static final String GUARD_FILE = "procore.guard.procore";
\r
59 public static final String JOURNAL_FILE = "procore.journal.procore";
\r
60 public static final String LOG_FILE = "procore.log.procore";
\r
61 public static final String DCS_FILE = "procore.dumpChangeSets.procore";
\r
62 public static final String RECOVERY_NEEDED_FILE = "recovery.needed";
\r
63 public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";
\r
64 public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";
\r
65 public static final String PAGE_FILE_PATTERN = "procore.page*.procore";
\r
66 public static final String DATA_PREFIX = "ClusterData.";
\r
67 public static final String DATA_PATTERN = DATA_PREFIX + "*";
\r
68 public static final String INDEX_PREFIX = "ClusterIndex.";
\r
69 public static final String INDEX_PATTERN = INDEX_PREFIX + "*";
\r
70 public static final String VALUE_PREFIX = "ExternalValue.";
\r
71 public static final String VALUE_PATTERN = VALUE_PREFIX + "*";
\r
72 public static final String VALUE_SUFFIX = ".procore";
\r
73 public static final String DELETED_PREFIX = "ClusterDeleted.";
\r
74 public static final String DELETED_PATTERN = DELETED_PREFIX + "*";
\r
75 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
\r
76 public static final String QUIT_CMD = "quit";
\r
77 // private static final int SHORT_SLEEP_COUNT = 1;
\r
78 // private static final int LONG_SLEEP_COUNT = 10;
\r
79 // static public void stopProCoreServer(File workingDir) throws ProCoreException {
\r
80 // GuardFile guardFile = new GuardFile(workingDir);
\r
81 // guardFile.stopProCoreServer();
\r
83 private static boolean isRunning(String aPid) throws ProCoreException {
\r
84 boolean isRunning = false;
\r
86 switch (OSType.calculate()) {
\r
88 Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");
\r
89 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
\r
90 String line = input.readLine();
\r
91 if (line.matches(".*ProCoreServer.exe.*"))
\r
97 Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");
\r
98 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
\r
99 String line = input.readLine();
\r
100 if (line.matches(".*ProCoreServer.*"))
\r
107 } catch (Exception e) {
\r
108 throw new ProCoreException("Could not get list of running processes.", e);
\r
112 static class ProCoreProcess {
\r
113 private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };
\r
114 class Manager implements Runnable {
\r
115 private State state = State.Created;
\r
116 private CountDownLatch serverPrompted;
\r
117 private LinkedBlockingQueue<String> reply;
\r
118 private volatile String command;
\r
119 private volatile Integer exitValue;
\r
120 private volatile boolean carryOn;
\r
121 private volatile int port;
\r
122 private final File dbFolder; // Just for exception i.e. error reporting.
\r
123 Manager(File dbFolder) {
\r
124 this.dbFolder = dbFolder;
\r
127 private void init() {
\r
128 serverPrompted = new CountDownLatch(1);
\r
129 reply = new LinkedBlockingQueue<String>();
\r
133 state = State.Initialized;
\r
138 void tryToStart() throws ProCoreException, InterruptedException {
\r
139 if (thread.isAlive())
\r
141 thread = new Thread(manager, manager.initAndGetName());
\r
143 serverPrompted.await();
\r
144 if (!state.equals(State.Prompted))
\r
145 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
\r
146 String reply = giveCommand("print port");
\r
148 port = Integer.parseInt(reply);
\r
149 } catch (NumberFormatException e) {
\r
150 if (null != exitValue && 0 != exitValue)
\r
151 throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");
\r
152 if (reply.equals("End of server thread."))
\r
153 throw new ProCoreException("Server did not start (did not prompt). state=" + state);
\r
155 throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);
\r
157 state = State.GotPort;
\r
159 String giveCommand(String command) throws ProCoreException, InterruptedException {
\r
160 if (!thread.isAlive())
\r
161 throw new ProCoreException("Server thread not alive.");
\r
162 if (null != this.command)
\r
163 throw new ProCoreException("Old command pending.");
\r
164 this.command = command;
\r
165 return reply.take();
\r
167 private String initAndGetName() {
\r
169 return "ProCoreProcess " + ++instanceCount + " " + builder.directory();
\r
171 boolean isActive() throws ProCoreException {
\r
172 if (null == exitValue && thread.isAlive())
\r
177 boolean tryToStop() throws ProCoreException {
\r
178 if (!thread.isAlive())
\r
181 thread.interrupt();
\r
183 thread.join(10000); // milliseconds
\r
184 } catch (InterruptedException e) {
\r
186 return !thread.isAlive();
\r
189 public void run() {
\r
190 state = State.ThreadStarted;
\r
191 Util.log("ProCoreServer thread started.");
\r
193 Util.trace("ProCoreProcessManager start.");
\r
194 Process process = builder.start();
\r
195 exitValue = exitValue(process);
\r
196 if (null != exitValue)
\r
197 Util.logError("Server process did not start.");
\r
199 state = State.ProcessStarted;
\r
200 runProcessStarted(process); // Process start
\r
202 } catch (IOException e) {
\r
203 Util.logError("Failed during process watch.");
\r
205 reply.add("End of server thread.");
\r
206 Util.trace("ProCoreServerThread stop.");
\r
207 state = State.ThreadStopped;
\r
208 serverPrompted.countDown();
\r
211 private void runProcessStarted(Process process) throws IOException {
\r
212 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
\r
213 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)
\r
217 if (reader.ready())
\r
218 handleLine(reader.readLine(), reader, writer);
\r
219 else if (null != command) {
\r
220 writer.write(command);
\r
224 String line = getReply(reader);
\r
227 Thread.sleep(100); // milliseconds
\r
228 } catch (InterruptedException e) {
\r
229 Util.trace("ProCoreServerThread interrupted.");
\r
231 exitValue = exitValue(process);
\r
232 if (null == exitValue && !carryOn)
\r
235 writer.write(QUIT_CMD);
\r
238 } catch (Throwable t) {
\r
239 Util.logError("Wait for process death was interrupted.", t);
\r
242 exitValue = process.waitFor();
\r
243 } catch (InterruptedException e) {
\r
244 Util.logError("Wait for process death was interrupted.", e);
\r
248 } while (null == exitValue);
\r
249 } // End of resource try.
\r
251 private String getReply(BufferedReader reader) throws IOException {
\r
252 StringBuilder sb = new StringBuilder();
\r
254 String line = reader.readLine();
\r
255 if (null == line || line.startsWith("Waiting for input?"))
\r
259 return sb.toString();
\r
261 private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {
\r
263 Util.showDebug(line);
\r
264 if (line.startsWith("Main end.")) {
\r
265 carryOn = false; // Last line from server.
\r
266 } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {
\r
267 state = State.Prompted;
\r
268 serverPrompted.countDown(); // Server has been started and is waiting for console input.
\r
270 Util.trace("From server. line=" + line);
\r
272 private Integer exitValue(Process process) {
\r
274 return process.exitValue();
\r
275 } catch (IllegalThreadStateException e) {
\r
276 return null; // Still alive.
\r
280 private static int instanceCount; // Keeps count of thread instances.
\r
281 private final ProcessBuilder builder;
\r
282 private final Manager manager;
\r
283 private Thread thread;
\r
284 ProCoreProcess(File exeFile, File workingDir) {
\r
285 String[] args = new String[0];
\r
286 String[] cmd = new String[1 + args.length];
\r
287 cmd[0] = exeFile.getAbsolutePath().toString();
\r
288 System.arraycopy(args, 0, cmd, 1, args.length);
\r
289 builder = new ProcessBuilder(cmd);
\r
290 builder.redirectErrorStream(true);
\r
291 builder.directory(workingDir);
\r
292 manager = new Manager(workingDir);
\r
293 thread = new Thread(manager, manager.initAndGetName());
\r
295 public void tryToStart() throws ProCoreException, InterruptedException {
\r
296 manager.tryToStart();
\r
298 // public int exitValue() throws IllegalStateException {
\r
299 // Integer i = proCoreServerThread.getExitValue();
\r
301 // throw new IllegalStateException("ProCoreServerThread is still running.");
\r
304 // public boolean ignoreProtocolVersion() {
\r
305 // File wd = builder.directory();
\r
308 // Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);
\r
309 // if (Files.exists(ignore))
\r
313 public boolean isActive() throws ProCoreException {
\r
314 return manager.isActive();
\r
316 public boolean tryToStop() throws ProCoreException {
\r
317 return manager.tryToStop();
\r
319 int getPort() throws ProCoreException {
\r
320 return manager.getPort();
\r
323 private static enum OSType {
\r
324 UNIX, WINDOWS, UNKNOWN;
\r
326 public static OSType calculate() {
\r
327 String osName = System.getProperty("os.name");
\r
328 assert osName != null;
\r
329 osName = osName.toLowerCase();
\r
330 if (osName.startsWith("windows"))
\r
332 if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))
\r
337 static public boolean ignoreProtocolVersion(File dbFolder) {
\r
338 if (null == dbFolder)
\r
340 if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))
\r
344 static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)
\r
345 throws ProCoreException, ServerNotFoundException {
\r
347 ProCoreServer proCoreServer;
\r
348 synchronized (workingDirs) {
\r
349 proCoreServer = workingDirs.get(workingDir.getCanonicalPath());
\r
350 if (null != proCoreServer)
\r
351 return proCoreServer;
\r
352 if (!serverDir.isDirectory())
\r
353 throw new ProCoreException("Expected server directory as argument");
\r
354 if (!workingDir.isDirectory())
\r
355 throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());
\r
356 String exeSuffix = Activator.getExeSuffix();
\r
357 File executable = new File(serverDir, "ProCoreServer" + exeSuffix);
\r
358 proCoreServer = new ProCoreServer(executable, workingDir);
\r
359 workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);
\r
360 return proCoreServer;
\r
362 } catch (IOException e) {
\r
363 throw new ProCoreException("IOException", e);
\r
367 public static void logVersionInformation() {
\r
368 StringBuilder msg = new StringBuilder(200);
\r
369 String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.
\r
370 msg.append("ProCore version information:");
\r
372 msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);
\r
374 msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());
\r
375 } catch (IOException e) {
\r
376 msg.append(" folder information not available");
\r
379 msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");
\r
380 String str = msg.toString();
\r
381 Logger.defaultLog(str);
\r
383 private static String[] getFileLines(final File aFile, int maxLines) {
\r
384 final String[] STRING_EMPTY_ARRAY = new String[0];
\r
386 if (!aFile.canRead()) {
\r
387 return STRING_EMPTY_ARRAY;
\r
389 try (FileInputStream fis = new FileInputStream(aFile);
\r
390 InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");
\r
391 BufferedReader br = new BufferedReader(isr);){
\r
392 String line = null;
\r
394 ArrayList<String> lines = new ArrayList<String>();
\r
395 while ((line = br.readLine()) != null) {
\r
396 if (++count > maxLines)
\r
397 break; // Caller not interested in more lines.
\r
400 return lines.toArray(STRING_EMPTY_ARRAY);
\r
401 } catch (FileNotFoundException e) {
\r
402 return STRING_EMPTY_ARRAY;
\r
404 } catch (Throwable t) {
\r
406 t.printStackTrace();
\r
407 Logger.defaultLogError(t);
\r
409 return new String[0];
\r
411 private static int parseInt(String s)
\r
412 throws ProCoreException {
\r
414 return Integer.parseInt(s);
\r
415 } catch (NumberFormatException e) {
\r
416 String msg = "Could not convert string to number. string=" + s;
\r
417 Logger.defaultLogError(msg);
\r
418 throw new ProCoreException(msg, e);
\r
421 private String databaseId = null;
\r
422 private final GuardFile guardFile;
\r
423 private final ProCoreClient proCoreClient;
\r
424 ProCoreClient getProCoreClient() {
\r
425 return proCoreClient;
\r
427 private final ProCoreProcess proCoreProcess;
\r
428 private ProCoreServer(File exeFile, File workingDir)
\r
429 throws ProCoreException {
\r
430 if (null == exeFile)
\r
431 throw new ProCoreException("Illegal argument: exeFile is null.");
\r
432 if (null == workingDir)
\r
433 throw new ProCoreException("Illegal argument: workingDir is null.");
\r
434 guardFile = new GuardFile(workingDir);
\r
435 proCoreClient = new ProCoreClient(this);
\r
436 proCoreProcess = new ProCoreProcess(exeFile, workingDir);
\r
437 Util.log("ProCoreServer exe: " + exeFile);
\r
438 Util.log("ProCoreServer folder: " + workingDir);
\r
439 logVersionInformation();
\r
442 public synchronized String execute(String command) throws ProCoreException, InterruptedException {
\r
443 return proCoreClient.execute(command);
\r
445 private void startInternal()
\r
446 throws ProCoreException {
\r
447 if (proCoreClient.isConnected())
\r
449 Exception exception;
\r
451 proCoreProcess.tryToStart();
\r
452 int port = proCoreProcess.getPort();
\r
453 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
\r
454 proCoreClient.connect(sa);
\r
456 } catch (InterruptedException e) {
\r
457 Util.logError("ProCoreProcess start was interrupted.", e);
\r
459 } catch (DatabaseLastExitException e) {
\r
460 throw e; // This situation can be analyzed and handled by start handler.
\r
461 } catch (ProCoreException e) {
\r
462 Util.logError("Failed to start ProCoreServer process.", e);
\r
465 boolean isActive = guardFile.isActive(proCoreClient);
\r
467 throw new ProCoreException("Failed to connect to ProCoreServer.", exception);
\r
469 // private int getWaitCount() throws ProCoreException {
\r
470 // if (!proCoreProcess.isActive())
\r
471 // return SHORT_SLEEP_COUNT; // Short wait because server process is not running.
\r
473 // return LONG_SLEEP_COUNT;
\r
475 // private void startInternalReinit(int exitStatus)
\r
476 // throws ProCoreException {
\r
477 // switch (exitStatus) {
\r
478 // default: // What has happened here?
\r
479 // throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));
\r
480 // case -1: // Exit status not available, not sure what has happened here.
\r
481 // case 29: // Server died because it could not read exit status from guard file.
\r
482 // case 30: // Server died because old exit status was not zero.
\r
483 // case 40: // Server died because it received terminating signal.
\r
484 // case 42: // Server died because it could not read end status from guard file.
\r
485 // case 43: // Server died because end value was not zero.
\r
486 // case 44: // Server died because server could not read version from guard file.
\r
487 // case 45: // Server died because server did not support guard file version.
\r
488 // case 47: // Server died because server could not listen to any port.
\r
490 // case 32: // Server died becuase database version did not match.
\r
491 // throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));
\r
492 // case 33: // Server died because database has been corrupted.
\r
493 // throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));
\r
494 // case 39: // Server died because last exit was not clean.
\r
495 // throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));
\r
497 // boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());
\r
499 // StringBuilder sb = new StringBuilder(256);
\r
500 // sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");
\r
501 // sb.append(getExitMessage(exitStatus));
\r
502 // throw new DatabaseStartException(workingDir, sb.toString());
\r
504 // boolean deleted = guardFile.delete();
\r
506 // Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));
\r
509 public synchronized boolean isActive()
\r
510 throws ProCoreException {
\r
511 if (!proCoreClient.isConnected())
\r
512 return guardFile.isActive(proCoreClient);
\r
513 proCoreClient.execute("");
\r
516 public synchronized boolean isAlive()
\r
517 throws ProCoreException {
\r
518 if (proCoreClient.isConnected())
\r
520 if (proCoreProcess.isActive())
\r
522 return guardFile.isAlive();
\r
524 Client newClient() throws ProCoreException {
\r
525 if (proCoreClient.isConnected())
\r
526 return proCoreClient.newClient();
\r
527 int port = getPort();
\r
528 InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);
\r
529 return proCoreClient.newClient(this, isa);
\r
531 public synchronized boolean tryToStop()
\r
532 throws ProCoreException {
\r
533 synchronized (proCoreClient) {
\r
534 ProCoreException ex = null;
\r
535 if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {
\r
537 proCoreClient.connect(getPort());
\r
538 } catch (InterruptedException e) {
\r
541 if (proCoreClient.isConnected()) {
\r
544 s = proCoreClient.execute(QUIT_CMD);
\r
545 String t = s.replaceAll("\n", "");
\r
546 int n = Integer.parseInt(t);
\r
548 Util.log("Number of connections after quit command is " + n + ".");
\r
549 proCoreClient.closeClient(0);
\r
550 if (!isLocal() && n > 1)
\r
551 throw new ProCoreException("More than one connection for remote. n=" + n);
\r
552 } catch (NumberFormatException e) {
\r
553 Util.logError("Failed to parse number of connections. s=" + s, e);
\r
554 } catch (ProCoreException e) {
\r
557 proCoreClient.disconnect();
\r
561 proCoreProcess.tryToStop();
\r
563 if (proCoreProcess.isActive() && null == ex)
\r
564 ex = new ProCoreException("Failed to stop ProCoreProcess.");
\r
568 return !proCoreClient.isConnected() && !proCoreProcess.isActive();
\r
571 public synchronized boolean isConnected()
\r
572 throws ProCoreException {
\r
573 return proCoreClient.isConnected();
\r
575 public synchronized boolean isLocal()
\r
576 throws ProCoreException {
\r
577 return proCoreClient.isConnected() && proCoreProcess.isActive();
\r
579 public synchronized void disconnect()
\r
580 throws ProCoreException {
\r
581 proCoreClient.disconnect();
\r
583 public synchronized void connect()
\r
584 throws ProCoreException, InterruptedException {
\r
585 int port = getPort();
\r
587 throw new ProCoreException("Port 0 not supported as connection address.");
\r
588 proCoreClient.connect(port);
\r
590 private String getExitMessage(int exitValue) {
\r
591 switch (exitValue) {
\r
593 return "Unexpected exit value = " + exitValue + ".";
\r
595 return "Exception was thrown. This indicates problems with program logic.";
\r
597 return "CallException was thrown. This indicates problems with calls to/from parent server.";
\r
599 return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";
\r
601 return "NoSuchElementException was thrown. This indicates problems with program logic.";
\r
603 return "IllegalArgumentException was thrown. This indicates problems with program logic.";
\r
605 return "IllegalResourceException was thrown. This indicates problems with server data.";
\r
607 return "IllegalClusterException was thrown. This indicates problems with server data.";
\r
609 return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";
\r
611 return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";
\r
613 return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";
\r
615 return "SystemException was thrown. This indicates problems with program logic.";
\r
617 return "CppException was thrown. This indicates problems with program logic.";
\r
619 return "UnknownException was thrown. This indicates problems with program logic.";
\r
621 return "ThrowException was thrown. This indicates problems with program logic.";
\r
623 return "AssertException was thrown. This indicates problems with program logic.";
\r
625 return "ParentCheckException was thrown. This indicates problems with the database version of parent.";
\r
627 return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";
\r
629 return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";
\r
631 return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";
\r
633 return "NoSuchFileException was thrown. This indicates problems with a missing file.";
\r
635 return "JournalException was thrown. This indicates problems with journal mechanisim.";
\r
637 return "OutOfSpaceException was thrown. This indicates problems with memory.";
\r
639 return "ExternalValueException was thrown. This indicates problems with large value handling.";
\r
641 return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";
\r
643 return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";
\r
645 return "ConsoleException was thrown. This indicates that server console was closed without quit command.";
\r
647 return "ExitException was thrown. This indicates that server did not exit cleanly.";
\r
649 return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";
\r
651 return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";
\r
653 return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";
\r
655 return "UpdateException was thrown. This indicates that server failed during update.";
\r
657 return "DatabaseVersionException was thrown. This indicates that server can not use given database.";
\r
659 return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";
\r
661 return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";
\r
663 return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";
\r
665 return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";
\r
667 return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";
\r
669 return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";
\r
671 return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";
\r
673 return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";
\r
675 return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";
\r
677 return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";
\r
679 return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";
\r
681 return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";
\r
683 return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";
\r
685 return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";
\r
687 return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";
\r
689 return "JournalVersionException was thrown. This indicates that database was written by older version.";
\r
691 return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";
\r
694 synchronized void start()
\r
695 throws ProCoreException {
\r
698 } catch (DatabaseLastExitException e) {
\r
699 throw e; // This can be analyzed and handled by start handler.
\r
700 } catch (Throwable t) {
\r
701 ProCoreException pe = null;
\r
702 if (t instanceof ProCoreException)
\r
703 pe = (ProCoreException)t;
\r
705 String msg = "ProCoreServer startup failed."
\r
706 + " Automatic rcovery handling not implemented."
\r
707 + " Database must be fixed manually.";
\r
708 pe = new ProCoreException(msg, t);
\r
710 if (null == pe.getDbFolder())
\r
711 pe.setDbFolder(proCoreProcess.builder.directory());
\r
714 synchronized void stop()
\r
715 throws ProCoreException, InterruptedException{
\r
716 if (null != databaseId)
\r
717 databaseIds.remove(databaseId);
\r
718 boolean dead = tryToStop();
\r
720 throw new ProCoreException("Failed to stop ProCoreServer.");
\r
722 public synchronized String getExitMessage()
\r
723 throws ProCoreException {
\r
724 Integer returnValue = guardFile.getExitValue();
\r
725 if (null == returnValue)
\r
726 throw new ProCoreException("Exit message not available.");
\r
727 return getExitMessage(returnValue);
\r
730 public synchronized ServerAddress getAddress()
\r
731 throws ProCoreException {
\r
732 return new ServerAddress(LOCALHOST, getPort());
\r
735 public synchronized int getPort()
\r
736 throws ProCoreException {
\r
738 if (proCoreClient.isConnected()) {
\r
739 port = proCoreClient.getPort();
\r
743 if (proCoreProcess.isActive()) {
\r
744 port = proCoreProcess.getPort();
\r
748 return guardFile.getPort();
\r
750 public synchronized TailFile.Data getTailData()
\r
751 throws ProCoreException {
\r
752 Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);
\r
753 return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
\r
755 // public synchronized TailFile.Data getTailDataFromHead()
\r
756 // throws ProCoreException {
\r
757 // Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);
\r
758 // return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());
\r
760 public static class GuardFile {
\r
761 public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;
\r
762 private static final int VERSION_LINE = 1;
\r
763 private static final int PID_LINE = 2;
\r
764 private static final int PORT_LINE = 3;
\r
765 private static final int EXIT_LINE = 4;
\r
766 private static final int END_LINE = 5;
\r
767 private final File guardFile;
\r
768 GuardFile(File workingDir) {
\r
769 guardFile = new File(workingDir, GUARD_FILE);
\r
771 int getPort() throws ProCoreException {
\r
772 String[] lines = getFileLines(guardFile, END_LINE);
\r
773 if (lines.length < PORT_LINE)
\r
774 throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());
\r
775 return parseInt(lines[PORT_LINE-1]);
\r
777 private void checkVersion(String[] lines) throws ProCoreException {
\r
778 if (lines.length < VERSION_LINE)
\r
779 throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());
\r
780 int version = parseInt(lines[VERSION_LINE-1]);
\r
782 throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);
\r
785 if (guardFile.exists())
\r
786 return guardFile.delete();
\r
788 return true; // Guard file is deleted i.e. does not exit.
\r
791 if (!guardFile.exists())
\r
794 Files.delete(guardFile.toPath());
\r
795 } catch (Throwable t) {
\r
796 t.printStackTrace();
\r
799 Integer getExitValue() {
\r
800 String lines[] = getFileLines(guardFile, END_LINE);
\r
802 checkVersion(lines);
\r
803 if (lines.length >= EXIT_LINE)
\r
804 return parseInt(lines[EXIT_LINE-1]);
\r
805 else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))
\r
806 return -1; // Server has died without writing exit status.
\r
807 return null; // Server might be running.
\r
808 } catch (Exception e) {
\r
809 Logger.defaultLogError("Could not get ProCoreServer exit value.", e);
\r
810 return null; // Server might be running.
\r
813 // void stopProCoreServer() throws ProCoreException {
\r
814 // if (!guardFile.exists())
\r
815 // throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");
\r
816 // String lines[] = new String[0];
\r
817 // for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {
\r
818 // lines = getFileLines(guardFile, END_LINE);
\r
819 // if (lines.length < 1)
\r
820 // throw new ProCoreException("Guard file does not contain lines.");
\r
821 // checkVersion(lines);
\r
822 // if (lines.length >= END_LINE)
\r
823 // return; // Server is dead according to guard file.
\r
824 // if (!isRunning(lines[PID_LINE-1]))
\r
825 // return; // ProCoreServer is not running.
\r
826 // if (lines.length >= PORT_LINE) // ProCoreServer is running according to pid. Either it's slow or pid is for some other file.
\r
829 // if (lines.length < PORT_LINE)
\r
830 // throw new ProCoreException("Guard file does not contain port.");
\r
832 // int port = parseInt(lines[PORT_LINE-1]);
\r
834 // ProCoreClient lProCoreClient = new ProCoreClient();
\r
835 // SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);
\r
836 // lProCoreClient.connect(sa);
\r
838 // String reply = lProCoreClient.execute(QUIT_CMD);
\r
839 // int nConnect = Integer.parseInt(reply);
\r
840 // if (nConnect > 1)
\r
841 // throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());
\r
843 // lProCoreClient.disconnect();
\r
846 // } catch (Throwable t) {
\r
847 // throw new ProCoreException("Failed to stop procore server cleanly.", t);
\r
849 // // Managed to send quit command to server, let's wait and see if it has any effect.
\r
850 // boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);
\r
852 // throw new ProCoreException("Unable to stop server.");
\r
854 boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {
\r
855 if (proCoreClient.isConnected())
\r
856 throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");
\r
857 String lines[] = getFileLines(guardFile, END_LINE);
\r
858 if (lines.length != PORT_LINE)
\r
860 checkVersion(lines);
\r
862 int port = parseInt(lines[PORT_LINE-1]);
\r
865 // Checking if core is responsive.
\r
866 // This should be supported regardless of version.
\r
867 SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);
\r
868 proCoreClient.connect(sa);
\r
869 proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.
\r
871 } catch (ProCoreException e) {
\r
872 } finally { // We must disconnect or check protocol version.
\r
873 proCoreClient.disconnect();
\r
876 } catch (Throwable t) {
\r
877 // Intentionally empty.
\r
881 boolean isAlive() throws ProCoreException {
\r
882 String lines[] = getFileLines(guardFile, END_LINE);
\r
883 if (lines.length < 1)
\r
884 return false; // No lines, assuming server is dead.
\r
885 checkVersion(lines);
\r
886 if (lines.length < PID_LINE)
\r
887 return false; // No process id, assuming server is dead.
\r
888 else if (lines.length >= END_LINE)
\r
889 return false; // End status available, assuming server is dead.
\r
890 else if (lines.length >= EXIT_LINE)
\r
891 return false; // Exit status available, assuming server is dying.
\r
892 else if (isRunning(lines[PID_LINE-1]))
\r
893 return true; // Process with given id running. Server might be alive.
\r
895 return false;// Process with given id not running.
\r
898 public static class TailFile {
\r
899 public static class Data {
\r
900 Data(long version, long nextChangeSetId, long nextFreeId) {
\r
901 this.version = version;
\r
902 this.nextChangeSetId = nextChangeSetId;
\r
903 this.nextFreeId = nextFreeId;
\r
906 public long nextChangeSetId;
\r
909 public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {
\r
911 throw new TailException("Tail file exists and thus cannot be created. file=" + file);
\r
912 FileOutputStream fos;
\r
914 fos = new FileOutputStream(file);
\r
915 } catch (FileNotFoundException e) {
\r
916 throw new TailException("Tail file cannot be created. file=" + file);
\r
918 PrintStream ps = new PrintStream(fos);
\r
920 ps.println(1); // file version
\r
921 ps.println(nextChangeSetId); // next revision
\r
922 ps.println(nextFreeId); // next free cluster
\r
923 ps.println(dbId); // database id
\r
928 public static Data readTailFile(File file) throws ProCoreException {
\r
929 String lines[] = getFileLines(file, 3);
\r
930 if (lines.length < 3)
\r
931 throw new TailReadException("Could not read data from " + file);
\r
932 long ver = Long.parseLong(lines[0]);
\r
933 long cs = Long.parseLong(lines[1]);
\r
934 long id = Long.parseLong(lines[2]);
\r
935 return new Data(ver, cs, id);
\r
939 class SessionAddress {
\r
940 private final InetSocketAddress socketAddress;
\r
941 private final boolean ignoreProtocolVersion;
\r
942 public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {
\r
943 assert(null != socketAddress);
\r
944 this.socketAddress = socketAddress;
\r
945 this.ignoreProtocolVersion = ignoreProtocolVersion;
\r
947 public InetSocketAddress getSocketAddress() {
\r
948 return socketAddress;
\r
950 public boolean ignoreProtocolVersion() {
\r
951 return ignoreProtocolVersion;
\r
954 public String toString() {
\r
955 return socketAddress.toString();
\r
958 class PacketQueue {
\r
959 private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();
\r
962 void addFirst(Packet packet) {
\r
963 packets.add(packet);
\r
965 Packet takeFirst() throws InterruptedException {
\r
966 return packets.take();
\r
970 Header(int[] ints) {
\r
971 lastTokenIn = ints[0];
\r
973 messageNumber = ints[2];
\r
974 inflateSize = ints[3];
\r
975 deflateSize = ints[4];
\r
983 public String toString() {
\r
984 StringBuilder sb = new StringBuilder(80);
\r
985 sb.append("lastTokenIn=");
\r
986 sb.append(lastTokenIn);
\r
987 sb.append(" token=");
\r
989 sb.append(" messageNumber=");
\r
990 sb.append(messageNumber);
\r
991 sb.append(" inflateSize=");
\r
992 sb.append(inflateSize);
\r
993 sb.append(" deflateSize=");
\r
994 sb.append(deflateSize);
\r
995 return sb.toString();
\r
1001 Packet(int[] header, ByteBuffer bytes) {
\r
1002 this.header = new Header(header);
\r
1003 this.bytes = bytes;
\r
1006 class ConnectionThread {
\r
1007 private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
\r
1008 private static final int HEADER_N = 5;
\r
1009 private static final int HEADER_SIZE = HEADER_N * 4;
\r
1010 private static final int DEFLATE = 4;
\r
1011 private static final int INCREMENT_SIZE = 8192;
\r
1012 private static int instanceCount = 0;
\r
1013 class Manager implements Runnable {
\r
1014 private final int instance;
\r
1015 private volatile boolean carryOn = true;
\r
1016 private volatile int lastTokenIn = 0;
\r
1017 private volatile MethodQueue methodQueue = new MethodQueue();
\r
1018 private volatile CountDownLatch running = new CountDownLatch(1);
\r
1019 private final EventHandler defaultEventHandler = new DefaultEventHandler();
\r
1020 // private EventHandler eventHandler = defaultEventHandler;
\r
1022 instance = ++instanceCount;
\r
1025 public String toString() {
\r
1026 return "Connection " + instance;
\r
1028 InetSocketAddress getAddress() {
\r
1029 return connection.getAddress();
\r
1031 boolean isConnected() {
\r
1032 return connection.isConnected();
\r
1034 void reconnect() throws ProCoreException, InterruptedException {
\r
1035 InetSocketAddress a = connection.getAddress();
\r
1036 if (connection.addressNotInitialized() || 0 == a.getPort())
\r
1037 throw new ProCoreException("Address not ok. address=" + connection.getAddress());
\r
1040 void connect(int port) throws ProCoreException, InterruptedException {
\r
1041 InetSocketAddress a = connection.getAddress();
\r
1042 if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))
\r
1043 a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);
\r
1046 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
\r
1047 if (connection.isConnected()) {
\r
1048 if (connection.equalsAddress(address))
\r
1051 throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);
\r
1053 connection.init4Connection(address);
\r
1054 connectionManager.connect(connection);
\r
1055 // compression = new Compression();
\r
1056 if (thread.isAlive())
\r
1061 void call(Method method) throws ProCoreException, InterruptedException, IOException {
\r
1062 // eventHandler = method.getEventHandler(eventHandler);
\r
1063 connection.sendMethod(methodQueue, method, lastTokenIn);
\r
1065 void disconnect() {
\r
1066 methodQueue.close();
\r
1067 connection.disconnect();
\r
1070 public void run() { // MARK: CT.Manger.run
\r
1072 running.countDown();
\r
1074 Packet packet = null;
\r
1076 packet = inputQueue.takeFirst();
\r
1077 } catch (InterruptedException e) {
\r
1078 Util.trace("Wait for input packet interrupted. this=" + this);
\r
1081 if (null == packet)
\r
1083 handlePacket(packet);
\r
1086 Util.trace("Thread " + thread.getName() + " stopped with null packet.");
\r
1088 Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");
\r
1089 } catch (Throwable t) {
\r
1090 Util.logError("Thread " + thread.getName() + " stopped to exception:", t);
\r
1093 running.countDown();
\r
1096 private void handlePacket(Packet packet) throws ProCoreException {
\r
1097 if (packet.header.token > lastTokenIn) {
\r
1098 lastTokenIn = packet.header.token;
\r
1100 Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);
\r
1102 Method method = methodQueue.remove(packet.header.token);
\r
1103 if (null == method) {
\r
1105 switch (packet.header.messageNumber) {
\r
1107 Util.logError("Missing method. token=" + packet.header.token);
\r
1109 case MessageNumber.ChangeSetUpdateRequest: {
\r
1110 defaultEventHandler.on(new ChangeSetUpdateEvent(packet));
\r
1113 } catch (Throwable t) {
\r
1114 Util.logError("Event handler failed:", t);
\r
1119 // eventHandler = method.getEventHandler(defaultEventHandler);
\r
1120 switch (packet.header.messageNumber) {
\r
1122 String s = "Illegal message number " + packet.header.messageNumber;
\r
1123 method.gotException(s);
\r
1125 case MessageNumber.ChangeSetUpdateRequest:
\r
1126 method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));
\r
1127 methodQueue.add(method); // Event caused by method. Actual method not yet handled.
\r
1129 case MessageNumber.NullResponse:
\r
1130 //Util.log("Null(ack) message. method=" + method + " packet=" + packet);
\r
1132 case MessageNumber.ExceptionResponse:
\r
1133 method.gotException(packet);
\r
1134 break; // MARK: handlePacket
\r
1135 case MessageNumber.AAAResponse:
\r
1136 case MessageNumber.AskTransactionResponse:
\r
1137 case MessageNumber.CancelCommitResponse:
\r
1138 case MessageNumber.CloseClientSessionResponse:
\r
1139 case MessageNumber.EchoResponse:
\r
1140 case MessageNumber.EndTransactionResponse:
\r
1141 case MessageNumber.ExecuteResponse:
\r
1142 case MessageNumber.GetChangeSetContextResponse:
\r
1143 case MessageNumber.GetChangeSetDataResponse:
\r
1144 case MessageNumber.GetChangeSetsResponse:
\r
1145 case MessageNumber.GetClusterNewResponse:
\r
1146 case MessageNumber.GetRefreshResponse:
\r
1147 case MessageNumber.GetResourceSegmentResponse:
\r
1148 case MessageNumber.GetServerInfoResponse:
\r
1149 case MessageNumber.OpenClientSessionResponse:
\r
1150 case MessageNumber.RefreshResponse:
\r
1151 case MessageNumber.ReserveIdsResponse:
\r
1152 case MessageNumber.UndoResponse:
\r
1153 case MessageNumber.ReconnectResponse:
\r
1154 case MessageNumber.GetRefresh2Response:
\r
1155 case MessageNumber.GetClusterChangesResponse:
\r
1156 case MessageNumber.GetServerInfo2Response:
\r
1157 case MessageNumber.ListClustersResponse: {
\r
1158 method.gotPacket(packet);
\r
1163 private final ConnectionManager connectionManager;
\r
1164 private final PacketQueue inputQueue = new PacketQueue();
\r
1165 private final Connection connection = new Connection(this);
\r
1166 private final Manager manager = new Manager();
\r
1167 private Thread thread = new Thread(manager, manager.toString());
\r
1168 private int[] header = null;
\r
1169 private int[] INTS = new int[HEADER_N];
\r
1170 private int remaining = HEADER_SIZE;
\r
1171 private int deflateSize;
\r
1172 ConnectionThread(ConnectionManager connectionManager) {
\r
1173 this.connectionManager = connectionManager;
\r
1175 ByteBuffer getInputBuffer() {
\r
1176 return getInputBuffer(INCREMENT_SIZE);
\r
1178 ByteBuffer getInputBuffer(int sizeAtLeast) {
\r
1179 ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));
\r
1180 bb.order(ByteOrder.LITTLE_ENDIAN);
\r
1184 public String toString() {
\r
1185 return "header=" + header + " remaining=" + remaining;
\r
1187 private String toString(ByteBuffer buffer, int nRead) {
\r
1188 return this + " nRead=" + nRead + " buffer=" + buffer;
\r
1190 ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0
\r
1191 throws IOException { // MARK: handleInput
\r
1192 int position = buffer.position();
\r
1193 if (null == header) {
\r
1194 if (position < HEADER_SIZE)
\r
1196 else { // Got header.
\r
1197 buffer.position(0);
\r
1198 IntBuffer ib = buffer.asIntBuffer();
\r
1202 deflateSize = header[DEFLATE];
\r
1203 if (deflateSize < 0)
\r
1204 throw new IOException("Illegal deflate size=" + deflateSize);
\r
1205 remaining += deflateSize;
\r
1206 if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.
\r
1207 ByteBuffer bb = getInputBuffer(remaining);
\r
1208 buffer.position(0);
\r
1209 buffer.limit(position);
\r
1213 buffer.position(position);
\r
1216 if (position < remaining)
\r
1217 return buffer; // Need more data.
\r
1218 else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.
\r
1219 ByteBuffer bb = getInputBuffer(remaining);
\r
1220 buffer.position(0);
\r
1221 buffer.limit(remaining);
\r
1223 inputQueue.addFirst(new Packet(header, bb));
\r
1224 ByteBuffer b2 = getInputBuffer(buffer.limit());
\r
1225 buffer.position(remaining);
\r
1226 buffer.limit(position);
\r
1229 remaining = HEADER_SIZE;
\r
1230 return handleInput(b2, b2.position());
\r
1231 } else if (position != remaining)
\r
1232 throw new IOException("Assertion error. this=" + toString(buffer, nRead));
\r
1233 // Got exactly one packet.
\r
1234 inputQueue.addFirst(new Packet(header, buffer));
\r
1236 remaining = HEADER_SIZE;
\r
1237 return getInputBuffer(INCREMENT_SIZE);
\r
1239 InetSocketAddress getAddress() {
\r
1240 return manager.getAddress();
\r
1242 boolean isConnected() {
\r
1243 return manager.isConnected();
\r
1245 void reconnect() throws ProCoreException, InterruptedException {
\r
1246 manager.reconnect();
\r
1248 void connect(int port) throws ProCoreException, InterruptedException {
\r
1249 manager.connect(port);
\r
1251 void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
\r
1252 manager.connect(address);
\r
1254 void disconnect() {
\r
1255 manager.disconnect();
\r
1257 Connection call(Method m) throws ProCoreException, InterruptedException, IOException {
\r
1259 return connection;
\r
1263 private final ProCoreServer proCoreServer;
\r
1264 private final ProCoreClient proCoreClient;
\r
1265 private final ConnectionThread connectionThread;
\r
1266 private boolean closed = false;
\r
1267 Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {
\r
1268 this.proCoreServer = proCoreServer;
\r
1269 this.proCoreClient = proCoreClient;
\r
1270 this.connectionThread = connectionThread;
\r
1272 void open() throws ProCoreException, InterruptedException {
\r
1273 synchronized (proCoreClient) {
\r
1274 if (!connectionThread.isConnected())
\r
1275 connectionThread.connect(proCoreServer.getPort());
\r
1279 boolean isOpen() {
\r
1280 synchronized (proCoreClient) {
\r
1281 return connectionThread.isConnected();
\r
1284 boolean isClosed() {
\r
1285 synchronized (proCoreClient) {
\r
1286 return closed || !connectionThread.isConnected();
\r
1290 proCoreClient.deleteClient(this);
\r
1293 String execute(String command) throws ProCoreException {
\r
1294 return proCoreClient.execute(command);
\r
1296 void call(Method m) throws ProCoreException {
\r
1297 proCoreClient.call(m);
\r
1300 class ProCoreClient {
\r
1301 public static final String LOCALHOST = "127.0.0.1";
\r
1303 private final ConnectionManager connectionManager = ConnectionManager.getInstance();
\r
1304 private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);
\r
1308 private final Remote remote = new Remote();
\r
1309 private final Set<Client> clients = new HashSet<Client>();
\r
1310 private final ProCoreServer proCoreServer;
\r
1311 public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {
\r
1312 this.proCoreServer = proCoreServer;
\r
1314 synchronized public int getPort() {
\r
1315 return remote.connectionThread.getAddress().getPort();
\r
1317 synchronized public boolean isConnected() {
\r
1318 return remote.connectionThread.isConnected();
\r
1320 synchronized public Client newClient() throws ProCoreException {
\r
1321 return newClient(proCoreServer, remote.connectionThread.getAddress());
\r
1323 synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {
\r
1324 if (!remote.connectionThread.isConnected())
\r
1326 remote.connectionThread.connect(address);
\r
1327 } catch (InterruptedException e) {
\r
1328 if (!remote.connectionThread.isConnected()) {
\r
1329 remote.connectionThread.disconnect();
\r
1330 throw new ProCoreException("Failed to create new client.");
\r
1333 else if (!address.equals(remote.connectionThread.getAddress()))
\r
1334 throw new ProCoreException("Illegal newClient call. Already connected to different address.");
\r
1335 Client client = new Client(proCoreServer, this, remote.connectionThread);
\r
1336 clients.add(client);
\r
1339 synchronized int deleteClient(Client client) {
\r
1340 clients.remove(client);
\r
1341 return clients.size();
\r
1343 void closeClient(int aSessionId) throws ProCoreException {
\r
1345 CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);
\r
1346 Method m = new Method(f, null, null);
\r
1347 Connection c = remote.connectionThread.call(m);
\r
1348 m.waitForReply(c);
\r
1349 } catch (InterruptedException e) {
\r
1350 throw new ProCoreException("Thread interrupted during method call.", e);
\r
1351 } catch (IOException e) {
\r
1352 throw new ProCoreException("IOException during method call.", e);
\r
1353 } catch (Throwable e) {
\r
1354 throw new ProCoreException("Throwable during method call.", e);
\r
1357 synchronized public String execute(String command) throws ProCoreException {
\r
1359 ExecuteFunction f = new ExecuteFunction(command);
\r
1360 Method m = new Method(f, null, null);
\r
1361 Connection c = remote.connectionThread.call(m);
\r
1362 m.waitForReply(c);
\r
1364 } catch (InterruptedException e) {
\r
1365 throw new ProCoreException("Thread interrupted during method call.", e);
\r
1366 } catch (IOException e) {
\r
1367 throw new ProCoreException("IOException during method call.", e);
\r
1368 } catch (Throwable e) {
\r
1369 throw new ProCoreException("Throwable during method call.", e);
\r
1372 private void recall(Method method, IOException e) throws ProCoreException {// :)
\r
1373 if (remote.connectionThread.isConnected())
\r
1374 throw new ProCoreException("IOException during method call.", e);
\r
1376 remote.connectionThread.reconnect();
\r
1377 Connection c = remote.connectionThread.call(method);
\r
1378 method.waitForReply(c);
\r
1379 } catch (Throwable t) {
\r
1380 throw new ProCoreException("IOException during method call. Recall failed.", e);
\r
1383 synchronized public void call(Method method) throws ProCoreException {
\r
1385 Connection c = remote.connectionThread.call(method);
\r
1386 method.waitForReply(c);
\r
1387 } catch (InterruptedException e) {
\r
1388 throw new ProCoreException("Thread interrupted during method call.", e);
\r
1389 } catch (IOException e) {
\r
1390 recall(method, e);
\r
1391 } catch (ProCoreException e) {
\r
1393 } catch (Throwable e) {
\r
1394 throw new ProCoreException("Throwable during method call.", e);
\r
1397 synchronized public void connect(int port) throws ProCoreException, InterruptedException {
\r
1398 remote.connectionThread.connect(port);
\r
1400 synchronized public void connect(SessionAddress sessionAddress)
\r
1401 throws ProCoreException, InterruptedException {
\r
1402 remote.connectionThread.connect(sessionAddress.getSocketAddress());
\r
1404 synchronized public void disconnect() throws ProCoreException {
\r
1405 remote.connectionThread.disconnect();
\r
1408 abstract class Event {
\r
1409 final String name;
\r
1410 final Packet packet;
\r
1411 Event(String name, Packet packet) {
\r
1413 this.packet = packet;
\r
1416 return packet.header.token;
\r
1418 void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {
\r
1419 event.deserialize(event.getRequestNumber(), getDataBuffer());
\r
1421 private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {
\r
1422 this.packet.bytes.position(20);
\r
1423 return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);
\r
1425 @Override public String toString() {
\r
1426 return name + " token=" + packet.header.token;
\r
1429 class ChangeSetUpdateEvent extends Event {
\r
1430 ChangeSetUpdateEvent(Packet packet) {
\r
1431 super("ChangeSetUpdateEvent", packet);
\r
1434 abstract class EventHandler {
\r
1435 abstract void on(Event event);
\r
1437 class DefaultEventHandler extends EventHandler {
\r
1438 void on(Event event) {
\r
1439 Util.log("Missing event handler. event=" + event);
\r