]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ProCoreServer.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / ProCoreServer.java
1 /*******************************************************************************\r
2 \r
3  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
4  * in Industry THTH ry.\r
5  * All rights reserved. This program and the accompanying materials\r
6  * are made available under the terms of the Eclipse Public License v1.0\r
7  * which accompanies this distribution, and is available at\r
8  * http://www.eclipse.org/legal/epl-v10.html\r
9  *\r
10  * Contributors:\r
11  *     VTT Technical Research Centre of Finland - initial API and implementation\r
12  *******************************************************************************/\r
13 package org.simantics.db.server.internal;\r
14 \r
15 import java.io.BufferedReader;\r
16 import java.io.BufferedWriter;\r
17 import java.io.File;\r
18 import java.io.FileInputStream;\r
19 import java.io.FileNotFoundException;\r
20 import java.io.FileOutputStream;\r
21 import java.io.IOException;\r
22 import java.io.InputStreamReader;\r
23 import java.io.OutputStreamWriter;\r
24 import java.io.PrintStream;\r
25 import java.net.InetSocketAddress;\r
26 import java.nio.ByteBuffer;\r
27 import java.nio.ByteOrder;\r
28 import java.nio.IntBuffer;\r
29 import java.nio.file.Files;\r
30 import java.nio.file.Path;\r
31 import java.util.ArrayList;\r
32 import java.util.HashMap;\r
33 import java.util.HashSet;\r
34 import java.util.Map;\r
35 import java.util.Set;\r
36 import java.util.concurrent.CountDownLatch;\r
37 import java.util.concurrent.LinkedBlockingQueue;\r
38 \r
39 import org.simantics.db.common.utils.Logger;\r
40 import org.simantics.db.server.DatabaseLastExitException;\r
41 import org.simantics.db.server.GuardFileVersionException;\r
42 import org.simantics.db.server.ProCoreException;\r
43 import org.simantics.db.server.ServerNotFoundException;\r
44 import org.simantics.db.server.protocol.CloseClientSessionFunction;\r
45 import org.simantics.db.server.protocol.ExecuteFunction;\r
46 import org.simantics.db.server.protocol.MessageNumber;\r
47 \r
48 public class ProCoreServer {\r
49     private static Map<String, ProCoreServer> workingDirs = new HashMap<String, ProCoreServer>();\r
50     private static Map<String, ProCoreServer> databaseIds = new HashMap<String, ProCoreServer>();\r
51     private static final String LOCALHOST = ProCoreClient.LOCALHOST;\r
52     public static final int VERSION_MAJOR = 1;\r
53     public static final int VERSION_MINOR = 4; // 2015-05-05\r
54     public static final String BRANCH_DIR = "procore.headClusters.procore";\r
55     public static final String TAIL_DIR = "procore.tailClusters.procore";\r
56     public static final String TAIL_FILE = "procore.tail.procore";\r
57     public static final String CONFIG_FILE = "procore.config.procore";\r
58     public static final String GUARD_FILE = "procore.guard.procore";\r
59     public static final String JOURNAL_FILE = "procore.journal.procore";\r
60     public static final String LOG_FILE = "procore.log.procore";\r
61     public static final String DCS_FILE = "procore.dumpChangeSets.procore";\r
62     public static final String RECOVERY_NEEDED_FILE = "recovery.needed";\r
63     public static final String RECOVERY_IGNORED_FILE = "recovery.ignored";\r
64     public static final String PROTOCOL_IGNORED_FILE = "protocol.ignored";\r
65     public static final String PAGE_FILE_PATTERN = "procore.page*.procore";\r
66     public static final String DATA_PREFIX = "ClusterData.";\r
67     public static final String DATA_PATTERN = DATA_PREFIX + "*";\r
68     public static final String INDEX_PREFIX = "ClusterIndex.";\r
69     public static final String INDEX_PATTERN = INDEX_PREFIX + "*";\r
70     public static final String VALUE_PREFIX = "ExternalValue.";\r
71     public static final String VALUE_PATTERN = VALUE_PREFIX + "*";\r
72     public static final String VALUE_SUFFIX = ".procore";\r
73     public static final String DELETED_PREFIX = "ClusterDeleted.";\r
74     public static final String DELETED_PATTERN = DELETED_PREFIX + "*";\r
75     private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;\r
76     public static final String QUIT_CMD = "quit";\r
77 //    private static final int SHORT_SLEEP_COUNT = 1;\r
78 //    private static final int LONG_SLEEP_COUNT = 10;\r
79 //    static public void stopProCoreServer(File workingDir) throws ProCoreException {\r
80 //        GuardFile guardFile = new GuardFile(workingDir);\r
81 //        guardFile.stopProCoreServer();\r
82 //    }\r
83     private static boolean isRunning(String aPid) throws ProCoreException {\r
84         boolean isRunning = false;\r
85         try {\r
86             switch (OSType.calculate()) {\r
87             case WINDOWS: {\r
88                 Process p = Runtime.getRuntime().exec("tasklist.exe /fo csv /nh /fi \"pid eq " + aPid + "\"");\r
89                 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
90                 String line = input.readLine();\r
91                 if (line.matches(".*ProCoreServer.exe.*"))\r
92                     isRunning = true;\r
93                 input.close();\r
94                 break;\r
95             }\r
96             case UNIX: {\r
97                 Process p = Runtime.getRuntime().exec("ps -p " + aPid + " -o comm=");\r
98                 BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));\r
99                 String line = input.readLine();\r
100                 if (line.matches(".*ProCoreServer.*"))\r
101                     isRunning = true;\r
102                 input.close();\r
103                 break;\r
104             }\r
105             default:\r
106             }\r
107         } catch (Exception e) {\r
108             throw new ProCoreException("Could not get list of running processes.", e);\r
109         }\r
110         return isRunning;\r
111     }\r
112     static class ProCoreProcess {\r
113         private enum State { Created, Initialized, ThreadStarted, ProcessStarted, Prompted, GotPort, ThreadStopped };\r
114         class Manager implements Runnable {\r
115             private State state = State.Created;\r
116             private CountDownLatch serverPrompted;\r
117             private LinkedBlockingQueue<String> reply;\r
118             private volatile String command;\r
119             private volatile Integer exitValue;\r
120             private volatile boolean carryOn;\r
121             private volatile int port;\r
122             private final File dbFolder; // Just for exception i.e. error reporting.\r
123             Manager(File dbFolder) {\r
124                 this.dbFolder = dbFolder;\r
125                 init();\r
126             }\r
127             private void init() {\r
128                 serverPrompted = new CountDownLatch(1);\r
129                 reply = new LinkedBlockingQueue<String>();\r
130                 exitValue = null;\r
131                 carryOn = true;\r
132                 port = 0;\r
133                 state = State.Initialized;\r
134             }\r
135             int getPort() {\r
136                 return port;\r
137             }\r
138             void tryToStart() throws ProCoreException, InterruptedException {\r
139                 if (thread.isAlive())\r
140                     return;\r
141                 thread = new Thread(manager, manager.initAndGetName());\r
142                 thread.start();\r
143                 serverPrompted.await();\r
144                 if (!state.equals(State.Prompted))\r
145                     throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
146                 String reply = giveCommand("print port");\r
147                 try {\r
148                     port = Integer.parseInt(reply);\r
149                 } catch (NumberFormatException e) {\r
150                     if (null != exitValue && 0 != exitValue)\r
151                         throw new DatabaseLastExitException(dbFolder, "Server did not prompt.");\r
152                     if (reply.equals("End of server thread."))\r
153                         throw new ProCoreException("Server did not start (did not prompt). state=" + state);\r
154                     else\r
155                         throw new ProCoreException("Server did not start. Could not parse port. reply=" + reply);\r
156                 }\r
157                 state = State.GotPort;\r
158             }\r
159             String giveCommand(String command) throws ProCoreException, InterruptedException {\r
160                 if (!thread.isAlive())\r
161                     throw new ProCoreException("Server thread not alive.");\r
162                 if (null != this.command)\r
163                     throw new ProCoreException("Old command pending.");\r
164                 this.command = command;\r
165                 return reply.take();\r
166             }\r
167             private String initAndGetName() {\r
168                 init();\r
169                 return "ProCoreProcess " + ++instanceCount + " " + builder.directory();\r
170             }\r
171             boolean isActive() throws ProCoreException {\r
172                 if (null == exitValue && thread.isAlive())\r
173                     return true;\r
174                 else\r
175                     return false;\r
176             }\r
177             boolean tryToStop() throws ProCoreException {\r
178                 if (!thread.isAlive())\r
179                     return true;\r
180                 carryOn = false;\r
181                 thread.interrupt();\r
182                 try {\r
183                     thread.join(10000); // milliseconds\r
184                 } catch (InterruptedException e) {\r
185                 }\r
186                 return !thread.isAlive();\r
187             }\r
188             @Override\r
189             public void run() {\r
190                 state = State.ThreadStarted;\r
191                 Util.log("ProCoreServer thread started.");\r
192                 try {\r
193                     Util.trace("ProCoreProcessManager start.");\r
194                     Process process = builder.start();\r
195                     exitValue = exitValue(process);\r
196                     if (null != exitValue)\r
197                         Util.logError("Server process did not start.");\r
198                     else {\r
199                         state = State.ProcessStarted;\r
200                         runProcessStarted(process); // Process start\r
201                     }\r
202                 } catch (IOException e) {\r
203                     Util.logError("Failed during process watch.");\r
204                 } finally {\r
205                     reply.add("End of server thread.");\r
206                     Util.trace("ProCoreServerThread stop.");\r
207                     state = State.ThreadStopped;\r
208                     serverPrompted.countDown();\r
209                 }\r
210             }\r
211             private void runProcessStarted(Process process) throws IOException {\r
212                 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));\r
213                      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));)\r
214                 {\r
215                     do {\r
216                         try {\r
217                             if (reader.ready())\r
218                                 handleLine(reader.readLine(), reader, writer);\r
219                             else if (null != command) {\r
220                                 writer.write(command);\r
221                                 command = null;\r
222                                 writer.newLine();\r
223                                 writer.flush();\r
224                                 String line = getReply(reader);\r
225                                 reply.add(line);\r
226                             } else\r
227                                 Thread.sleep(100); // milliseconds\r
228                         } catch (InterruptedException e) {\r
229                             Util.trace("ProCoreServerThread interrupted.");\r
230                         }\r
231                         exitValue = exitValue(process);\r
232                         if (null == exitValue && !carryOn)\r
233                             try {\r
234                                 try {\r
235                                     writer.write(QUIT_CMD);\r
236                                     writer.newLine();\r
237                                     writer.flush();\r
238                                 } catch (Throwable t) {\r
239                                     Util.logError("Wait for process death was interrupted.", t);\r
240                                 } finally {\r
241                                 }\r
242                                 exitValue = process.waitFor();\r
243                             } catch (InterruptedException e) {\r
244                                 Util.logError("Wait for process death was interrupted.", e);\r
245                                 carryOn = true;\r
246                                 continue;\r
247                             }\r
248                     } while (null == exitValue);\r
249                 } // End of resource try.\r
250             }\r
251             private String getReply(BufferedReader reader) throws IOException {\r
252                 StringBuilder sb = new StringBuilder();\r
253                 while (true) {\r
254                     String line = reader.readLine();\r
255                     if (null == line || line.startsWith("Waiting for input?"))\r
256                         break;\r
257                     sb.append(line);\r
258                 }\r
259                 return sb.toString();\r
260             }\r
261             private void handleLine(String line, BufferedReader reader, BufferedWriter writer) throws IOException {\r
262                 if (DEBUG)\r
263                     Util.showDebug(line);\r
264                 if (line.startsWith("Main end.")) {\r
265                     carryOn = false; // Last line from server.\r
266                 } else if (serverPrompted.getCount() > 0 && line.startsWith("Waiting for input?")) {\r
267                     state = State.Prompted;\r
268                     serverPrompted.countDown(); // Server has been started and is waiting for console input.\r
269                 } else\r
270                     Util.trace("From server. line=" + line);\r
271             }\r
272             private Integer exitValue(Process process) {\r
273                 try {\r
274                     return process.exitValue();\r
275                 } catch (IllegalThreadStateException e) {\r
276                     return null; // Still alive.\r
277                 }\r
278             }\r
279         }\r
280         private static int instanceCount; // Keeps count of thread instances.\r
281         private final ProcessBuilder builder;\r
282         private final Manager manager;\r
283         private Thread thread;\r
284         ProCoreProcess(File exeFile, File workingDir) {\r
285             String[] args = new String[0];\r
286             String[] cmd = new String[1 + args.length];\r
287             cmd[0] = exeFile.getAbsolutePath().toString();\r
288             System.arraycopy(args, 0, cmd, 1, args.length);\r
289             builder = new ProcessBuilder(cmd);\r
290             builder.redirectErrorStream(true);\r
291             builder.directory(workingDir);\r
292             manager = new Manager(workingDir);\r
293             thread = new Thread(manager, manager.initAndGetName());\r
294         }\r
295         public void tryToStart() throws ProCoreException, InterruptedException {\r
296             manager.tryToStart();\r
297         }\r
298 //        public int exitValue() throws IllegalStateException {\r
299 //            Integer i = proCoreServerThread.getExitValue();\r
300 //            if (null == i)\r
301 //                throw new IllegalStateException("ProCoreServerThread is still running.");\r
302 //            return i;\r
303 //        }\r
304 //        public boolean ignoreProtocolVersion() {\r
305 //            File wd = builder.directory();\r
306 //            if (null == wd)\r
307 //                return false;\r
308 //            Path ignore = wd.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE);\r
309 //            if (Files.exists(ignore))\r
310 //                return true;\r
311 //            return false;\r
312 //        }\r
313         public boolean isActive() throws ProCoreException {\r
314             return manager.isActive();\r
315         }\r
316         public boolean tryToStop() throws ProCoreException {\r
317             return manager.tryToStop();\r
318         }\r
319         int getPort() throws ProCoreException {\r
320             return manager.getPort();\r
321         }\r
322     }\r
323     private static enum OSType {\r
324         UNIX, WINDOWS, UNKNOWN;\r
325 \r
326         public static OSType calculate() {\r
327             String osName = System.getProperty("os.name");\r
328             assert osName != null;\r
329             osName = osName.toLowerCase();\r
330             if (osName.startsWith("windows"))\r
331                 return WINDOWS;\r
332             if (osName.startsWith("mac os x") || osName.startsWith("linux") || osName.startsWith("sun"))\r
333                 return UNIX;\r
334             return UNKNOWN;\r
335         }\r
336     }\r
337     static public boolean ignoreProtocolVersion(File dbFolder) {\r
338         if (null == dbFolder)\r
339             return false;\r
340         if (Files.exists(dbFolder.toPath().resolve(ProCoreServer.PROTOCOL_IGNORED_FILE)))\r
341             return true;\r
342         return false;\r
343     }\r
344     static public ProCoreServer getProCoreServer(File serverDir, final File workingDir)\r
345     throws ProCoreException, ServerNotFoundException {\r
346         try {\r
347             ProCoreServer proCoreServer;\r
348             synchronized (workingDirs) {\r
349                 proCoreServer = workingDirs.get(workingDir.getCanonicalPath());\r
350                 if (null != proCoreServer)\r
351                     return proCoreServer;\r
352                 if (!serverDir.isDirectory())\r
353                     throw new ProCoreException("Expected server directory as argument");\r
354                 if (!workingDir.isDirectory())\r
355                     throw new ProCoreException("Expected working directory as argument. wd=" + workingDir.getAbsolutePath());\r
356                 String exeSuffix = Activator.getExeSuffix();\r
357                 File executable = new File(serverDir, "ProCoreServer" + exeSuffix);\r
358                 proCoreServer = new ProCoreServer(executable, workingDir);\r
359                 workingDirs.put(workingDir.getCanonicalPath(), proCoreServer);\r
360                 return proCoreServer;\r
361             }\r
362         } catch (IOException e) {\r
363             throw new ProCoreException("IOException", e);\r
364         }\r
365     }\r
366 \r
367     public static void logVersionInformation() {\r
368         StringBuilder msg = new StringBuilder(200);\r
369         String nl = " \n"; // Default log error removes newlines so I add space to make the text easier to read.\r
370         msg.append("ProCore version information:");\r
371         msg.append(nl);\r
372         msg.append("ProCoreServer version=" + ProCoreServer.VERSION_MAJOR + "." + ProCoreServer.VERSION_MINOR + " protocol=" + MessageNumber.ProtocolVersionMajor + "." + MessageNumber.ProtocolVersionMinor);\r
373         try {\r
374             msg.append(" folder=" + org.simantics.db.server.internal.Activator.getServerFolder());\r
375         } catch (IOException e) {\r
376             msg.append(" folder information not available");\r
377         }\r
378         msg.append(nl);\r
379         msg.append("svn id=$Id: ProCoreServer.java r31684 2015-09-10 13:45:00Z $");\r
380         String str = msg.toString();\r
381         Logger.defaultLog(str);\r
382     }\r
383     private static String[] getFileLines(final File aFile, int maxLines) {\r
384         final String[] STRING_EMPTY_ARRAY = new String[0];\r
385         try {\r
386             if (!aFile.canRead()) {\r
387                 return STRING_EMPTY_ARRAY;\r
388             }\r
389             try (FileInputStream  fis = new FileInputStream(aFile);\r
390                 InputStreamReader isr = new InputStreamReader(fis, "US-ASCII");\r
391                 BufferedReader br = new BufferedReader(isr);){\r
392                 String line = null;\r
393                 int count = 0;\r
394                 ArrayList<String> lines = new ArrayList<String>();\r
395                 while ((line = br.readLine()) != null) {\r
396                     if (++count > maxLines)\r
397                         break; // Caller not interested in more lines.\r
398                     lines.add(line);\r
399                 }\r
400                 return lines.toArray(STRING_EMPTY_ARRAY);\r
401             } catch (FileNotFoundException e) {\r
402                 return STRING_EMPTY_ARRAY;\r
403             }\r
404         } catch (Throwable t) {\r
405             if (DEBUG)\r
406                 t.printStackTrace();\r
407             Logger.defaultLogError(t);\r
408         }\r
409         return new String[0];\r
410     }\r
411     private static int parseInt(String s)\r
412     throws ProCoreException {\r
413         try  {\r
414             return Integer.parseInt(s);\r
415         } catch (NumberFormatException e) {\r
416             String msg = "Could not convert string to number. string=" + s;\r
417             Logger.defaultLogError(msg);\r
418             throw new ProCoreException(msg, e);\r
419         }\r
420     }\r
421     private String databaseId = null;\r
422     private final GuardFile guardFile;\r
423     private final ProCoreClient proCoreClient;\r
424     ProCoreClient getProCoreClient() {\r
425         return proCoreClient;\r
426     }\r
427     private final ProCoreProcess proCoreProcess;\r
428     private ProCoreServer(File exeFile, File workingDir)\r
429     throws ProCoreException {\r
430         if (null == exeFile)\r
431             throw new ProCoreException("Illegal argument: exeFile is null.");\r
432         if (null == workingDir)\r
433             throw new ProCoreException("Illegal argument: workingDir is null.");\r
434         guardFile = new GuardFile(workingDir);\r
435         proCoreClient = new ProCoreClient(this);\r
436         proCoreProcess = new ProCoreProcess(exeFile, workingDir);\r
437         Util.log("ProCoreServer exe: " + exeFile);\r
438         Util.log("ProCoreServer folder: " + workingDir);\r
439         logVersionInformation();\r
440     }\r
441 //    //@Override\r
442     public synchronized String execute(String command) throws ProCoreException, InterruptedException {\r
443         return proCoreClient.execute(command);\r
444     }\r
445     private void startInternal()\r
446     throws ProCoreException {\r
447         if (proCoreClient.isConnected())\r
448             return;\r
449         Exception exception;\r
450         try {\r
451             proCoreProcess.tryToStart();\r
452             int port = proCoreProcess.getPort();\r
453             SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
454             proCoreClient.connect(sa);\r
455             return;\r
456         } catch (InterruptedException e) {\r
457             Util.logError("ProCoreProcess start was interrupted.", e);\r
458             exception = e;\r
459         } catch (DatabaseLastExitException e) {\r
460             throw e; // This situation can be analyzed and handled by start handler.\r
461         } catch (ProCoreException e) {\r
462             Util.logError("Failed to start ProCoreServer process.", e);\r
463             exception = e;\r
464         }\r
465         boolean isActive = guardFile.isActive(proCoreClient);\r
466         if (!isActive)\r
467             throw new ProCoreException("Failed to connect to ProCoreServer.", exception);\r
468     }\r
469 //    private int getWaitCount() throws ProCoreException {\r
470 //        if (!proCoreProcess.isActive())\r
471 //            return SHORT_SLEEP_COUNT; // Short wait because server process is not running.\r
472 //        else\r
473 //            return LONG_SLEEP_COUNT;\r
474 //    }\r
475 //    private void startInternalReinit(int exitStatus)\r
476 //    throws ProCoreException {\r
477 //        switch (exitStatus) {\r
478 //            default: // What has happened here?\r
479 //                throw new DatabaseStartException(workingDir, getExitMessage(exitStatus));\r
480 //            case -1: // Exit status not available, not sure what has happened here.\r
481 //            case 29: // Server died because it could not read exit status from guard file.\r
482 //            case 30: // Server died because old exit status was not zero.\r
483 //            case 40: // Server died because it received terminating signal.\r
484 //            case 42: // Server died because it could not read end status from guard file.\r
485 //            case 43: // Server died because end value was not zero.\r
486 //            case 44: // Server died because server could not read version from guard file.\r
487 //            case 45: // Server died because server did not support guard file version.\r
488 //            case 47: // Server died because server could not listen to any port.\r
489 //                break;\r
490 //            case 32: // Server died becuase database version did not match.\r
491 //                throw new DatabaseVersionException(workingDir, getExitMessage(exitStatus));\r
492 //            case 33: // Server died because database has been corrupted.\r
493 //                throw new DatabaseCorruptedException(workingDir, getExitMessage(exitStatus));\r
494 //            case 39: // Server died because last exit was not clean.\r
495 //                throw new DatabaseLastExitException(workingDir, getExitMessage(exitStatus));\r
496 //        }\r
497 //        boolean dead = guardFile.wait4ServerToDie(proCoreClient, getWaitCount());\r
498 //        if (!dead) {\r
499 //            StringBuilder sb = new StringBuilder(256);\r
500 //            sb.append("ProCoreServer is running according to guard file. Giving up trying to start a new instance. Server exit message:\n");\r
501 //            sb.append(getExitMessage(exitStatus));\r
502 //            throw new DatabaseStartException(workingDir, sb.toString());\r
503 //        }\r
504 //        boolean deleted = guardFile.delete();\r
505 //        if (!deleted)\r
506 //            Logger.defaultLogError("ProCoreServer is dead but can not delete guard file. " + getExitMessage(exitStatus));\r
507 //    }\r
508     //@Override\r
509     public synchronized boolean isActive()\r
510     throws ProCoreException {\r
511         if (!proCoreClient.isConnected())\r
512             return guardFile.isActive(proCoreClient);\r
513         proCoreClient.execute("");\r
514         return true;\r
515     }\r
516     public synchronized boolean isAlive()\r
517     throws ProCoreException {\r
518         if (proCoreClient.isConnected())\r
519             return true;\r
520         if (proCoreProcess.isActive())\r
521             return true;\r
522         return guardFile.isAlive();\r
523     }\r
524     Client newClient() throws ProCoreException {\r
525         if (proCoreClient.isConnected())\r
526             return proCoreClient.newClient();\r
527         int port = getPort();\r
528         InetSocketAddress isa = new InetSocketAddress(LOCALHOST, port);\r
529         return proCoreClient.newClient(this, isa);\r
530     }\r
531     public synchronized boolean tryToStop()\r
532     throws ProCoreException {\r
533         synchronized (proCoreClient) {\r
534             ProCoreException ex = null;\r
535             if (!proCoreClient.isConnected() && !proCoreProcess.isActive()) {\r
536                 try {\r
537                     proCoreClient.connect(getPort());\r
538                 } catch (InterruptedException e) {\r
539                 }\r
540             }\r
541             if (proCoreClient.isConnected()) {\r
542                 String s = "";\r
543                 try {\r
544                     s = proCoreClient.execute(QUIT_CMD);\r
545                     String t = s.replaceAll("\n", "");\r
546                     int n = Integer.parseInt(t);\r
547                     if (n != 1)\r
548                         Util.log("Number of connections after quit command is " + n + ".");\r
549                     proCoreClient.closeClient(0);\r
550                     if (!isLocal() && n > 1)\r
551                         throw new ProCoreException("More than one connection for remote. n=" + n);\r
552                 } catch (NumberFormatException e) {\r
553                     Util.logError("Failed to parse number of connections. s=" + s, e);\r
554                 } catch (ProCoreException e) {\r
555                     ex = e;\r
556                 } finally {\r
557                     proCoreClient.disconnect();\r
558                 }\r
559             }\r
560             try {\r
561                 proCoreProcess.tryToStop();\r
562             } finally {\r
563                 if (proCoreProcess.isActive() && null == ex)\r
564                     ex = new ProCoreException("Failed to stop ProCoreProcess.");\r
565             }\r
566             if (null != ex)\r
567                 throw ex;\r
568             return !proCoreClient.isConnected() && !proCoreProcess.isActive();\r
569         }\r
570     }\r
571     public synchronized boolean isConnected()\r
572     throws ProCoreException {\r
573         return proCoreClient.isConnected();\r
574     }\r
575     public synchronized boolean isLocal()\r
576     throws ProCoreException {\r
577         return proCoreClient.isConnected() && proCoreProcess.isActive();\r
578     }\r
579     public synchronized void disconnect()\r
580     throws ProCoreException {\r
581         proCoreClient.disconnect();\r
582     }\r
583     public synchronized void connect()\r
584     throws ProCoreException, InterruptedException {\r
585         int port = getPort();\r
586         if (0 == port)\r
587             throw new ProCoreException("Port 0 not supported as connection address.");\r
588         proCoreClient.connect(port);\r
589     }\r
590     private String getExitMessage(int exitValue) {\r
591         switch (exitValue) {\r
592         default:\r
593             return "Unexpected exit value = " + exitValue + ".";\r
594         case 1:\r
595             return "Exception was thrown. This indicates problems with program logic.";\r
596         case 2:\r
597             return "CallException was thrown. This indicates problems with calls to/from parent server.";\r
598         case 3:\r
599             return "ProtocolException was thrown. This indicates problems with executable versions between local and parent servers.";\r
600         case 4:\r
601             return "NoSuchElementException was thrown. This indicates problems with program logic.";\r
602         case 5:\r
603             return "IllegalArgumentException was thrown. This indicates problems with program logic.";\r
604         case 6:\r
605             return "IllegalResourceException was thrown. This indicates problems with server data.";\r
606         case 7:\r
607             return "IllegalClusterException was thrown. This indicates problems with server data.";\r
608         case 8:\r
609             return "ParseException was thrown. This indicates problems with/during parsing of the configuration file.";\r
610         case 9:\r
611             return "PortSemaphoreException was thrown. This indicates that some other server is using the same port number.";\r
612         case 10:\r
613             return "DatabaseSemaphoreException was thrown. This indicates that some other server is using the same database id.";\r
614         case 11:\r
615             return "SystemException was thrown. This indicates problems with program logic.";\r
616         case 12:\r
617             return "CppException was thrown. This indicates problems with program logic.";\r
618         case 13:\r
619             return "UnknownException was thrown. This indicates problems with program logic.";\r
620         case 14:\r
621             return "ThrowException was thrown. This indicates problems with program logic.";\r
622         case 15:\r
623             return "AssertException was thrown. This indicates problems with program logic.";\r
624         case 16:\r
625             return "ParentCheckException was thrown. This indicates problems with the database version of parent.";\r
626         case 17:\r
627             return "ParentConnectionException was thrown. This indicates problems with parent child relationship.";\r
628         case 18:\r
629             return "ConnectionException was thrown. This indicates problems with TCP/IP connections.";\r
630         case 19:\r
631             return "ProxyException was thrown. This indicates problems with TCP/IP connections to proxy.";\r
632         case 20:\r
633             return "NoSuchFileException was thrown. This indicates problems with a missing file.";\r
634         case 21:\r
635             return "JournalException was thrown. This indicates problems with journal mechanisim.";\r
636         case 22:\r
637             return "OutOfSpaceException was thrown. This indicates problems with memory.";\r
638         case 23:\r
639             return "ExternalValueException was thrown. This indicates problems with large value handling.";\r
640         case 24:\r
641             return "IllegalTransactionException was thrown. This indicates problems with transaction logic.";\r
642         case 25:\r
643             return "WriteTransactionException was thrown. This indicates that server failed during write transaction.";\r
644         case 26:\r
645             return "ConsoleException was thrown. This indicates that server console was closed without quit command.";\r
646         case 27:\r
647             return "ExitException was thrown. This indicates that server did not exit cleanly.";\r
648         case 28:\r
649             return "ExitReadPIdException was thrown. This indicates that server did not exit cleanly.";\r
650         case 29:\r
651             return "ExitReadStatusException was thrown. This indicates that server could not read exit status from guard file.";\r
652         case 30:\r
653             return "ExitStatusValueException was thrown. This indicates that server did not exit cleanly.";\r
654         case 31:\r
655             return "UpdateException was thrown. This indicates that server failed during update.";\r
656         case 32:\r
657             return "DatabaseVersionException was thrown. This indicates that server can not use given database.";\r
658         case 33:\r
659             return "DatabaseCorruptionException was thrown. This indicates that server has detected database corruption.";\r
660         case 34:\r
661             return "ExitReadPortException was thrown. This indicates that server did not start or exit cleanly.";\r
662         case 35:\r
663             return "ExitGuardOpenException was thrown. This indicates that server could not open guard file.";\r
664         case 36:\r
665             return "ExitGuardOpenWriteException was thrown. This indicates that server could not open guard file for writing.";\r
666         case 37:\r
667             return "ExitGuardWritePidException was thrown. This indicates that server could not write pid to guard file.";\r
668         case 38:\r
669             return "ExitGuardWritePortException was thrown. This indicates that server could not write port to guard file.";\r
670         case 39:\r
671             return "LastCommitNotCleanException was thrown. This indicates that client died during transaction. Recovery action needed.";\r
672         case 40:\r
673             return "TerminatedException was thrown. This indicates that server died because it received terminating signal.";\r
674         case 41:\r
675             return "ExitGuardWriteStatusException was thrown. This indicates that server could not write exit status to guard file.";\r
676         case 42:\r
677             return "EndReadStatusException was thrown. This indicates that server died because it could not read end status from guard file.";\r
678         case 43:\r
679             return "EndStatusValueException was thrown. This indicates that server died because end value was not zero.";\r
680         case 44:\r
681             return "ExitGuardReadVersionException was thrown. This indicates that server died because server could not read version from guard file.";\r
682         case 45:\r
683             return "ExitGuardValueVersionException was thrown. This indicates that server died because server did not support guard file version.";\r
684         case 46:\r
685             return "ExitGuardWriteVersionException was thrown. This indicates that server died because server could not write version to guard file.";\r
686         case 47:\r
687             return "ListenPortException was thrown. This indicates that server died because server could not listen to given port or any other port.";\r
688         case 48:\r
689             return "JournalVersionException was thrown. This indicates that database was written by older version.";\r
690         case 49:\r
691             return "TailFileException was thrown. This indicates problems with handling of data involved with purge operation.";\r
692         }\r
693     }\r
694     synchronized void start()\r
695     throws ProCoreException {\r
696         try {\r
697             startInternal();\r
698         } catch (DatabaseLastExitException e) {\r
699             throw e; // This can be analyzed and handled by start handler.\r
700         } catch (Throwable t) {\r
701             ProCoreException pe = null;\r
702             if (t instanceof ProCoreException)\r
703                 pe = (ProCoreException)t;\r
704             else {\r
705                 String msg = "ProCoreServer startup failed."\r
706                         + " Automatic rcovery handling not implemented."\r
707                         + " Database must be fixed manually.";\r
708                 pe = new ProCoreException(msg, t);\r
709             }\r
710             if (null == pe.getDbFolder())\r
711                 pe.setDbFolder(proCoreProcess.builder.directory());\r
712         }\r
713     }\r
714     synchronized void stop()\r
715     throws ProCoreException, InterruptedException{\r
716         if (null != databaseId)\r
717             databaseIds.remove(databaseId);\r
718         boolean dead = tryToStop();\r
719         if (!dead)\r
720             throw new ProCoreException("Failed to stop ProCoreServer.");\r
721     }\r
722     public synchronized String getExitMessage()\r
723     throws ProCoreException {\r
724         Integer returnValue = guardFile.getExitValue();\r
725         if (null == returnValue)\r
726             throw new ProCoreException("Exit message not available.");\r
727         return getExitMessage(returnValue);\r
728     }\r
729     //@Override\r
730     public synchronized ServerAddress getAddress()\r
731     throws ProCoreException {\r
732         return new ServerAddress(LOCALHOST, getPort());\r
733     }\r
734     //@Override\r
735     public synchronized int getPort()\r
736     throws ProCoreException {\r
737         int port = 0;\r
738         if (proCoreClient.isConnected()) {\r
739             port = proCoreClient.getPort();\r
740             if (0 != port)\r
741                 return port;\r
742         }\r
743         if (proCoreProcess.isActive()) {\r
744             port = proCoreProcess.getPort();\r
745             if (0 != port)\r
746                 return port;\r
747         }\r
748         return guardFile.getPort();\r
749     }\r
750     public synchronized TailFile.Data getTailData()\r
751     throws ProCoreException {\r
752         Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.TAIL_DIR);\r
753         return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
754     }\r
755 //    public synchronized TailFile.Data getTailDataFromHead()\r
756 //    throws ProCoreException {\r
757 //        Path tailFolder = proCoreProcess.builder.directory().toPath().resolve(ProCoreServer.BRANCH_DIR);\r
758 //        return TailFile.readTailFile(tailFolder.resolve(ProCoreServer.TAIL_FILE).toFile());\r
759 //    }\r
760     public static class GuardFile {\r
761         public static final String GUARD_FILE = ProCoreServer.GUARD_FILE;\r
762         private static final int VERSION_LINE = 1;\r
763         private static final int PID_LINE = 2;\r
764         private static final int PORT_LINE = 3;\r
765         private static final int EXIT_LINE = 4;\r
766         private static final int END_LINE = 5;\r
767         private final File guardFile;\r
768         GuardFile(File workingDir) {\r
769             guardFile = new File(workingDir, GUARD_FILE);\r
770         }\r
771         int getPort() throws ProCoreException {\r
772             String[] lines = getFileLines(guardFile, END_LINE);\r
773             if (lines.length < PORT_LINE)\r
774                 throw new ProCoreException("Server port is not available. " + guardFile.getAbsolutePath());\r
775             return parseInt(lines[PORT_LINE-1]);\r
776         }\r
777         private void checkVersion(String[] lines) throws ProCoreException {\r
778             if (lines.length < VERSION_LINE)\r
779                 throw new ProCoreException("Guard file version is not available. " + guardFile.getAbsolutePath());\r
780             int version = parseInt(lines[VERSION_LINE-1]);\r
781             if (1 != version)\r
782                 throw new GuardFileVersionException(guardFile, "Unsupported guard file version. version=" + version);\r
783         }\r
784         boolean delete() {\r
785             if (guardFile.exists())\r
786                 return guardFile.delete();\r
787             else\r
788                 return true; // Guard file is deleted i.e. does not exit.\r
789         }\r
790         void deleteLog() {\r
791             if (!guardFile.exists())\r
792                 return;\r
793             try {\r
794                 Files.delete(guardFile.toPath());\r
795             } catch (Throwable t) {\r
796                 t.printStackTrace();\r
797             }\r
798         }\r
799         Integer getExitValue() {\r
800             String lines[] = getFileLines(guardFile, END_LINE);\r
801             try {\r
802                 checkVersion(lines);\r
803                 if (lines.length >= EXIT_LINE)\r
804                     return parseInt(lines[EXIT_LINE-1]);\r
805                 else if (lines.length >= PID_LINE && !isRunning(lines[PID_LINE-1]))\r
806                     return -1; // Server has died without writing exit status.\r
807                 return null; // Server might be running.\r
808             } catch (Exception e) {\r
809                 Logger.defaultLogError("Could not get ProCoreServer exit value.", e);\r
810                 return null; // Server might be running.\r
811             }\r
812         }\r
813 //        void stopProCoreServer() throws ProCoreException {\r
814 //            if (!guardFile.exists())\r
815 //                throw new ServerGuardNotFoundException(guardFile.getParentFile(), "Guard file does not exist.");\r
816 //            String lines[] = new String[0];\r
817 //            for (int i=0; i<LONG_SLEEP_COUNT; ++i, sleep(i, LONG_SLEEP_COUNT)) {\r
818 //                 lines = getFileLines(guardFile, END_LINE);\r
819 //                if (lines.length < 1)\r
820 //                    throw new ProCoreException("Guard file does not contain lines.");\r
821 //                checkVersion(lines);\r
822 //                if (lines.length >= END_LINE)\r
823 //                    return; // Server is dead according to guard file.\r
824 //                if (!isRunning(lines[PID_LINE-1]))\r
825 //                    return; // ProCoreServer is not running.\r
826 //                if (lines.length >= PORT_LINE) //  ProCoreServer is running according to pid. Either it's slow or pid is for some other file.\r
827 //                    break;\r
828 //            }\r
829 //            if (lines.length < PORT_LINE)\r
830 //                throw new ProCoreException("Guard file does not contain port.");\r
831 //            try {\r
832 //                int port = parseInt(lines[PORT_LINE-1]);\r
833 //                if (port > 0) {\r
834 //                    ProCoreClient lProCoreClient = new ProCoreClient();\r
835 //                    SessionAddress sa = new SessionAddressImpl(new InetSocketAddress(LOCALHOST, port), true);\r
836 //                    lProCoreClient.connect(sa);\r
837 //                    try {\r
838 //                        String reply = lProCoreClient.execute(QUIT_CMD);\r
839 //                        int nConnect = Integer.parseInt(reply);\r
840 //                        if (nConnect > 1)\r
841 //                            throw new ProCoreException("More than one active connection in server while giving quit command. folder=" + guardFile.getParent());\r
842 //                    } finally {\r
843 //                        lProCoreClient.disconnect();\r
844 //                    }\r
845 //                }\r
846 //            } catch (Throwable t) {\r
847 //                throw new ProCoreException("Failed to stop procore server cleanly.", t);\r
848 //            }\r
849 //            // Managed to send quit command to server, let's wait and see if it has any effect.\r
850 //            boolean dead = wait4ServerToDie(null, LONG_SLEEP_COUNT);\r
851 //            if (!dead)\r
852 //                throw new ProCoreException("Unable to stop server.");\r
853 //        }\r
854         boolean isActive(ProCoreClient proCoreClient) throws ProCoreException {\r
855             if (proCoreClient.isConnected())\r
856                 throw new ProCoreException("Illegal argument. ProCoreClient must not be connected.");\r
857             String lines[] = getFileLines(guardFile, END_LINE);\r
858             if (lines.length != PORT_LINE)\r
859                 return false;\r
860             checkVersion(lines);\r
861             try {\r
862                 int port = parseInt(lines[PORT_LINE-1]);\r
863                 if (port > 0) {\r
864                     try {\r
865                         // Checking if core is responsive.\r
866                         // This should be supported regardless of version.\r
867                         SessionAddress sa = new SessionAddress(new InetSocketAddress(LOCALHOST, port), true);\r
868                         proCoreClient.connect(sa);\r
869                         proCoreClient.execute(""); // Dubious, not necessarily supported by all protocol versions.\r
870                         return true;\r
871                     } catch (ProCoreException e) {\r
872                     } finally { // We must disconnect or check protocol version.\r
873                         proCoreClient.disconnect();\r
874                     }\r
875                 }\r
876             } catch (Throwable t) {\r
877                 // Intentionally empty.\r
878             }\r
879             return false;\r
880         }\r
881         boolean isAlive() throws ProCoreException {\r
882             String lines[] = getFileLines(guardFile, END_LINE);\r
883             if (lines.length < 1)\r
884                 return false; // No lines, assuming server is dead.\r
885             checkVersion(lines);\r
886             if (lines.length < PID_LINE)\r
887                 return false; // No process id, assuming server is dead.\r
888             else if (lines.length >= END_LINE)\r
889                 return false; // End status available, assuming server is dead.\r
890             else if (lines.length >= EXIT_LINE)\r
891                 return false; // Exit status available, assuming server is dying.\r
892             else if (isRunning(lines[PID_LINE-1]))\r
893                 return true; // Process with given id running. Server might be alive.\r
894             else\r
895                 return false;// Process with given id not running.\r
896         }\r
897     }\r
898     public static class TailFile {\r
899         public static class Data {\r
900             Data(long version, long nextChangeSetId, long nextFreeId) {\r
901                 this.version = version;\r
902                 this.nextChangeSetId = nextChangeSetId;\r
903                 this.nextFreeId = nextFreeId;\r
904             }\r
905             long version;\r
906             public long nextChangeSetId;\r
907             long nextFreeId;\r
908         }\r
909         public static void createTailFile(File file, long nextChangeSetId, long nextFreeId, String dbId) throws ProCoreException {\r
910             if (file.exists())\r
911                 throw new TailException("Tail file exists and thus cannot be created. file=" + file);\r
912             FileOutputStream fos;\r
913             try {\r
914                 fos = new FileOutputStream(file);\r
915             } catch (FileNotFoundException e) {\r
916                 throw new TailException("Tail file cannot be created. file=" + file);\r
917             }\r
918             PrintStream ps = new PrintStream(fos);\r
919             try {\r
920                 ps.println(1); // file version\r
921                 ps.println(nextChangeSetId); // next revision\r
922                 ps.println(nextFreeId); // next free cluster\r
923                 ps.println(dbId); // database id\r
924             } finally {\r
925                 ps.close();\r
926             }\r
927         }\r
928         public static Data readTailFile(File file) throws ProCoreException {\r
929             String lines[] = getFileLines(file, 3);\r
930             if (lines.length < 3)\r
931                 throw new TailReadException("Could not read data from " + file);\r
932             long ver = Long.parseLong(lines[0]);\r
933             long cs = Long.parseLong(lines[1]);\r
934             long id = Long.parseLong(lines[2]);\r
935             return new Data(ver, cs, id);\r
936         }\r
937     }\r
938 }\r
939 class SessionAddress {\r
940     private final InetSocketAddress socketAddress;\r
941     private final boolean ignoreProtocolVersion;\r
942     public SessionAddress(InetSocketAddress socketAddress, boolean ignoreProtocolVersion) {\r
943         assert(null != socketAddress);\r
944         this.socketAddress = socketAddress;\r
945         this.ignoreProtocolVersion = ignoreProtocolVersion;\r
946     }\r
947     public InetSocketAddress getSocketAddress() {\r
948         return socketAddress;\r
949     }\r
950     public boolean ignoreProtocolVersion() {\r
951         return ignoreProtocolVersion;\r
952     }\r
953     @Override\r
954     public String toString() {\r
955         return socketAddress.toString();\r
956     }\r
957 }\r
958 class PacketQueue {\r
959     private final LinkedBlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>();\r
960     PacketQueue() {\r
961     }\r
962     void addFirst(Packet packet) {\r
963         packets.add(packet);\r
964     }\r
965     Packet takeFirst() throws InterruptedException {\r
966         return packets.take();\r
967     }\r
968 }\r
/**
 * Packet header decoded from five consecutive ints.
 */
class Header {
    int lastTokenIn;
    int token;
    int messageNumber;
    int inflateSize;
    int deflateSize;

    /**
     * @param ints header values in the order lastTokenIn, token,
     *        messageNumber, inflateSize, deflateSize; indices 0..4 are read.
     */
    Header(int[] ints) {
        this.lastTokenIn = ints[0];
        this.token = ints[1];
        this.messageNumber = ints[2];
        this.inflateSize = ints[3];
        this.deflateSize = ints[4];
    }

    /** @return all five fields as space separated {@code name=value} pairs. */
    @Override
    public String toString() {
        return "lastTokenIn=" + lastTokenIn
                + " token=" + token
                + " messageNumber=" + messageNumber
                + " inflateSize=" + inflateSize
                + " deflateSize=" + deflateSize;
    }
}
998 class Packet {\r
999     Header header;\r
1000     ByteBuffer bytes;\r
1001     Packet(int[] header, ByteBuffer bytes) {\r
1002         this.header = new Header(header);\r
1003         this.bytes = bytes;\r
1004     }\r
1005 }\r
1006 class ConnectionThread {\r
    // Enables the debug output guarded by DEBUG in this class.
    private static final boolean DEBUG = DebugPolicy.REPORT_SERVER_EVENTS;
    // Number of ints in a packet header.
    private static final int HEADER_N = 5;
    // Header size in bytes (4 bytes per int).
    private static final int HEADER_SIZE = HEADER_N * 4;
    // Index of the deflate size within the header ints.
    private static final int DEFLATE = 4;
    // Minimum capacity of the input buffers allocated by getInputBuffer.
    private static final int INCREMENT_SIZE = 8192;
    // Incremented by every Manager constructor to number the instances.
    private static int instanceCount = 0;
    /**
     * Runnable of the connection thread. It takes packets from the input
     * queue of the enclosing ConnectionThread and hands them to the method
     * that is waiting for the packet's token, or to the default event handler
     * when no method is waiting. It also implements the connect and
     * disconnect operations that the enclosing class delegates to.
     */
    class Manager implements Runnable {
        // Sequence number of this manager, used only in toString().
        private final int instance;
        // Loop condition of run().
        private volatile boolean carryOn = true;
        // Largest packet token seen so far; passed along with every sent method.
        private volatile int lastTokenIn = 0;
        // Methods waiting for a response, looked up by packet token.
        private volatile MethodQueue methodQueue = new MethodQueue();
        // Released when run() starts; connect() waits on it after starting the thread.
        private volatile CountDownLatch running = new CountDownLatch(1);
        private final EventHandler defaultEventHandler = new DefaultEventHandler();
//        private EventHandler eventHandler = defaultEventHandler;
        Manager() {
            instance = ++instanceCount;
        }
        @Override
        public String toString() {
            return "Connection " + instance;
        }
        InetSocketAddress getAddress() {
            return connection.getAddress();
        }
        boolean isConnected() {
            return connection.isConnected();
        }
        /**
         * Connects again to the address currently stored in the connection.
         *
         * @throws ProCoreException if the address is not initialized or its port is 0.
         */
        void reconnect() throws ProCoreException, InterruptedException {
            InetSocketAddress a = connection.getAddress();
            if (connection.addressNotInitialized() || 0 == a.getPort())
                throw new ProCoreException("Address not ok. address=" + connection.getAddress());
            connect(a);
        }
        /**
         * Connects to the given port on ProCoreClient.LOCALHOST. The stored
         * address is reused when it is initialized, has a non-zero port and
         * the given port is either 0 or equal to the stored port.
         */
        void connect(int port) throws ProCoreException, InterruptedException {
            InetSocketAddress a = connection.getAddress();
            if (connection.addressNotInitialized() || 0 == a.getPort() || (port != 0 && port != a.getPort()))
                a = new InetSocketAddress(ProCoreClient.LOCALHOST, port);
            connect(a);
        }
        /**
         * Connects to the given address. Does nothing when already connected
         * to that address.
         *
         * @throws ProCoreException if already connected to a different address.
         */
        void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {
            if (connection.isConnected()) {
                if (connection.equalsAddress(address))
                    return;
                else
                    throw new ProCoreException("Already connected to different address. old=" + connection.getAddress() + " new=" + address);
            }
            connection.init4Connection(address);
            connectionManager.connect(connection);
//            compression = new Compression();
            // The packet handling thread is started only if it is not alive already.
            if (thread.isAlive())
                return;
            thread.start();
            // Wait until run() has been entered.
            running.await();
        }
        /** Sends the method over the connection, registering it in the method queue. */
        void call(Method method) throws ProCoreException, InterruptedException, IOException {
//            eventHandler = method.getEventHandler(eventHandler);
            connection.sendMethod(methodQueue, method, lastTokenIn);
        }
        /** Closes the method queue and then disconnects the connection. */
        void disconnect() {
            methodQueue.close();
            connection.disconnect();
        }
        /**
         * Packet loop. Runs until carryOn is false, a null packet is taken or
         * an exception escapes; always disconnects on the way out.
         */
        @Override
        public void run() { // MARK: CT.Manger.run
            try {
                running.countDown();
                while (carryOn) {
                    Packet packet = null;
                    try {
                        packet = inputQueue.takeFirst();
                    } catch (InterruptedException e) {
                        // An interrupt only re-evaluates the loop condition.
                        Util.trace("Wait for input packet interrupted. this=" + this);
                        continue;
                    }
                    if (null == packet)
                        break;
                    handlePacket(packet);
                }
                if (carryOn)
                    Util.trace("Thread " + thread.getName() + " stopped with null packet.");
                else
                    Util.trace("Thread " + thread.getName() + " stopped with false carryOn.");
            } catch (Throwable t) {
                Util.logError("Thread " + thread.getName() + " stopped to exception:", t);
            } finally {
                disconnect();
                // No effect if the latch was already released at the top of run().
                running.countDown();
            }
        }
        /**
         * Dispatches one packet. The method registered for the packet token
         * is removed from the method queue and notified according to the
         * message number. Without a registered method only change set update
         * requests are handled (by the default event handler); anything else
         * is logged as a missing method.
         */
        private void handlePacket(Packet packet) throws ProCoreException {
            if (packet.header.token > lastTokenIn) {
                lastTokenIn =  packet.header.token;
                if (DEBUG)
                    Util.showDebug("Setting lastTokeinIn=" + lastTokenIn);
            }
            Method method = methodQueue.remove(packet.header.token);
            if (null == method) {
                try {
                    switch (packet.header.messageNumber) {
                        default: {
                            Util.logError("Missing method. token=" + packet.header.token);
                        } return;
                        case MessageNumber.ChangeSetUpdateRequest: {
                            defaultEventHandler.on(new ChangeSetUpdateEvent(packet));
                        } break;
                    }
                } catch (Throwable t) {
                    // A failing event handler must not stop the packet loop.
                    Util.logError("Event handler failed:", t);
                }
                return;
            }
//            else
//                eventHandler = method.getEventHandler(defaultEventHandler);
            switch (packet.header.messageNumber) {
            default: {
                String s = "Illegal message number " + packet.header.messageNumber;
                method.gotException(s);
            } break;
            case MessageNumber.ChangeSetUpdateRequest:
                method.getEventHandler(defaultEventHandler).on(new ChangeSetUpdateEvent(packet));
                methodQueue.add(method); // Event caused by method. Actual method not yet handled.
                break;
            case MessageNumber.NullResponse:
                // Nothing is delivered; the method was already removed from the queue above.
                //Util.log("Null(ack) message. method=" + method + " packet=" + packet);
            break;
            case MessageNumber.ExceptionResponse:
                method.gotException(packet);
            break; // MARK: handlePacket
            // All regular responses are delivered to the method as is.
            case MessageNumber.AAAResponse:
            case MessageNumber.AskTransactionResponse:
            case MessageNumber.CancelCommitResponse:
            case MessageNumber.CloseClientSessionResponse:
            case MessageNumber.EchoResponse:
            case MessageNumber.EndTransactionResponse:
            case MessageNumber.ExecuteResponse:
            case MessageNumber.GetChangeSetContextResponse:
            case MessageNumber.GetChangeSetDataResponse:
            case MessageNumber.GetChangeSetsResponse:
            case MessageNumber.GetClusterNewResponse:
            case MessageNumber.GetRefreshResponse:
            case MessageNumber.GetResourceSegmentResponse:
            case MessageNumber.GetServerInfoResponse:
            case MessageNumber.OpenClientSessionResponse:
            case MessageNumber.RefreshResponse:
            case MessageNumber.ReserveIdsResponse:
            case MessageNumber.UndoResponse:
            case MessageNumber.ReconnectResponse:
            case MessageNumber.GetRefresh2Response:
            case MessageNumber.GetClusterChangesResponse:
            case MessageNumber.GetServerInfo2Response:
            case MessageNumber.ListClustersResponse: {
                method.gotPacket(packet);
            } break;
            }
        }
    }
    // Used by Manager.connect to establish the connection.
    private final ConnectionManager connectionManager;
    // Complete packets assembled by handleInput, consumed by Manager.run.
    private final PacketQueue inputQueue = new PacketQueue();
    private final Connection connection = new Connection(this);
    private final Manager manager = new Manager();
    // Thread that executes the manager; started by Manager.connect.
    private Thread thread = new Thread(manager, manager.toString());
    // Header of the packet being assembled, null while no header has been read.
    private int[] header = null;
    // Reused storage for the header ints; header points to it once a header is read.
    private int[] INTS = new int[HEADER_N];
    // Total size in bytes of the packet being assembled (header plus deflated payload).
    private int remaining = HEADER_SIZE;
    // Deflate size taken from the most recently read header.
    private int deflateSize;
    /**
     * @param connectionManager manager used to establish the connection.
     */
    ConnectionThread(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
   }
    /**
     * Allocates an input buffer of the default size.
     *
     * @return the result of {@code getInputBuffer(INCREMENT_SIZE)}.
     */
    ByteBuffer getInputBuffer() {
        return getInputBuffer(INCREMENT_SIZE);
    }
1178     ByteBuffer getInputBuffer(int sizeAtLeast) {\r
1179         ByteBuffer bb = ByteBuffer.allocate(Math.max(sizeAtLeast, INCREMENT_SIZE));\r
1180         bb.order(ByteOrder.LITTLE_ENDIAN);\r
1181         return bb;\r
1182     }\r
1183     @Override\r
1184     public String toString() {\r
1185         return "header=" + header + " remaining=" + remaining;\r
1186     }\r
1187     private String toString(ByteBuffer buffer, int nRead) {\r
1188         return this + " nRead=" + nRead + " buffer=" + buffer;\r
1189     }\r
    /**
     * Assembles packets from the bytes accumulated in the given buffer and
     * queues every complete packet to the input queue.
     * <p>
     * The buffer position is taken as the number of bytes received so far.
     * State between calls is kept in the fields header and remaining.
     *
     * @param buffer buffer that received data has been written into.
     * @param nRead number of bytes read by the caller; used only in an error message.
     * @return the buffer that subsequent data should be written into: the
     *         same buffer when more data is needed, a larger copy when the
     *         packet does not fit, or a new buffer (holding any surplus
     *         bytes) after a packet has been queued.
     * @throws IOException if the header carries a negative deflate size.
     */
    ByteBuffer handleInput(ByteBuffer buffer, int nRead) // nRead > 0
    throws IOException { // MARK: handleInput
        int position = buffer.position();
        if (null == header) {
            if (position < HEADER_SIZE)
                return buffer; // Header not complete yet.
            else { // Got header.
                buffer.position(0);
                IntBuffer ib = buffer.asIntBuffer();
                ib.get(INTS);
                ib = null;
                header = INTS;
                deflateSize = header[DEFLATE];
                if (deflateSize < 0)
                    throw new IOException("Illegal deflate size=" + deflateSize);
                // From here on remaining is the total packet size: header plus payload.
                remaining += deflateSize;
                if (remaining > buffer.limit()) { // We need more space. This is not optimal but a simple way to handle this case.
                    ByteBuffer bb = getInputBuffer(remaining);
                    buffer.position(0);
                    buffer.limit(position);
                    bb.put(buffer);
                    return bb;
                }
                // Restore the write position changed for reading the header.
                buffer.position(position);
            }
        }
        if (position < remaining)
            return buffer; // Need more data.
        else if (position > remaining) { // Got whole packet and then some. This is not optimal but a simple way to handle this case.
            // Copy the complete packet into its own buffer and queue it.
            ByteBuffer bb = getInputBuffer(remaining);
            buffer.position(0);
            buffer.limit(remaining);
            bb.put(buffer);
            inputQueue.addFirst(new Packet(header, bb));
            // Copy the surplus bytes into a fresh buffer and process them as new input.
            ByteBuffer b2 = getInputBuffer(buffer.limit());
            buffer.position(remaining);
            buffer.limit(position);
            b2.put(buffer);
            header = null;
            remaining = HEADER_SIZE;
            return handleInput(b2, b2.position());
        } else if (position != remaining)
            throw new IOException("Assertion error. this=" + toString(buffer, nRead));
        // Got exactly one packet.
        inputQueue.addFirst(new Packet(header, buffer));
        header = null;
        remaining = HEADER_SIZE;
        return getInputBuffer(INCREMENT_SIZE);
    }
1239     InetSocketAddress getAddress() {\r
1240         return manager.getAddress();\r
1241     }\r
1242     boolean isConnected() {\r
1243         return manager.isConnected();\r
1244     }\r
1245     void reconnect() throws ProCoreException, InterruptedException {\r
1246         manager.reconnect();\r
1247     }\r
1248     void connect(int port) throws ProCoreException, InterruptedException {\r
1249         manager.connect(port);\r
1250     }\r
1251     void connect(InetSocketAddress address) throws ProCoreException, InterruptedException {\r
1252         manager.connect(address);\r
1253     }\r
1254     void disconnect() {\r
1255         manager.disconnect();\r
1256     }\r
1257     Connection call(Method m) throws ProCoreException, InterruptedException, IOException {\r
1258         manager.call(m);\r
1259         return connection;\r
1260     }\r
1261 }\r
1262 class Client {\r
1263     private final ProCoreServer proCoreServer;\r
1264     private final ProCoreClient proCoreClient;\r
1265     private final ConnectionThread connectionThread;\r
1266     private boolean closed = false;\r
1267     Client(ProCoreServer proCoreServer, ProCoreClient proCoreClient, ConnectionThread connectionThread) {\r
1268         this.proCoreServer = proCoreServer;\r
1269         this.proCoreClient = proCoreClient;\r
1270         this.connectionThread = connectionThread;\r
1271     }\r
1272     void open() throws ProCoreException, InterruptedException {\r
1273         synchronized (proCoreClient) {\r
1274             if (!connectionThread.isConnected())\r
1275                 connectionThread.connect(proCoreServer.getPort());\r
1276         }\r
1277         closed = false;\r
1278     }\r
1279     boolean isOpen() {\r
1280         synchronized (proCoreClient) {\r
1281             return connectionThread.isConnected();\r
1282         }\r
1283     }\r
1284     boolean isClosed() {\r
1285         synchronized (proCoreClient) {\r
1286             return closed || !connectionThread.isConnected();\r
1287         }\r
1288     }\r
1289     void close() {\r
1290         proCoreClient.deleteClient(this);\r
1291         closed = true;\r
1292     }\r
1293     String execute(String command) throws ProCoreException {\r
1294         return proCoreClient.execute(command);\r
1295     }\r
1296     void call(Method m) throws ProCoreException {\r
1297         proCoreClient.call(m);\r
1298     }\r
1299 }\r
1300 class ProCoreClient {\r
1301     public static final String LOCALHOST = "127.0.0.1";\r
1302     class Remote {\r
1303         private final ConnectionManager connectionManager = ConnectionManager.getInstance();\r
1304         private final ConnectionThread connectionThread = new ConnectionThread(connectionManager);\r
1305         Remote() {\r
1306         }\r
1307     }\r
1308     private final Remote remote = new Remote();\r
1309     private final Set<Client> clients = new  HashSet<Client>();\r
1310     private final ProCoreServer proCoreServer;\r
1311     public ProCoreClient(ProCoreServer proCoreServer) throws ProCoreException {\r
1312         this.proCoreServer = proCoreServer;\r
1313     }\r
1314     synchronized public int getPort() {\r
1315         return remote.connectionThread.getAddress().getPort();\r
1316     }\r
1317     synchronized public boolean isConnected() {\r
1318         return remote.connectionThread.isConnected();\r
1319     }\r
1320     synchronized public Client newClient() throws ProCoreException {\r
1321         return newClient(proCoreServer, remote.connectionThread.getAddress());\r
1322     }\r
1323     synchronized public Client newClient(ProCoreServer proCoreServer, InetSocketAddress address) throws ProCoreException {\r
1324         if (!remote.connectionThread.isConnected())\r
1325             try {\r
1326                 remote.connectionThread.connect(address);\r
1327             } catch (InterruptedException e) {\r
1328                 if (!remote.connectionThread.isConnected()) {\r
1329                     remote.connectionThread.disconnect();\r
1330                     throw new ProCoreException("Failed to create new client.");\r
1331                 }\r
1332             }\r
1333         else if (!address.equals(remote.connectionThread.getAddress()))\r
1334             throw new ProCoreException("Illegal newClient call. Already connected to different address.");\r
1335         Client client = new Client(proCoreServer, this, remote.connectionThread);\r
1336         clients.add(client);\r
1337         return client;\r
1338     }\r
1339     synchronized int deleteClient(Client client) {\r
1340         clients.remove(client);\r
1341         return clients.size();\r
1342     }\r
1343     void closeClient(int aSessionId) throws ProCoreException {\r
1344         try {\r
1345             CloseClientSessionFunction f = new CloseClientSessionFunction(aSessionId);\r
1346             Method m = new Method(f, null, null);\r
1347             Connection c = remote.connectionThread.call(m);\r
1348             m.waitForReply(c);\r
1349         } catch (InterruptedException e) {\r
1350             throw new ProCoreException("Thread interrupted during method call.", e);\r
1351         } catch (IOException e) {\r
1352             throw new ProCoreException("IOException during method call.", e);\r
1353         } catch (Throwable e) {\r
1354             throw new ProCoreException("Throwable during method call.", e);\r
1355         }\r
1356  }\r
1357     synchronized public String execute(String command) throws ProCoreException {\r
1358         try {\r
1359             ExecuteFunction f = new ExecuteFunction(command);\r
1360             Method m = new Method(f, null, null);\r
1361             Connection c = remote.connectionThread.call(m);\r
1362             m.waitForReply(c);\r
1363             return f.out;\r
1364         } catch (InterruptedException e) {\r
1365             throw new ProCoreException("Thread interrupted during method call.", e);\r
1366         } catch (IOException e) {\r
1367             throw new ProCoreException("IOException during method call.", e);\r
1368         } catch (Throwable e) {\r
1369             throw new ProCoreException("Throwable during method call.", e);\r
1370         }\r
1371     }\r
1372     private void recall(Method method, IOException e) throws ProCoreException {// :)\r
1373         if (remote.connectionThread.isConnected())\r
1374             throw new ProCoreException("IOException during method call.", e);\r
1375         try {\r
1376             remote.connectionThread.reconnect();\r
1377             Connection c = remote.connectionThread.call(method);\r
1378             method.waitForReply(c);\r
1379         } catch (Throwable t) {\r
1380             throw new ProCoreException("IOException during method call. Recall failed.", e);\r
1381         }\r
1382     }\r
1383     synchronized public void call(Method method) throws ProCoreException {\r
1384         try {\r
1385             Connection c = remote.connectionThread.call(method);\r
1386             method.waitForReply(c);\r
1387         } catch (InterruptedException e) {\r
1388             throw new ProCoreException("Thread interrupted during method call.", e);\r
1389         } catch (IOException e) {\r
1390             recall(method, e);\r
1391         } catch (ProCoreException e) {\r
1392             throw e;\r
1393         } catch (Throwable e) {\r
1394             throw new ProCoreException("Throwable during method call.", e);\r
1395         }\r
1396     }\r
1397     synchronized public void connect(int port) throws ProCoreException, InterruptedException {\r
1398         remote.connectionThread.connect(port);\r
1399     }\r
1400     synchronized public void connect(SessionAddress sessionAddress)\r
1401     throws ProCoreException, InterruptedException {\r
1402         remote.connectionThread.connect(sessionAddress.getSocketAddress());\r
1403     }\r
1404     synchronized public void disconnect() throws ProCoreException {\r
1405         remote.connectionThread.disconnect();\r
1406     }\r
1407 }\r
1408 abstract class Event {\r
1409     final String name;\r
1410     final Packet packet;\r
1411     Event(String name, Packet packet) {\r
1412         this.name = name;\r
1413         this.packet = packet;\r
1414     }\r
1415     int getToken() {\r
1416         return packet.header.token;\r
1417     }\r
1418     void deserialize(org.simantics.db.server.protocol.AbstractEvent event) {\r
1419         event.deserialize(event.getRequestNumber(), getDataBuffer());\r
1420     }\r
1421     private org.simantics.db.server.protocol.DataBuffer getDataBuffer() {\r
1422         this.packet.bytes.position(20);\r
1423         return new org.simantics.db.server.protocol.DataBuffer(this.packet.bytes);\r
1424     }\r
1425     @Override public String toString() {\r
1426         return name + " token=" + packet.header.token;\r
1427     }\r
1428 }\r
1429 class ChangeSetUpdateEvent extends Event {\r
1430     ChangeSetUpdateEvent(Packet packet) {\r
1431         super("ChangeSetUpdateEvent", packet);\r
1432     }\r
1433 }\r
1434 abstract class EventHandler {\r
1435     abstract void on(Event event);\r
1436 }\r
1437 class DefaultEventHandler extends EventHandler {\r
1438     void on(Event event) {\r
1439         Util.log("Missing event handler. event=" + event);\r
1440     }\r
1441 }\r