]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / ConnectionManager.java
diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java
new file mode 100644 (file)
index 0000000..fb01a06
--- /dev/null
@@ -0,0 +1,809 @@
+package org.simantics.db.server.internal;\r
+\r
+import java.io.IOException;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.ByteOrder;\r
+import java.nio.channels.CancelledKeyException;\r
+import java.nio.channels.ClosedChannelException;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.SocketChannel;\r
+import java.util.Collections;\r
+import java.util.Iterator;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.concurrent.CountDownLatch;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.server.protocol.AbstractFunction;\r
+import org.simantics.db.server.protocol.DataBuffer;\r
+import org.simantics.db.server.protocol.ExceptionFunction;\r
+import org.simantics.db.server.protocol.GetClusterNewFunction;\r
+import org.simantics.db.server.protocol.MessageNumber;\r
+\r
+import gnu.trove.iterator.TIntObjectIterator;\r
+import gnu.trove.map.hash.TIntObjectHashMap;\r
+\r
+public class ConnectionManager {\r
+    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;\r
+    private static final ConnectionManager connectionManager = new ConnectionManager();\r
+    public static ConnectionManager getInstance() {\r
+        return connectionManager.instance();\r
+    }\r
+    private final Manager manager = new Manager();\r
+    private final Thread thread = new Thread(manager, "Connection Manager");\r
+    private boolean stop = false;\r
+    protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());\r
+    private ConnectionManager() {\r
+        Util.trace("ConnectionManager.new");\r
+    }\r
+    private ConnectionManager instance() {\r
+        if (!thread.isAlive())\r
+            start();\r
+        return this;\r
+    }\r
+    protected class Manager implements Runnable {\r
+        private Selector selector;\r
+        private CountDownLatch selectorOpen = new CountDownLatch(1);\r
+        Manager() {\r
+        }\r
+        public void run() {\r
+            try {\r
+                selector = Selector.open();\r
+                selectorOpen.countDown();\r
+                while (true) {\r
+                    int n = selector.select();\r
+                    try {\r
+                        if (n > 0)\r
+                            processSelected();\r
+                        processConnectors();\r
+                        if (stop) {\r
+                            selector.close();\r
+                            return;\r
+                        }\r
+                    } catch (CancelledKeyException e) {\r
+                        Logger.defaultLogError("Cancelled key in select loop.", e);\r
+                    }\r
+                }\r
+            } catch (Throwable x) {\r
+                Logger.defaultLogError("Select loop failed.", x);\r
+            }\r
+        }\r
+        void processConnectors() throws ClosedChannelException {\r
+            while (connections.size() > 0) {\r
+                Connection c = connections.remove(0);\r
+                c.register(selector);\r
+            }\r
+        }\r
+        void processSelected() {\r
+            Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
+            while (i.hasNext()) {\r
+                SelectionKey sk = (SelectionKey)i.next();\r
+                i.remove();\r
+                try {\r
+                    if (!sk.isValid())\r
+                        processDisconnect(sk);\r
+                    else if (sk.isConnectable())\r
+                        processConnect(sk);\r
+                    else if (sk.isReadable())\r
+                        processRead(sk);\r
+                } catch (CancelledKeyException e) {\r
+                    processDisconnect(sk);\r
+                }\r
+            }\r
+        }\r
+        private void processDisconnect(SelectionKey key) {\r
+            Connection c = (Connection)key.attachment();\r
+            if (null == c)\r
+                return;\r
+            c.onDisconnect();\r
+            key.attach(null); // Just to make sure that reference to connection is released.\r
+            key.cancel();\r
+        }\r
+        private void processConnect(SelectionKey key) {\r
+                Connection connection = (Connection)key.attachment();\r
+                SocketChannel sc = (SocketChannel)key.channel();\r
+                try {\r
+                    if (sc.finishConnect()) {\r
+                        int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;\r
+                        ops |= SelectionKey.OP_READ;\r
+                        key.interestOps(ops);\r
+                        Connection c = connection.onConnectSucceeded(key);\r
+                        if (c == null) { // Connector refused to accept connection.\r
+                            key.attach(null); // Just to make sure that reference to connector is released.\r
+                            key.cancel();\r
+                            sc.close();\r
+                        } else\r
+                            key.attach(c);\r
+                    }\r
+                } catch (Throwable x) {\r
+                    try {\r
+                        if (key.isValid())\r
+                            key.attach(null); // Just to make sure that reference to connector is released.\r
+                            key.cancel();\r
+                        try {\r
+                            sc.close();\r
+                        } catch (IOException e) {\r
+                            if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.\r
+                                Logger.defaultLogError("Failed to close connection.", e);\r
+                        }\r
+                    } finally {\r
+                        connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));\r
+                    }\r
+                }\r
+            }\r
+        }\r
+        private void processRead(SelectionKey key) {\r
+            Connection c = (Connection)key.attachment();\r
+            SocketChannel sc = (SocketChannel)key.channel();\r
+            try {\r
+                int ops = key.interestOps();\r
+                ops |= SelectionKey.OP_READ;\r
+                key.interestOps(ops);\r
+                c.onRead();\r
+            } catch (IOException x) {\r
+                if (key.isValid()) {\r
+                    key.attach(null); // Just to make sure that reference to connection is released.\r
+                    key.cancel();\r
+                }\r
+                if (sc.isOpen())\r
+                    try {\r
+                        sc.close();\r
+                    } catch (IOException e) {\r
+                    }\r
+            }\r
+        }\r
+        void connect(Connection connection) throws ProCoreException {\r
+            try {\r
+                if (connections.contains(connection))\r
+                    throw new ProCoreException("Connection already registered. connection=" + connection);\r
+                connection.prepare4Connection(SocketChannel.open());\r
+                connections.add(connection);\r
+                manager.selector.wakeup();\r
+                connection.wait4Connection();\r
+            } catch(IOException e) {\r
+                throw new ConnectionException("Failed to connect to " + connection + ".", e);\r
+            } catch (InterruptedException e) {\r
+                if (!connection.isConnected())\r
+                    throw new NotConnectedException("Connection interrupted to "  + connection + ".");\r
+            }\r
+        }\r
+    private void start() {\r
+        thread.start();\r
+        boolean once = true;\r
+        while (manager.selectorOpen.getCount() > 0) {\r
+            try {\r
+                manager.selectorOpen.await();\r
+            } catch (InterruptedException e) {\r
+                if (once) {\r
+                    once = false;\r
+                    Util.logError("Wait for selector open interrupted. Continuing wait.", e);\r
+                }\r
+            }\r
+        }\r
+    }\r
+    public void stop() {\r
+        stop = true;\r
+        manager.selector.wakeup();\r
+    }\r
+}\r
+//class MethodQueue {\r
+//    private LinkedList<Method> methods = new LinkedList<Method>();\r
+//    MethodQueue() {\r
+//    }\r
+//    public synchronized int size() {\r
+//        return methods.size();\r
+//    }\r
+//    public synchronized void addFirst(Method method) {\r
+//        methods.addFirst(method);\r
+//    }\r
+//    public synchronized void addLast(Method method) {\r
+//        methods.addLast(method);\r
+//    }\r
+//    public synchronized Method removeFirst() {\r
+//        try {\r
+//            Method method = methods.removeFirst();\r
+//            return method;\r
+//        } catch (NoSuchElementException e) {\r
+//            return null;\r
+//        }\r
+//    }\r
+//    public synchronized void close() {\r
+//        while (size() > 0) {\r
+//            Method method = removeFirst();\r
+//            method.setExceptionText("Closing.");\r
+//            method.gotResponse();\r
+//        }\r
+//    }\r
+//\r
+//}\r
+//class RequestQueue {\r
+//    private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();\r
+//\r
+//    RequestQueue() {\r
+//    }\r
+//    public synchronized int size() {\r
+//        return functions.size();\r
+//    }\r
+//    public synchronized void addFirst(AbstractFunction e) {\r
+//        functions.addFirst(e);\r
+//    }\r
+//    public synchronized void addLast(AbstractFunction e) {\r
+//        functions.addLast(e);\r
+//    }\r
+//    public synchronized AbstractFunction removeFirst() {\r
+//        try {\r
+//            AbstractFunction ret = functions.removeFirst();\r
+//            return ret;\r
+//        } catch (NoSuchElementException e) {\r
+//            return null;\r
+//        }\r
+//    }\r
+//}\r
+class MethodQueue {\r
+    private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();\r
+    MethodQueue() {\r
+    }\r
+    public synchronized int size() {\r
+        return methodMap.size();\r
+    }\r
+    public synchronized boolean isEmpty() {\r
+        return 0 == size();\r
+    }\r
+    public synchronized void add(Method method) throws ProCoreException {\r
+        int token = method.getToken();\r
+        if (0 == token)\r
+            throw new ProCoreException("Illegal token number.");\r
+        Method oldMethod = methodMap.put(token, method);\r
+        if (null != oldMethod)\r
+            throw new ProCoreException("Duplicate token number.");\r
+    }\r
+    public synchronized Method remove(int token) {\r
+        Method method = methodMap.remove(token);\r
+        return method;\r
+    }\r
+    synchronized void close() {\r
+        try {\r
+            TIntObjectIterator<Method> iterator = methodMap.iterator();\r
+            while (iterator.hasNext()) {\r
+              iterator.advance();\r
+              Method method = iterator.value();\r
+              if (null == method)\r
+                  Util.logError("Null function in method queue.");\r
+              method.gotException("Method queue closing.");\r
+          }\r
+        } finally {\r
+            methodMap.clear();\r
+        }\r
+    }\r
+//    public synchronized Method removeAny() throws ProCoreException {\r
+//        TIntObjectIterator<Method> iterator = methodMap.iterator();\r
+//        if (!iterator.hasNext())\r
+//            return null;\r
+//        iterator.advance();\r
+//        Method method = iterator.value();\r
+//        if (null == method)\r
+//            throw new ProCoreException("Null function in response queue.");\r
+//        methodMap.remove(method.getToken());\r
+//        return method;\r
+//    }\r
+}\r
+class Channel {\r
+    private SelectionKey selectionKey = null;\r
+    private SocketChannel socketChannel = null;\r
+    Channel() {\r
+    }\r
+    void register(Selector selector, Connection connection)\r
+    throws ClosedChannelException {\r
+        if (null == socketChannel)\r
+            throw new ClosedChannelException();\r
+        socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);\r
+    }\r
+    void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)\r
+    throws ProCoreException, IOException {\r
+        if (null == socketChannel)\r
+            throw new NotConnectedException("Illegal argument. No socket channel to connect to.");\r
+        if (null == address)\r
+            throw new NotConnectedException("Illegal argument. No address to connect to.");\r
+        if (null != this.socketChannel)\r
+            throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);\r
+        socketChannel.configureBlocking(false);\r
+        boolean connected = socketChannel.connect(address);\r
+        if (connected)\r
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+                Util.showDebug("Connected to " + socketChannel + ".");\r
+        this.socketChannel = socketChannel; // Waiting for connect.\r
+    }\r
+//    private static final int IPTOS_LOWDELAY = 0x10;\r
+    synchronized void connect(SelectionKey key) {\r
+        this.selectionKey = key;\r
+        this.socketChannel = (SocketChannel)key.channel();\r
+        if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+            Util.showDebug("Connected to " + socketChannel + ".");\r
+//        try {\r
+//            this.socketChannel.socket().setKeepAlive(true);\r
+//            this.socketChannel.socket().setSoLinger(true, 1);\r
+//        } catch (SocketException e) {\r
+//            e.printStackTrace();\r
+//        }\r
+//        try {\r
+//            this.socketChannel.socket().setTcpNoDelay(true);\r
+        // Crashes Kalle!\r
+        // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);\r
+//        } catch (SocketException e) {\r
+//            e.printStackTrace();\r
+//        }\r
+    }\r
+    synchronized void disconnect() {\r
+        if (selectionKey == null)\r
+            return;\r
+        try {\r
+            try {\r
+                if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+                    Util.showDebug("Disconnected from " + socketChannel);\r
+                selectionKey.interestOps(0);\r
+                selectionKey.cancel();\r
+                selectionKey.selector().wakeup();\r
+            } catch (CancelledKeyException e) {\r
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+                    Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
+            } catch (IllegalArgumentException e) {\r
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+                    Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
+            }\r
+            try { // Note that this must be after the selectionKey operations above.\r
+                socketChannel.socket().close();\r
+            } catch (IOException e) {\r
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+                    Logger.defaultLogError("Failed to close socket cleanly.", e);\r
+            }\r
+        } finally {\r
+            selectionKey = null;\r
+            socketChannel = null;\r
+        }\r
+    }\r
+    synchronized boolean isConnected() {\r
+        boolean ret = selectionKey != null && socketChannel.isConnected();\r
+        return ret;\r
+    }\r
+    synchronized SocketChannel socket() throws IOException {\r
+        if (null == socketChannel)\r
+            throw new IOException("Socket channel is null.");\r
+        return socketChannel;\r
+    }\r
+    synchronized void listenReadEvents()\r
+    throws IOException {\r
+        if (null == selectionKey)\r
+            throw new IOException("Trying to listen connection which is not connected.");\r
+        int ops = selectionKey.interestOps() | SelectionKey.OP_READ;\r
+        selectionKey.interestOps(ops);\r
+        selectionKey.selector().wakeup();\r
+    }\r
+}\r
+class Connection {\r
+    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;\r
+    private static int lastSentTokenNumber = 0;\r
+    private final ConnectionThread connectionThread;\r
+    private InetSocketAddress address;\r
+    private Channel channel;\r
+    private CountDownLatch connectionCreated;\r
+    private ProCoreException exception;\r
+    private ByteBuffer readBuffer;\r
+    Connection(ConnectionThread connectionThread) {\r
+        this.connectionThread = connectionThread;\r
+        init4Connection(InetSocketAddress.createUnresolved("", 0));\r
+    }\r
+    @Override\r
+    public String toString() {\r
+        return "Connection address=" + address + " token=" + lastSentTokenNumber;\r
+    }\r
+    boolean addressNotInitialized() {\r
+        return address.equals(InetSocketAddress.createUnresolved("", 0));\r
+    }\r
+    void register(Selector selector)\r
+    throws ClosedChannelException {\r
+        channel.register(selector, this);\r
+    }\r
+    InetSocketAddress getAddress() {\r
+        return address;\r
+    }\r
+    boolean equalsAddress(InetSocketAddress address) {\r
+        return address.equals(address);\r
+    }\r
+    void init4Connection(InetSocketAddress address) {\r
+        this.address = address;\r
+        if (null != channel && channel.isConnected())\r
+            channel.disconnect();\r
+        channel = new Channel();\r
+        connectionCreated = new CountDownLatch(1);\r
+        exception = null;\r
+        readBuffer = connectionThread.getInputBuffer();\r
+    }\r
+    void prepare4Connection(SocketChannel socketChannel)\r
+    throws ProCoreException, IOException {\r
+        if (isConnected())\r
+            throw new ProCoreException("Illegal state exception. Already connected.");\r
+        if (address.isUnresolved())\r
+            address = new InetSocketAddress(address.getHostName(), address.getPort());\r
+        if (address.getPort() == 0)\r
+            throw new ProCoreException("Port 0 not supported as connection address.");\r
+        channel.prepare4Connection(socketChannel, address);\r
+    }\r
+    void wait4Connection() throws InterruptedException {\r
+        connectionCreated.await();\r
+    }\r
+    synchronized void disconnect() {\r
+        if (!isConnected())\r
+            return;\r
+        channel.disconnect();\r
+    }\r
+    private void throwIOExceptionFromRead(IOException e)\r
+    throws IOException {\r
+        disconnect(); // This ensures that isConnected will be false after this.\r
+        if (null != e)\r
+            throw e;\r
+        throw new IOException("Failed to read.");\r
+    }\r
+    // Called by synchronized handleInput.\r
+    private void sendMessage(ByteBuffer byteBuffer)\r
+    throws IOException, InterruptedException {\r
+        int left = byteBuffer.remaining();\r
+        while (left > 0) {\r
+            ByteBuffer slice = byteBuffer.slice();\r
+            int size = Math.min(left, 1000000);\r
+            slice.limit(size);\r
+            sendBuffer(slice);\r
+            int position = byteBuffer.position() + size;\r
+            byteBuffer.position(position);\r
+            left -= size;\r
+        }\r
+    }\r
+    private void sendBuffer(ByteBuffer byteBuffer)\r
+    throws IOException, InterruptedException {\r
+        int left = byteBuffer.remaining();\r
+        while (left > 0) {\r
+            int n = channel.socket().write(byteBuffer);\r
+            if (n > 0) {\r
+                left -= n;\r
+                continue;\r
+            }\r
+            // System.err.println("############### YIELD ##############################");\r
+            Thread.yield();\r
+            if (left > 0)\r
+                Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()\r
+                        + ", write return =" + n);\r
+        }\r
+    }\r
+    static public class MessageHeader {\r
+        public int token;\r
+        public int lastTokenIn;\r
+        public MessageHeader(int token, int lastTokenIn) {\r
+            this.token = token;\r
+            this.lastTokenIn = lastTokenIn;\r
+        }\r
+    }\r
+    // Called by synchronized call.\r
+    protected void call(MessageHeader messageHeader, Method method)\r
+    throws IOException, InterruptedException {\r
+        byte[] bytes = new byte[20];\r
+        ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);\r
+        header.order(ByteOrder.LITTLE_ENDIAN);\r
+        header.putInt(messageHeader.lastTokenIn);\r
+        header.putInt(messageHeader.token);\r
+        int messageNumber = method.requestNumber;\r
+        header.putInt(messageNumber);\r
+        header.putInt(0); // inflate size, zero if no compression\r
+        ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.\r
+        header.putInt(data.position());\r
+        header.clear();\r
+        synchronized (this) {\r
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+                Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);\r
+            // Receiver expects header and data to form continuous block.\r
+            sendMessage(header);\r
+            data.limit(data.position());\r
+            data.rewind();\r
+            sendMessage(data);\r
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+                Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);\r
+        }\r
+    }\r
+    void sendMethod(MethodQueue mq, Method m, int lastTokenIn)\r
+    throws ProCoreException, IOException, InterruptedException {\r
+        m.prepareForSendingRequest();\r
+        if (DEBUG)\r
+            Util.showDebug("Setting last token out=" + lastTokenIn);\r
+        synchronized (this) {\r
+            MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);\r
+            m.setToken(messageHeader.token);\r
+            mq.add(m);\r
+            call(messageHeader, m);\r
+        }\r
+    }\r
+    // Called by session manager.\r
+    void onRead() throws IOException {\r
+        if (readBuffer.remaining() < 1)\r
+            throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));\r
+        int n = -1;\r
+        try {\r
+            n = channel.socket().read(readBuffer);\r
+        } catch (IOException e) {\r
+            throwIOExceptionFromRead(e);\r
+        }\r
+        if (n < 0) // Assuming that this is same as broken connection.\r
+            throwIOExceptionFromRead(new IOException("Failed to read."));\r
+        else if (n < 1)\r
+            return; // No data, no error.\r
+        try {\r
+            readBuffer = connectionThread.handleInput(readBuffer, n);\r
+        } catch (Throwable t) {\r
+            if (t instanceof IOException)\r
+                throw (IOException)t;\r
+            else\r
+                throw new IOException("Throwable from handleInput.", t);\r
+        }\r
+    }\r
+    // Called by session manager.\r
+    void onConnectFailed(ProCoreException exception) {\r
+        if (null != exception)\r
+            this.exception = exception;\r
+        else\r
+            this.exception = new NotConnectedException("Failed to create connection.");\r
+        connectionCreated.countDown();\r
+    }\r
+    // Called by session manager.\r
+    Connection onConnectSucceeded(SelectionKey key) {\r
+        Connection ret = null;\r
+        try {\r
+            channel.connect(key);\r
+            ret = this;\r
+        } catch (Throwable t) {\r
+            exception = new NotConnectedException("Failed to finalize connection.", t);\r
+            // Connection refused.\r
+        }\r
+        connectionCreated.countDown();\r
+        return ret;\r
+    }\r
+    // Called by session manager.\r
+    void onDisconnect() {\r
+        disconnect();\r
+    }\r
+    boolean isConnected() {\r
+        return channel.isConnected();\r
+    }\r
+//    boolean isStopping() {\r
+//    }\r
+    boolean isExcepted() {\r
+        return null == exception;\r
+    }\r
+    boolean isOk() {\r
+        return isConnected() && !isExcepted();\r
+    }\r
+}\r
+class HeaderData {\r
+    int lastTokenIn;\r
+    int token;\r
+    int messageNumber;\r
+    int inflateSize;\r
+    int deflateSize;\r
+}\r
+class ReceiveData {\r
+    public int lastTokenIn;\r
+    public int token;\r
+    public int messageNumber;\r
+//    public DataBuffer inflateBuffer;\r
+    public ByteBuffer buffer;\r
+    public int inflateSize;\r
+    ReceiveData(HeaderData headerData) {\r
+        lastTokenIn = headerData.lastTokenIn;\r
+        token = headerData.token;\r
+        messageNumber = headerData.messageNumber;\r
+    }\r
+}\r
+abstract class MethodHandler {\r
+    abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;\r
+    DataBuffer getDataBuffer(Packet packet) {\r
+        packet.bytes.position(20);\r
+        DataBuffer db = new DataBuffer(packet.bytes, 0);\r
+        return db;\r
+    }\r
+}\r
+class ErrorHandler extends MethodHandler {\r
+    private final ProCoreException exception;\r
+    ErrorHandler(String s) {\r
+        if (null == s || s == "")\r
+            s = "Ilegal argument. Missing description for remote call exception.";\r
+        this.exception = new ProCoreException(s);\r
+    }\r
+    @Override\r
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+        throw exception;\r
+    }\r
+}\r
+class ExceptionHandler extends MethodHandler {\r
+    @Override\r
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+        packet.bytes.position(20);\r
+        ExceptionFunction exception = new ExceptionFunction();\r
+        exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));\r
+        throw new ProCoreException(exception.what);\r
+    }\r
+}\r
+class DefaultHandler extends MethodHandler {\r
+    @Override\r
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+        function.deserialize(function.getResponseNumber(), getDataBuffer(packet));\r
+    }\r
+}\r
+class GetClusterHandler extends MethodHandler {\r
+    private final GetClusterNewFunction function;\r
+    public GetClusterHandler(final GetClusterNewFunction function) {\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+        this.function.inflateSize = packet.header.inflateSize;\r
+        packet.bytes.position(20);\r
+        packet.bytes.limit(20 + packet.header.deflateSize);\r
+        this.function.deflated = packet.bytes.compact();\r
+        this.function.deflated.position(0);\r
+        this.function.deflated.limit(packet.header.deflateSize);\r
+    }\r
+}\r
+//class AckHandler extends MethodHandler {\r
+//    @Override\r
+//    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+//        packet.first.position(20);\r
+//        NullFunction ack = new NullFunction();\r
+//        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));\r
+//        Util.showDebug("ack message number=" + ack.messageNumber);\r
+//    }\r
+//}\r
+abstract class AbstractMethod {\r
+//  abstract ByteBuffer serialize(ByteOrder byteOrder);\r
+  protected final int requestNumber;\r
+  protected final int responseNumber;\r
+  protected int receivedNumber = 0;\r
+  boolean hasResponse() {\r
+      return responseNumber != 0;\r
+  }\r
+  private int token = 0;\r
+  void setToken(int token) {\r
+      this.token = token;\r
+  }\r
+  int getToken() {\r
+      return token;\r
+  }\r
+  AbstractMethod(int requestNumber, int responseNumber) {\r
+      this.requestNumber = requestNumber;\r
+      this.responseNumber = responseNumber;\r
+  }\r
+  void prepareForSendingRequest() {\r
+      receivedNumber = 0;\r
+  }\r
+}\r
+class Method extends AbstractMethod {\r
+    private final ExceptionHandler exceptionHandler = new ExceptionHandler();\r
+    private final AbstractFunction function;\r
+    private Packet packet = null;\r
+    private CountDownLatch gotPacket = new CountDownLatch(1);\r
+    private MethodHandler handler = new DefaultHandler();\r
+    private final EventHandler eventHandler;\r
+    Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {\r
+        super(function.getRequestNumber(), function.getResponseNumber());\r
+        this.eventHandler = eventHandler;\r
+        if (null != handler)\r
+            this.handler = handler;\r
+        switch (function.getRequestNumber()) {\r
+            default:\r
+                break;\r
+            case MessageNumber.AcceptCommitRequest:\r
+            case MessageNumber.UpdateClusterRequest:\r
+                gotPacket.countDown();\r
+            break;\r
+        }\r
+        this.function = function;\r
+    } // MARK: waitforReply\r
+    void waitForReply(Connection c) throws ProCoreException, InterruptedException {\r
+        while (gotPacket.getCount() > 0) {\r
+            gotPacket.await(30, TimeUnit.SECONDS);\r
+            if (gotPacket.getCount() > 0) {\r
+                System.err.println("waitForReply timeout:");\r
+                System.err.println("wait token=" + getToken());\r
+                // MARK: c.onWait(getToken());\r
+            }\r
+        }\r
+        if (null != packet)\r
+            handler.got(packet, function);\r
+    }\r
+    EventHandler getEventHandler(EventHandler eventHandler) {\r
+        if (null != this.eventHandler)\r
+            return this.eventHandler;\r
+        else\r
+            return eventHandler;\r
+    }\r
+    ByteBuffer serialize(ByteOrder byteOrder) {\r
+        DataBuffer db = function.serialize(byteOrder);\r
+        return db.getByteBuffer();\r
+    }\r
+    void gotPacket(Packet packet) {\r
+        this.packet = packet;\r
+        gotPacket.countDown();\r
+    }\r
+    void gotException(String s) {\r
+        handler = new ErrorHandler(s);\r
+        gotPacket.countDown();\r
+    }\r
+    void gotException(Packet packet) {\r
+        this.packet = packet;\r
+        handler = exceptionHandler;\r
+        gotPacket.countDown();\r
+    }\r
+//    void gotAck(Packet packet) {\r
+//        this.packet = packet;\r
+//        handler = new AckHandler();\r
+//        gotPacket.countDown();\r
+//    }\r
+}\r
+class DebugPolicy {\r
+    static final boolean REPORT_DEBUG_EVENTS = false;\r
+    static final boolean REPORT_SERVER_EVENTS = false;\r
+    static final boolean REPORT_DEBUG_OPERATIONS = false;\r
+    static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;\r
+    static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;\r
+\r
+    private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;\r
+    private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;\r
+    static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);\r
+\r
+    // Time to wait for an event (in milliseconds).\r
+    // Code logic should assume that this wait never expires.\r
+    // Should be big enough so that if this time is expired it indicates an error situation.\r
+    private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour\r
+    private static final boolean REPORT_WAIT_EVENTS = true;\r
+    private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;\r
+    /**\r
+     * Calls object.wait.\r
+     * @param waitMs time to wait in milliseconds.\r
+     * @param object to wait for (used for calling object.wait)\r
+     * @param string text used in debug prints.\r
+     * @see java.lang.Object.wait\r
+     */\r
+    static final void wait(long waitMs, Object object, String string) {\r
+        waitImpl(waitMs, object, string);\r
+    }\r
+    /**\r
+     * Calls object.wait.\r
+     * @param object to wait for (used for calling object.wait)\r
+     * @param string text used in debug prints.\r
+     * @see java.lang.Object.wait\r
+     */\r
+    static final void wait(Object object, String string) {\r
+        waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);\r
+    }\r
+    private static final void report(String string, Object object) {\r
+        String msg = "DEBUG: " + string\r
+        + " " + object.getClass().getSimpleName()\r
+        + " hid=" + System.identityHashCode(object);\r
+        if (REPORT_WAIT_EVENTS_VERBOSE)\r
+            new Throwable(msg).printStackTrace();\r
+        else\r
+            System.err.println(msg);\r
+    }\r
+    private static final void waitImpl(long waitMs, Object object, String string) {\r
+        if (REPORT_WAIT_EVENTS)\r
+            report("Wait for event started.", object);\r
+        try {\r
+            object.wait(waitMs); // milliseconds\r
+        } catch (InterruptedException e) {\r
+            if (REPORT_WAIT_EVENTS)\r
+                report("Wait for event interrupted.", object);\r
+        } finally {\r
+            if (REPORT_WAIT_EVENTS)\r
+                report("Wait for event stopped.", object);\r
+        }\r
+    }\r
+}\r