]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java
Goodbye db-client.log
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / ConnectionManager.java
index fb01a06d02ad56cceec49b32df91c9c098b600ea..8eb46f5df844c0c2950b364e936337f8d753f944 100644 (file)
-package org.simantics.db.server.internal;\r
-\r
-import java.io.IOException;\r
-import java.net.InetSocketAddress;\r
-import java.nio.ByteBuffer;\r
-import java.nio.ByteOrder;\r
-import java.nio.channels.CancelledKeyException;\r
-import java.nio.channels.ClosedChannelException;\r
-import java.nio.channels.SelectionKey;\r
-import java.nio.channels.Selector;\r
-import java.nio.channels.SocketChannel;\r
-import java.util.Collections;\r
-import java.util.Iterator;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-import java.util.concurrent.CountDownLatch;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.server.protocol.AbstractFunction;\r
-import org.simantics.db.server.protocol.DataBuffer;\r
-import org.simantics.db.server.protocol.ExceptionFunction;\r
-import org.simantics.db.server.protocol.GetClusterNewFunction;\r
-import org.simantics.db.server.protocol.MessageNumber;\r
-\r
-import gnu.trove.iterator.TIntObjectIterator;\r
-import gnu.trove.map.hash.TIntObjectHashMap;\r
-\r
-public class ConnectionManager {\r
-    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;\r
-    private static final ConnectionManager connectionManager = new ConnectionManager();\r
-    public static ConnectionManager getInstance() {\r
-        return connectionManager.instance();\r
-    }\r
-    private final Manager manager = new Manager();\r
-    private final Thread thread = new Thread(manager, "Connection Manager");\r
-    private boolean stop = false;\r
-    protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());\r
-    private ConnectionManager() {\r
-        Util.trace("ConnectionManager.new");\r
-    }\r
-    private ConnectionManager instance() {\r
-        if (!thread.isAlive())\r
-            start();\r
-        return this;\r
-    }\r
-    protected class Manager implements Runnable {\r
-        private Selector selector;\r
-        private CountDownLatch selectorOpen = new CountDownLatch(1);\r
-        Manager() {\r
-        }\r
-        public void run() {\r
-            try {\r
-                selector = Selector.open();\r
-                selectorOpen.countDown();\r
-                while (true) {\r
-                    int n = selector.select();\r
-                    try {\r
-                        if (n > 0)\r
-                            processSelected();\r
-                        processConnectors();\r
-                        if (stop) {\r
-                            selector.close();\r
-                            return;\r
-                        }\r
-                    } catch (CancelledKeyException e) {\r
-                        Logger.defaultLogError("Cancelled key in select loop.", e);\r
-                    }\r
-                }\r
-            } catch (Throwable x) {\r
-                Logger.defaultLogError("Select loop failed.", x);\r
-            }\r
-        }\r
-        void processConnectors() throws ClosedChannelException {\r
-            while (connections.size() > 0) {\r
-                Connection c = connections.remove(0);\r
-                c.register(selector);\r
-            }\r
-        }\r
-        void processSelected() {\r
-            Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
-            while (i.hasNext()) {\r
-                SelectionKey sk = (SelectionKey)i.next();\r
-                i.remove();\r
-                try {\r
-                    if (!sk.isValid())\r
-                        processDisconnect(sk);\r
-                    else if (sk.isConnectable())\r
-                        processConnect(sk);\r
-                    else if (sk.isReadable())\r
-                        processRead(sk);\r
-                } catch (CancelledKeyException e) {\r
-                    processDisconnect(sk);\r
-                }\r
-            }\r
-        }\r
-        private void processDisconnect(SelectionKey key) {\r
-            Connection c = (Connection)key.attachment();\r
-            if (null == c)\r
-                return;\r
-            c.onDisconnect();\r
-            key.attach(null); // Just to make sure that reference to connection is released.\r
-            key.cancel();\r
-        }\r
-        private void processConnect(SelectionKey key) {\r
-                Connection connection = (Connection)key.attachment();\r
-                SocketChannel sc = (SocketChannel)key.channel();\r
-                try {\r
-                    if (sc.finishConnect()) {\r
-                        int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;\r
-                        ops |= SelectionKey.OP_READ;\r
-                        key.interestOps(ops);\r
-                        Connection c = connection.onConnectSucceeded(key);\r
-                        if (c == null) { // Connector refused to accept connection.\r
-                            key.attach(null); // Just to make sure that reference to connector is released.\r
-                            key.cancel();\r
-                            sc.close();\r
-                        } else\r
-                            key.attach(c);\r
-                    }\r
-                } catch (Throwable x) {\r
-                    try {\r
-                        if (key.isValid())\r
-                            key.attach(null); // Just to make sure that reference to connector is released.\r
-                            key.cancel();\r
-                        try {\r
-                            sc.close();\r
-                        } catch (IOException e) {\r
-                            if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.\r
-                                Logger.defaultLogError("Failed to close connection.", e);\r
-                        }\r
-                    } finally {\r
-                        connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));\r
-                    }\r
-                }\r
-            }\r
-        }\r
-        private void processRead(SelectionKey key) {\r
-            Connection c = (Connection)key.attachment();\r
-            SocketChannel sc = (SocketChannel)key.channel();\r
-            try {\r
-                int ops = key.interestOps();\r
-                ops |= SelectionKey.OP_READ;\r
-                key.interestOps(ops);\r
-                c.onRead();\r
-            } catch (IOException x) {\r
-                if (key.isValid()) {\r
-                    key.attach(null); // Just to make sure that reference to connection is released.\r
-                    key.cancel();\r
-                }\r
-                if (sc.isOpen())\r
-                    try {\r
-                        sc.close();\r
-                    } catch (IOException e) {\r
-                    }\r
-            }\r
-        }\r
-        void connect(Connection connection) throws ProCoreException {\r
-            try {\r
-                if (connections.contains(connection))\r
-                    throw new ProCoreException("Connection already registered. connection=" + connection);\r
-                connection.prepare4Connection(SocketChannel.open());\r
-                connections.add(connection);\r
-                manager.selector.wakeup();\r
-                connection.wait4Connection();\r
-            } catch(IOException e) {\r
-                throw new ConnectionException("Failed to connect to " + connection + ".", e);\r
-            } catch (InterruptedException e) {\r
-                if (!connection.isConnected())\r
-                    throw new NotConnectedException("Connection interrupted to "  + connection + ".");\r
-            }\r
-        }\r
-    private void start() {\r
-        thread.start();\r
-        boolean once = true;\r
-        while (manager.selectorOpen.getCount() > 0) {\r
-            try {\r
-                manager.selectorOpen.await();\r
-            } catch (InterruptedException e) {\r
-                if (once) {\r
-                    once = false;\r
-                    Util.logError("Wait for selector open interrupted. Continuing wait.", e);\r
-                }\r
-            }\r
-        }\r
-    }\r
-    public void stop() {\r
-        stop = true;\r
-        manager.selector.wakeup();\r
-    }\r
-}\r
-//class MethodQueue {\r
-//    private LinkedList<Method> methods = new LinkedList<Method>();\r
-//    MethodQueue() {\r
-//    }\r
-//    public synchronized int size() {\r
-//        return methods.size();\r
-//    }\r
-//    public synchronized void addFirst(Method method) {\r
-//        methods.addFirst(method);\r
-//    }\r
-//    public synchronized void addLast(Method method) {\r
-//        methods.addLast(method);\r
-//    }\r
-//    public synchronized Method removeFirst() {\r
-//        try {\r
-//            Method method = methods.removeFirst();\r
-//            return method;\r
-//        } catch (NoSuchElementException e) {\r
-//            return null;\r
-//        }\r
-//    }\r
-//    public synchronized void close() {\r
-//        while (size() > 0) {\r
-//            Method method = removeFirst();\r
-//            method.setExceptionText("Closing.");\r
-//            method.gotResponse();\r
-//        }\r
-//    }\r
-//\r
-//}\r
-//class RequestQueue {\r
-//    private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();\r
-//\r
-//    RequestQueue() {\r
-//    }\r
-//    public synchronized int size() {\r
-//        return functions.size();\r
-//    }\r
-//    public synchronized void addFirst(AbstractFunction e) {\r
-//        functions.addFirst(e);\r
-//    }\r
-//    public synchronized void addLast(AbstractFunction e) {\r
-//        functions.addLast(e);\r
-//    }\r
-//    public synchronized AbstractFunction removeFirst() {\r
-//        try {\r
-//            AbstractFunction ret = functions.removeFirst();\r
-//            return ret;\r
-//        } catch (NoSuchElementException e) {\r
-//            return null;\r
-//        }\r
-//    }\r
-//}\r
-class MethodQueue {\r
-    private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();\r
-    MethodQueue() {\r
-    }\r
-    public synchronized int size() {\r
-        return methodMap.size();\r
-    }\r
-    public synchronized boolean isEmpty() {\r
-        return 0 == size();\r
-    }\r
-    public synchronized void add(Method method) throws ProCoreException {\r
-        int token = method.getToken();\r
-        if (0 == token)\r
-            throw new ProCoreException("Illegal token number.");\r
-        Method oldMethod = methodMap.put(token, method);\r
-        if (null != oldMethod)\r
-            throw new ProCoreException("Duplicate token number.");\r
-    }\r
-    public synchronized Method remove(int token) {\r
-        Method method = methodMap.remove(token);\r
-        return method;\r
-    }\r
-    synchronized void close() {\r
-        try {\r
-            TIntObjectIterator<Method> iterator = methodMap.iterator();\r
-            while (iterator.hasNext()) {\r
-              iterator.advance();\r
-              Method method = iterator.value();\r
-              if (null == method)\r
-                  Util.logError("Null function in method queue.");\r
-              method.gotException("Method queue closing.");\r
-          }\r
-        } finally {\r
-            methodMap.clear();\r
-        }\r
-    }\r
-//    public synchronized Method removeAny() throws ProCoreException {\r
-//        TIntObjectIterator<Method> iterator = methodMap.iterator();\r
-//        if (!iterator.hasNext())\r
-//            return null;\r
-//        iterator.advance();\r
-//        Method method = iterator.value();\r
-//        if (null == method)\r
-//            throw new ProCoreException("Null function in response queue.");\r
-//        methodMap.remove(method.getToken());\r
-//        return method;\r
-//    }\r
-}\r
-class Channel {\r
-    private SelectionKey selectionKey = null;\r
-    private SocketChannel socketChannel = null;\r
-    Channel() {\r
-    }\r
-    void register(Selector selector, Connection connection)\r
-    throws ClosedChannelException {\r
-        if (null == socketChannel)\r
-            throw new ClosedChannelException();\r
-        socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);\r
-    }\r
-    void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)\r
-    throws ProCoreException, IOException {\r
-        if (null == socketChannel)\r
-            throw new NotConnectedException("Illegal argument. No socket channel to connect to.");\r
-        if (null == address)\r
-            throw new NotConnectedException("Illegal argument. No address to connect to.");\r
-        if (null != this.socketChannel)\r
-            throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);\r
-        socketChannel.configureBlocking(false);\r
-        boolean connected = socketChannel.connect(address);\r
-        if (connected)\r
-            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
-                Util.showDebug("Connected to " + socketChannel + ".");\r
-        this.socketChannel = socketChannel; // Waiting for connect.\r
-    }\r
-//    private static final int IPTOS_LOWDELAY = 0x10;\r
-    synchronized void connect(SelectionKey key) {\r
-        this.selectionKey = key;\r
-        this.socketChannel = (SocketChannel)key.channel();\r
-        if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
-            Util.showDebug("Connected to " + socketChannel + ".");\r
-//        try {\r
-//            this.socketChannel.socket().setKeepAlive(true);\r
-//            this.socketChannel.socket().setSoLinger(true, 1);\r
-//        } catch (SocketException e) {\r
-//            e.printStackTrace();\r
-//        }\r
-//        try {\r
-//            this.socketChannel.socket().setTcpNoDelay(true);\r
-        // Crashes Kalle!\r
-        // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);\r
-//        } catch (SocketException e) {\r
-//            e.printStackTrace();\r
-//        }\r
-    }\r
-    synchronized void disconnect() {\r
-        if (selectionKey == null)\r
-            return;\r
-        try {\r
-            try {\r
-                if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
-                    Util.showDebug("Disconnected from " + socketChannel);\r
-                selectionKey.interestOps(0);\r
-                selectionKey.cancel();\r
-                selectionKey.selector().wakeup();\r
-            } catch (CancelledKeyException e) {\r
-                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
-                    Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
-            } catch (IllegalArgumentException e) {\r
-                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
-                    Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
-            }\r
-            try { // Note that this must be after the selectionKey operations above.\r
-                socketChannel.socket().close();\r
-            } catch (IOException e) {\r
-                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
-                    Logger.defaultLogError("Failed to close socket cleanly.", e);\r
-            }\r
-        } finally {\r
-            selectionKey = null;\r
-            socketChannel = null;\r
-        }\r
-    }\r
-    synchronized boolean isConnected() {\r
-        boolean ret = selectionKey != null && socketChannel.isConnected();\r
-        return ret;\r
-    }\r
-    synchronized SocketChannel socket() throws IOException {\r
-        if (null == socketChannel)\r
-            throw new IOException("Socket channel is null.");\r
-        return socketChannel;\r
-    }\r
-    synchronized void listenReadEvents()\r
-    throws IOException {\r
-        if (null == selectionKey)\r
-            throw new IOException("Trying to listen connection which is not connected.");\r
-        int ops = selectionKey.interestOps() | SelectionKey.OP_READ;\r
-        selectionKey.interestOps(ops);\r
-        selectionKey.selector().wakeup();\r
-    }\r
-}\r
-class Connection {\r
-    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;\r
-    private static int lastSentTokenNumber = 0;\r
-    private final ConnectionThread connectionThread;\r
-    private InetSocketAddress address;\r
-    private Channel channel;\r
-    private CountDownLatch connectionCreated;\r
-    private ProCoreException exception;\r
-    private ByteBuffer readBuffer;\r
-    Connection(ConnectionThread connectionThread) {\r
-        this.connectionThread = connectionThread;\r
-        init4Connection(InetSocketAddress.createUnresolved("", 0));\r
-    }\r
-    @Override\r
-    public String toString() {\r
-        return "Connection address=" + address + " token=" + lastSentTokenNumber;\r
-    }\r
-    boolean addressNotInitialized() {\r
-        return address.equals(InetSocketAddress.createUnresolved("", 0));\r
-    }\r
-    void register(Selector selector)\r
-    throws ClosedChannelException {\r
-        channel.register(selector, this);\r
-    }\r
-    InetSocketAddress getAddress() {\r
-        return address;\r
-    }\r
-    boolean equalsAddress(InetSocketAddress address) {\r
-        return address.equals(address);\r
-    }\r
-    void init4Connection(InetSocketAddress address) {\r
-        this.address = address;\r
-        if (null != channel && channel.isConnected())\r
-            channel.disconnect();\r
-        channel = new Channel();\r
-        connectionCreated = new CountDownLatch(1);\r
-        exception = null;\r
-        readBuffer = connectionThread.getInputBuffer();\r
-    }\r
-    void prepare4Connection(SocketChannel socketChannel)\r
-    throws ProCoreException, IOException {\r
-        if (isConnected())\r
-            throw new ProCoreException("Illegal state exception. Already connected.");\r
-        if (address.isUnresolved())\r
-            address = new InetSocketAddress(address.getHostName(), address.getPort());\r
-        if (address.getPort() == 0)\r
-            throw new ProCoreException("Port 0 not supported as connection address.");\r
-        channel.prepare4Connection(socketChannel, address);\r
-    }\r
-    void wait4Connection() throws InterruptedException {\r
-        connectionCreated.await();\r
-    }\r
-    synchronized void disconnect() {\r
-        if (!isConnected())\r
-            return;\r
-        channel.disconnect();\r
-    }\r
-    private void throwIOExceptionFromRead(IOException e)\r
-    throws IOException {\r
-        disconnect(); // This ensures that isConnected will be false after this.\r
-        if (null != e)\r
-            throw e;\r
-        throw new IOException("Failed to read.");\r
-    }\r
-    // Called by synchronized handleInput.\r
-    private void sendMessage(ByteBuffer byteBuffer)\r
-    throws IOException, InterruptedException {\r
-        int left = byteBuffer.remaining();\r
-        while (left > 0) {\r
-            ByteBuffer slice = byteBuffer.slice();\r
-            int size = Math.min(left, 1000000);\r
-            slice.limit(size);\r
-            sendBuffer(slice);\r
-            int position = byteBuffer.position() + size;\r
-            byteBuffer.position(position);\r
-            left -= size;\r
-        }\r
-    }\r
-    private void sendBuffer(ByteBuffer byteBuffer)\r
-    throws IOException, InterruptedException {\r
-        int left = byteBuffer.remaining();\r
-        while (left > 0) {\r
-            int n = channel.socket().write(byteBuffer);\r
-            if (n > 0) {\r
-                left -= n;\r
-                continue;\r
-            }\r
-            // System.err.println("############### YIELD ##############################");\r
-            Thread.yield();\r
-            if (left > 0)\r
-                Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()\r
-                        + ", write return =" + n);\r
-        }\r
-    }\r
-    static public class MessageHeader {\r
-        public int token;\r
-        public int lastTokenIn;\r
-        public MessageHeader(int token, int lastTokenIn) {\r
-            this.token = token;\r
-            this.lastTokenIn = lastTokenIn;\r
-        }\r
-    }\r
-    // Called by synchronized call.\r
-    protected void call(MessageHeader messageHeader, Method method)\r
-    throws IOException, InterruptedException {\r
-        byte[] bytes = new byte[20];\r
-        ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);\r
-        header.order(ByteOrder.LITTLE_ENDIAN);\r
-        header.putInt(messageHeader.lastTokenIn);\r
-        header.putInt(messageHeader.token);\r
-        int messageNumber = method.requestNumber;\r
-        header.putInt(messageNumber);\r
-        header.putInt(0); // inflate size, zero if no compression\r
-        ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.\r
-        header.putInt(data.position());\r
-        header.clear();\r
-        synchronized (this) {\r
-            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
-                Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);\r
-            // Receiver expects header and data to form continuous block.\r
-            sendMessage(header);\r
-            data.limit(data.position());\r
-            data.rewind();\r
-            sendMessage(data);\r
-            if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
-                Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);\r
-        }\r
-    }\r
-    void sendMethod(MethodQueue mq, Method m, int lastTokenIn)\r
-    throws ProCoreException, IOException, InterruptedException {\r
-        m.prepareForSendingRequest();\r
-        if (DEBUG)\r
-            Util.showDebug("Setting last token out=" + lastTokenIn);\r
-        synchronized (this) {\r
-            MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);\r
-            m.setToken(messageHeader.token);\r
-            mq.add(m);\r
-            call(messageHeader, m);\r
-        }\r
-    }\r
-    // Called by session manager.\r
-    void onRead() throws IOException {\r
-        if (readBuffer.remaining() < 1)\r
-            throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));\r
-        int n = -1;\r
-        try {\r
-            n = channel.socket().read(readBuffer);\r
-        } catch (IOException e) {\r
-            throwIOExceptionFromRead(e);\r
-        }\r
-        if (n < 0) // Assuming that this is same as broken connection.\r
-            throwIOExceptionFromRead(new IOException("Failed to read."));\r
-        else if (n < 1)\r
-            return; // No data, no error.\r
-        try {\r
-            readBuffer = connectionThread.handleInput(readBuffer, n);\r
-        } catch (Throwable t) {\r
-            if (t instanceof IOException)\r
-                throw (IOException)t;\r
-            else\r
-                throw new IOException("Throwable from handleInput.", t);\r
-        }\r
-    }\r
-    // Called by session manager.\r
-    void onConnectFailed(ProCoreException exception) {\r
-        if (null != exception)\r
-            this.exception = exception;\r
-        else\r
-            this.exception = new NotConnectedException("Failed to create connection.");\r
-        connectionCreated.countDown();\r
-    }\r
-    // Called by session manager.\r
-    Connection onConnectSucceeded(SelectionKey key) {\r
-        Connection ret = null;\r
-        try {\r
-            channel.connect(key);\r
-            ret = this;\r
-        } catch (Throwable t) {\r
-            exception = new NotConnectedException("Failed to finalize connection.", t);\r
-            // Connection refused.\r
-        }\r
-        connectionCreated.countDown();\r
-        return ret;\r
-    }\r
-    // Called by session manager.\r
-    void onDisconnect() {\r
-        disconnect();\r
-    }\r
-    boolean isConnected() {\r
-        return channel.isConnected();\r
-    }\r
-//    boolean isStopping() {\r
-//    }\r
-    boolean isExcepted() {\r
-        return null == exception;\r
-    }\r
-    boolean isOk() {\r
-        return isConnected() && !isExcepted();\r
-    }\r
-}\r
-class HeaderData {\r
-    int lastTokenIn;\r
-    int token;\r
-    int messageNumber;\r
-    int inflateSize;\r
-    int deflateSize;\r
-}\r
-class ReceiveData {\r
-    public int lastTokenIn;\r
-    public int token;\r
-    public int messageNumber;\r
-//    public DataBuffer inflateBuffer;\r
-    public ByteBuffer buffer;\r
-    public int inflateSize;\r
-    ReceiveData(HeaderData headerData) {\r
-        lastTokenIn = headerData.lastTokenIn;\r
-        token = headerData.token;\r
-        messageNumber = headerData.messageNumber;\r
-    }\r
-}\r
-abstract class MethodHandler {\r
-    abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;\r
-    DataBuffer getDataBuffer(Packet packet) {\r
-        packet.bytes.position(20);\r
-        DataBuffer db = new DataBuffer(packet.bytes, 0);\r
-        return db;\r
-    }\r
-}\r
-class ErrorHandler extends MethodHandler {\r
-    private final ProCoreException exception;\r
-    ErrorHandler(String s) {\r
-        if (null == s || s == "")\r
-            s = "Ilegal argument. Missing description for remote call exception.";\r
-        this.exception = new ProCoreException(s);\r
-    }\r
-    @Override\r
-    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-        throw exception;\r
-    }\r
-}\r
-class ExceptionHandler extends MethodHandler {\r
-    @Override\r
-    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-        packet.bytes.position(20);\r
-        ExceptionFunction exception = new ExceptionFunction();\r
-        exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));\r
-        throw new ProCoreException(exception.what);\r
-    }\r
-}\r
-class DefaultHandler extends MethodHandler {\r
-    @Override\r
-    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-        function.deserialize(function.getResponseNumber(), getDataBuffer(packet));\r
-    }\r
-}\r
-class GetClusterHandler extends MethodHandler {\r
-    private final GetClusterNewFunction function;\r
-    public GetClusterHandler(final GetClusterNewFunction function) {\r
-        this.function = function;\r
-    }\r
-    @Override\r
-    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-        this.function.inflateSize = packet.header.inflateSize;\r
-        packet.bytes.position(20);\r
-        packet.bytes.limit(20 + packet.header.deflateSize);\r
-        this.function.deflated = packet.bytes.compact();\r
-        this.function.deflated.position(0);\r
-        this.function.deflated.limit(packet.header.deflateSize);\r
-    }\r
-}\r
-//class AckHandler extends MethodHandler {\r
-//    @Override\r
-//    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-//        packet.first.position(20);\r
-//        NullFunction ack = new NullFunction();\r
-//        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));\r
-//        Util.showDebug("ack message number=" + ack.messageNumber);\r
-//    }\r
-//}\r
-abstract class AbstractMethod {\r
-//  abstract ByteBuffer serialize(ByteOrder byteOrder);\r
-  protected final int requestNumber;\r
-  protected final int responseNumber;\r
-  protected int receivedNumber = 0;\r
-  boolean hasResponse() {\r
-      return responseNumber != 0;\r
-  }\r
-  private int token = 0;\r
-  void setToken(int token) {\r
-      this.token = token;\r
-  }\r
-  int getToken() {\r
-      return token;\r
-  }\r
-  AbstractMethod(int requestNumber, int responseNumber) {\r
-      this.requestNumber = requestNumber;\r
-      this.responseNumber = responseNumber;\r
-  }\r
-  void prepareForSendingRequest() {\r
-      receivedNumber = 0;\r
-  }\r
-}\r
-class Method extends AbstractMethod {\r
-    private final ExceptionHandler exceptionHandler = new ExceptionHandler();\r
-    private final AbstractFunction function;\r
-    private Packet packet = null;\r
-    private CountDownLatch gotPacket = new CountDownLatch(1);\r
-    private MethodHandler handler = new DefaultHandler();\r
-    private final EventHandler eventHandler;\r
-    Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {\r
-        super(function.getRequestNumber(), function.getResponseNumber());\r
-        this.eventHandler = eventHandler;\r
-        if (null != handler)\r
-            this.handler = handler;\r
-        switch (function.getRequestNumber()) {\r
-            default:\r
-                break;\r
-            case MessageNumber.AcceptCommitRequest:\r
-            case MessageNumber.UpdateClusterRequest:\r
-                gotPacket.countDown();\r
-            break;\r
-        }\r
-        this.function = function;\r
-    } // MARK: waitforReply\r
-    void waitForReply(Connection c) throws ProCoreException, InterruptedException {\r
-        while (gotPacket.getCount() > 0) {\r
-            gotPacket.await(30, TimeUnit.SECONDS);\r
-            if (gotPacket.getCount() > 0) {\r
-                System.err.println("waitForReply timeout:");\r
-                System.err.println("wait token=" + getToken());\r
-                // MARK: c.onWait(getToken());\r
-            }\r
-        }\r
-        if (null != packet)\r
-            handler.got(packet, function);\r
-    }\r
-    EventHandler getEventHandler(EventHandler eventHandler) {\r
-        if (null != this.eventHandler)\r
-            return this.eventHandler;\r
-        else\r
-            return eventHandler;\r
-    }\r
-    ByteBuffer serialize(ByteOrder byteOrder) {\r
-        DataBuffer db = function.serialize(byteOrder);\r
-        return db.getByteBuffer();\r
-    }\r
-    void gotPacket(Packet packet) {\r
-        this.packet = packet;\r
-        gotPacket.countDown();\r
-    }\r
-    void gotException(String s) {\r
-        handler = new ErrorHandler(s);\r
-        gotPacket.countDown();\r
-    }\r
-    void gotException(Packet packet) {\r
-        this.packet = packet;\r
-        handler = exceptionHandler;\r
-        gotPacket.countDown();\r
-    }\r
-//    void gotAck(Packet packet) {\r
-//        this.packet = packet;\r
-//        handler = new AckHandler();\r
-//        gotPacket.countDown();\r
-//    }\r
-}\r
-class DebugPolicy {\r
-    static final boolean REPORT_DEBUG_EVENTS = false;\r
-    static final boolean REPORT_SERVER_EVENTS = false;\r
-    static final boolean REPORT_DEBUG_OPERATIONS = false;\r
-    static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;\r
-    static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;\r
-\r
-    private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;\r
-    private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;\r
-    static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);\r
-\r
-    // Time to wait for an event (in milliseconds).\r
-    // Code logic should assume that this wait never expires.\r
-    // Should be big enough so that if this time is expired it indicates an error situation.\r
-    private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour\r
-    private static final boolean REPORT_WAIT_EVENTS = true;\r
-    private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;\r
-    /**\r
-     * Calls object.wait.\r
-     * @param waitMs time to wait in milliseconds.\r
-     * @param object to wait for (used for calling object.wait)\r
-     * @param string text used in debug prints.\r
-     * @see java.lang.Object.wait\r
-     */\r
-    static final void wait(long waitMs, Object object, String string) {\r
-        waitImpl(waitMs, object, string);\r
-    }\r
-    /**\r
-     * Calls object.wait.\r
-     * @param object to wait for (used for calling object.wait)\r
-     * @param string text used in debug prints.\r
-     * @see java.lang.Object.wait\r
-     */\r
-    static final void wait(Object object, String string) {\r
-        waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);\r
-    }\r
-    private static final void report(String string, Object object) {\r
-        String msg = "DEBUG: " + string\r
-        + " " + object.getClass().getSimpleName()\r
-        + " hid=" + System.identityHashCode(object);\r
-        if (REPORT_WAIT_EVENTS_VERBOSE)\r
-            new Throwable(msg).printStackTrace();\r
-        else\r
-            System.err.println(msg);\r
-    }\r
-    private static final void waitImpl(long waitMs, Object object, String string) {\r
-        if (REPORT_WAIT_EVENTS)\r
-            report("Wait for event started.", object);\r
-        try {\r
-            object.wait(waitMs); // milliseconds\r
-        } catch (InterruptedException e) {\r
-            if (REPORT_WAIT_EVENTS)\r
-                report("Wait for event interrupted.", object);\r
-        } finally {\r
-            if (REPORT_WAIT_EVENTS)\r
-                report("Wait for event stopped.", object);\r
-        }\r
-    }\r
-}\r
+package org.simantics.db.server.internal;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.server.protocol.AbstractFunction;
+import org.simantics.db.server.protocol.DataBuffer;
+import org.simantics.db.server.protocol.ExceptionFunction;
+import org.simantics.db.server.protocol.GetClusterNewFunction;
+import org.simantics.db.server.protocol.MessageNumber;
+
+import gnu.trove.iterator.TIntObjectIterator;
+import gnu.trove.map.hash.TIntObjectHashMap;
+
+public class ConnectionManager {
+    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
+    private static final ConnectionManager connectionManager = new ConnectionManager();
+    public static ConnectionManager getInstance() {
+        return connectionManager.instance();
+    }
+    private final Manager manager = new Manager();
+    private final Thread thread = new Thread(manager, "Connection Manager");
+    private boolean stop = false;
+    protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());
+    private ConnectionManager() {
+        Util.trace("ConnectionManager.new");
+    }
+    private ConnectionManager instance() {
+        if (!thread.isAlive())
+            start();
+        return this;
+    }
+    protected class Manager implements Runnable {
+        private Selector selector;
+        private CountDownLatch selectorOpen = new CountDownLatch(1);
+        Manager() {
+        }
+        public void run() {
+            try {
+                selector = Selector.open();
+                selectorOpen.countDown();
+                while (true) {
+                    int n = selector.select();
+                    try {
+                        if (n > 0)
+                            processSelected();
+                        processConnectors();
+                        if (stop) {
+                            selector.close();
+                            return;
+                        }
+                    } catch (CancelledKeyException e) {
+                        Logger.defaultLogError("Cancelled key in select loop.", e);
+                    }
+                }
+            } catch (Throwable x) {
+                Logger.defaultLogError("Select loop failed.", x);
+            }
+        }
+        void processConnectors() throws ClosedChannelException {
+            while (connections.size() > 0) {
+                Connection c = connections.remove(0);
+                c.register(selector);
+            }
+        }
+        void processSelected() {
+            Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+            while (i.hasNext()) {
+                SelectionKey sk = (SelectionKey)i.next();
+                i.remove();
+                try {
+                    if (!sk.isValid())
+                        processDisconnect(sk);
+                    else if (sk.isConnectable())
+                        processConnect(sk);
+                    else if (sk.isReadable())
+                        processRead(sk);
+                } catch (CancelledKeyException e) {
+                    processDisconnect(sk);
+                }
+            }
+        }
+        private void processDisconnect(SelectionKey key) {
+            Connection c = (Connection)key.attachment();
+            if (null == c)
+                return;
+            c.onDisconnect();
+            key.attach(null); // Just to make sure that reference to connection is released.
+            key.cancel();
+        }
+        private void processConnect(SelectionKey key) {
+                Connection connection = (Connection)key.attachment();
+                SocketChannel sc = (SocketChannel)key.channel();
+                try {
+                    if (sc.finishConnect()) {
+                        int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
+                        ops |= SelectionKey.OP_READ;
+                        key.interestOps(ops);
+                        Connection c = connection.onConnectSucceeded(key);
+                        if (c == null) { // Connector refused to accept connection.
+                            key.attach(null); // Just to make sure that reference to connector is released.
+                            key.cancel();
+                            sc.close();
+                        } else
+                            key.attach(c);
+                    }
+                } catch (Throwable x) {
+                    try {
+                        if (key.isValid())
+                            key.attach(null); // Just to make sure that reference to connector is released.
+                            key.cancel();
+                        try {
+                            sc.close();
+                        } catch (IOException e) {
+                            if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
+                                Logger.defaultLogError("Failed to close connection.", e);
+                        }
+                    } finally {
+                        connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
+                    }
+                }
+            }
+        }
+        private void processRead(SelectionKey key) {
+            Connection c = (Connection)key.attachment();
+            SocketChannel sc = (SocketChannel)key.channel();
+            try {
+                int ops = key.interestOps();
+                ops |= SelectionKey.OP_READ;
+                key.interestOps(ops);
+                c.onRead();
+            } catch (IOException x) {
+                if (key.isValid()) {
+                    key.attach(null); // Just to make sure that reference to connection is released.
+                    key.cancel();
+                }
+                if (sc.isOpen())
+                    try {
+                        sc.close();
+                    } catch (IOException e) {
+                    }
+            }
+        }
+        void connect(Connection connection) throws ProCoreException {
+            try {
+                if (connections.contains(connection))
+                    throw new ProCoreException("Connection already registered. connection=" + connection);
+                connection.prepare4Connection(SocketChannel.open());
+                connections.add(connection);
+                manager.selector.wakeup();
+                connection.wait4Connection();
+            } catch(IOException e) {
+                throw new ConnectionException("Failed to connect to " + connection + ".", e);
+            } catch (InterruptedException e) {
+                if (!connection.isConnected())
+                    throw new NotConnectedException("Connection interrupted to "  + connection + ".");
+            }
+        }
+    private void start() {
+        thread.start();
+        boolean once = true;
+        while (manager.selectorOpen.getCount() > 0) {
+            try {
+                manager.selectorOpen.await();
+            } catch (InterruptedException e) {
+                if (once) {
+                    once = false;
+                    Util.logError("Wait for selector open interrupted. Continuing wait.", e);
+                }
+            }
+        }
+    }
+    public void stop() {
+        stop = true;
+        manager.selector.wakeup();
+    }
+}
+//class MethodQueue {
+//    private LinkedList<Method> methods = new LinkedList<Method>();
+//    MethodQueue() {
+//    }
+//    public synchronized int size() {
+//        return methods.size();
+//    }
+//    public synchronized void addFirst(Method method) {
+//        methods.addFirst(method);
+//    }
+//    public synchronized void addLast(Method method) {
+//        methods.addLast(method);
+//    }
+//    public synchronized Method removeFirst() {
+//        try {
+//            Method method = methods.removeFirst();
+//            return method;
+//        } catch (NoSuchElementException e) {
+//            return null;
+//        }
+//    }
+//    public synchronized void close() {
+//        while (size() > 0) {
+//            Method method = removeFirst();
+//            method.setExceptionText("Closing.");
+//            method.gotResponse();
+//        }
+//    }
+//
+//}
+//class RequestQueue {
+//    private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();
+//
+//    RequestQueue() {
+//    }
+//    public synchronized int size() {
+//        return functions.size();
+//    }
+//    public synchronized void addFirst(AbstractFunction e) {
+//        functions.addFirst(e);
+//    }
+//    public synchronized void addLast(AbstractFunction e) {
+//        functions.addLast(e);
+//    }
+//    public synchronized AbstractFunction removeFirst() {
+//        try {
+//            AbstractFunction ret = functions.removeFirst();
+//            return ret;
+//        } catch (NoSuchElementException e) {
+//            return null;
+//        }
+//    }
+//}
+class MethodQueue {
+    private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();
+    MethodQueue() {
+    }
+    public synchronized int size() {
+        return methodMap.size();
+    }
+    public synchronized boolean isEmpty() {
+        return 0 == size();
+    }
+    public synchronized void add(Method method) throws ProCoreException {
+        int token = method.getToken();
+        if (0 == token)
+            throw new ProCoreException("Illegal token number.");
+        Method oldMethod = methodMap.put(token, method);
+        if (null != oldMethod)
+            throw new ProCoreException("Duplicate token number.");
+    }
+    public synchronized Method remove(int token) {
+        Method method = methodMap.remove(token);
+        return method;
+    }
+    synchronized void close() {
+        try {
+            TIntObjectIterator<Method> iterator = methodMap.iterator();
+            while (iterator.hasNext()) {
+              iterator.advance();
+              Method method = iterator.value();
+              if (null == method)
+                  Util.logError("Null function in method queue.");
+              method.gotException("Method queue closing.");
+          }
+        } finally {
+            methodMap.clear();
+        }
+    }
+//    public synchronized Method removeAny() throws ProCoreException {
+//        TIntObjectIterator<Method> iterator = methodMap.iterator();
+//        if (!iterator.hasNext())
+//            return null;
+//        iterator.advance();
+//        Method method = iterator.value();
+//        if (null == method)
+//            throw new ProCoreException("Null function in response queue.");
+//        methodMap.remove(method.getToken());
+//        return method;
+//    }
+}
+class Channel {
+    private SelectionKey selectionKey = null;
+    private SocketChannel socketChannel = null;
+    Channel() {
+    }
+    void register(Selector selector, Connection connection)
+    throws ClosedChannelException {
+        if (null == socketChannel)
+            throw new ClosedChannelException();
+        socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
+    }
+    void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
+    throws ProCoreException, IOException {
+        if (null == socketChannel)
+            throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
+        if (null == address)
+            throw new NotConnectedException("Illegal argument. No address to connect to.");
+        if (null != this.socketChannel)
+            throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
+        socketChannel.configureBlocking(false);
+        boolean connected = socketChannel.connect(address);
+        if (connected)
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)
+                Util.showDebug("Connected to " + socketChannel + ".");
+        this.socketChannel = socketChannel; // Waiting for connect.
+    }
+//    private static final int IPTOS_LOWDELAY = 0x10;
+    synchronized void connect(SelectionKey key) {
+        this.selectionKey = key;
+        this.socketChannel = (SocketChannel)key.channel();
+        if (DebugPolicy.REPORT_DEBUG_EVENTS)
+            Util.showDebug("Connected to " + socketChannel + ".");
+//        try {
+//            this.socketChannel.socket().setKeepAlive(true);
+//            this.socketChannel.socket().setSoLinger(true, 1);
+//        } catch (SocketException e) {
+//            e.printStackTrace();
+//        }
+//        try {
+//            this.socketChannel.socket().setTcpNoDelay(true);
+        // Crashes Kalle!
+        // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
+//        } catch (SocketException e) {
+//            e.printStackTrace();
+//        }
+    }
+    synchronized void disconnect() {
+        if (selectionKey == null)
+            return;
+        try {
+            try {
+                if (DebugPolicy.REPORT_DEBUG_EVENTS)
+                    Util.showDebug("Disconnected from " + socketChannel);
+                selectionKey.interestOps(0);
+                selectionKey.cancel();
+                selectionKey.selector().wakeup();
+            } catch (CancelledKeyException e) {
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+                    Logger.defaultLogError("Failed to disconnect cleanly.", e);
+            } catch (IllegalArgumentException e) {
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+                    Logger.defaultLogError("Failed to disconnect cleanly.", e);
+            }
+            try { // Note that this must be after the selectionKey operations above.
+                socketChannel.socket().close();
+            } catch (IOException e) {
+                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+                    Logger.defaultLogError("Failed to close socket cleanly.", e);
+            }
+        } finally {
+            selectionKey = null;
+            socketChannel = null;
+        }
+    }
+    synchronized boolean isConnected() {
+        boolean ret = selectionKey != null && socketChannel.isConnected();
+        return ret;
+    }
+    synchronized SocketChannel socket() throws IOException {
+        if (null == socketChannel)
+            throw new IOException("Socket channel is null.");
+        return socketChannel;
+    }
+    synchronized void listenReadEvents()
+    throws IOException {
+        if (null == selectionKey)
+            throw new IOException("Trying to listen connection which is not connected.");
+        int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
+        selectionKey.interestOps(ops);
+        selectionKey.selector().wakeup();
+    }
+}
+class Connection {
+    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
+    private static int lastSentTokenNumber = 0;
+    private final ConnectionThread connectionThread;
+    private InetSocketAddress address;
+    private Channel channel;
+    private CountDownLatch connectionCreated;
+    private ProCoreException exception;
+    private ByteBuffer readBuffer;
+    Connection(ConnectionThread connectionThread) {
+        this.connectionThread = connectionThread;
+        init4Connection(InetSocketAddress.createUnresolved("", 0));
+    }
+    @Override
+    public String toString() {
+        return "Connection address=" + address + " token=" + lastSentTokenNumber;
+    }
+    boolean addressNotInitialized() {
+        return address.equals(InetSocketAddress.createUnresolved("", 0));
+    }
+    void register(Selector selector)
+    throws ClosedChannelException {
+        channel.register(selector, this);
+    }
+    InetSocketAddress getAddress() {
+        return address;
+    }
+    boolean equalsAddress(InetSocketAddress address) {
+        return address.equals(address);
+    }
+    void init4Connection(InetSocketAddress address) {
+        this.address = address;
+        if (null != channel && channel.isConnected())
+            channel.disconnect();
+        channel = new Channel();
+        connectionCreated = new CountDownLatch(1);
+        exception = null;
+        readBuffer = connectionThread.getInputBuffer();
+    }
+    void prepare4Connection(SocketChannel socketChannel)
+    throws ProCoreException, IOException {
+        if (isConnected())
+            throw new ProCoreException("Illegal state exception. Already connected.");
+        if (address.isUnresolved())
+            address = new InetSocketAddress(address.getHostName(), address.getPort());
+        if (address.getPort() == 0)
+            throw new ProCoreException("Port 0 not supported as connection address.");
+        channel.prepare4Connection(socketChannel, address);
+    }
+    void wait4Connection() throws InterruptedException {
+        connectionCreated.await();
+    }
+    synchronized void disconnect() {
+        if (!isConnected())
+            return;
+        channel.disconnect();
+    }
+    private void throwIOExceptionFromRead(IOException e)
+    throws IOException {
+        disconnect(); // This ensures that isConnected will be false after this.
+        if (null != e)
+            throw e;
+        throw new IOException("Failed to read.");
+    }
+    // Called by synchronized handleInput.
+    private void sendMessage(ByteBuffer byteBuffer)
+    throws IOException, InterruptedException {
+        int left = byteBuffer.remaining();
+        while (left > 0) {
+            ByteBuffer slice = byteBuffer.slice();
+            int size = Math.min(left, 1000000);
+            slice.limit(size);
+            sendBuffer(slice);
+            int position = byteBuffer.position() + size;
+            byteBuffer.position(position);
+            left -= size;
+        }
+    }
+    private void sendBuffer(ByteBuffer byteBuffer)
+    throws IOException, InterruptedException {
+        int left = byteBuffer.remaining();
+        while (left > 0) {
+            int n = channel.socket().write(byteBuffer);
+            if (n > 0) {
+                left -= n;
+                continue;
+            }
+            // System.err.println("############### YIELD ##############################");
+            Thread.yield();
+            if (left > 0)
+                Logger.defaultLogInfo("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()
+                        + ", write return =" + n);
+        }
+    }
+    static public class MessageHeader {
+        public int token;
+        public int lastTokenIn;
+        public MessageHeader(int token, int lastTokenIn) {
+            this.token = token;
+            this.lastTokenIn = lastTokenIn;
+        }
+    }
+    // Called by synchronized call.
+    protected void call(MessageHeader messageHeader, Method method)
+    throws IOException, InterruptedException {
+        byte[] bytes = new byte[20];
+        ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
+        header.order(ByteOrder.LITTLE_ENDIAN);
+        header.putInt(messageHeader.lastTokenIn);
+        header.putInt(messageHeader.token);
+        int messageNumber = method.requestNumber;
+        header.putInt(messageNumber);
+        header.putInt(0); // inflate size, zero if no compression
+        ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
+        header.putInt(data.position());
+        header.clear();
+        synchronized (this) {
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)
+                Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
+            // Receiver expects header and data to form continuous block.
+            sendMessage(header);
+            data.limit(data.position());
+            data.rewind();
+            sendMessage(data);
+            if (DebugPolicy.REPORT_DEBUG_EVENTS)
+                Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
+        }
+    }
+    void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
+    throws ProCoreException, IOException, InterruptedException {
+        m.prepareForSendingRequest();
+        if (DEBUG)
+            Util.showDebug("Setting last token out=" + lastTokenIn);
+        synchronized (this) {
+            MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
+            m.setToken(messageHeader.token);
+            mq.add(m);
+            call(messageHeader, m);
+        }
+    }
+    // Called by session manager.
+    void onRead() throws IOException {
+        if (readBuffer.remaining() < 1)
+            throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
+        int n = -1;
+        try {
+            n = channel.socket().read(readBuffer);
+        } catch (IOException e) {
+            throwIOExceptionFromRead(e);
+        }
+        if (n < 0) // Assuming that this is same as broken connection.
+            throwIOExceptionFromRead(new IOException("Failed to read."));
+        else if (n < 1)
+            return; // No data, no error.
+        try {
+            readBuffer = connectionThread.handleInput(readBuffer, n);
+        } catch (Throwable t) {
+            if (t instanceof IOException)
+                throw (IOException)t;
+            else
+                throw new IOException("Throwable from handleInput.", t);
+        }
+    }
+    // Called by session manager.
+    void onConnectFailed(ProCoreException exception) {
+        if (null != exception)
+            this.exception = exception;
+        else
+            this.exception = new NotConnectedException("Failed to create connection.");
+        connectionCreated.countDown();
+    }
+    // Called by session manager.
+    Connection onConnectSucceeded(SelectionKey key) {
+        Connection ret = null;
+        try {
+            channel.connect(key);
+            ret = this;
+        } catch (Throwable t) {
+            exception = new NotConnectedException("Failed to finalize connection.", t);
+            // Connection refused.
+        }
+        connectionCreated.countDown();
+        return ret;
+    }
+    // Called by session manager.
+    void onDisconnect() {
+        disconnect();
+    }
+    boolean isConnected() {
+        return channel.isConnected();
+    }
+//    boolean isStopping() {
+//    }
+    boolean isExcepted() {
+        return null == exception;
+    }
+    boolean isOk() {
+        return isConnected() && !isExcepted();
+    }
+}
+class HeaderData {
+    int lastTokenIn;
+    int token;
+    int messageNumber;
+    int inflateSize;
+    int deflateSize;
+}
+class ReceiveData {
+    public int lastTokenIn;
+    public int token;
+    public int messageNumber;
+//    public DataBuffer inflateBuffer;
+    public ByteBuffer buffer;
+    public int inflateSize;
+    ReceiveData(HeaderData headerData) {
+        lastTokenIn = headerData.lastTokenIn;
+        token = headerData.token;
+        messageNumber = headerData.messageNumber;
+    }
+}
+abstract class MethodHandler {
+    abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;
+    DataBuffer getDataBuffer(Packet packet) {
+        packet.bytes.position(20);
+        DataBuffer db = new DataBuffer(packet.bytes, 0);
+        return db;
+    }
+}
+class ErrorHandler extends MethodHandler {
+    private final ProCoreException exception;
+    ErrorHandler(String s) {
+        if (null == s || s == "")
+            s = "Ilegal argument. Missing description for remote call exception.";
+        this.exception = new ProCoreException(s);
+    }
+    @Override
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {
+        throw exception;
+    }
+}
+class ExceptionHandler extends MethodHandler {
+    @Override
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {
+        packet.bytes.position(20);
+        ExceptionFunction exception = new ExceptionFunction();
+        exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
+        throw new ProCoreException(exception.what);
+    }
+}
+class DefaultHandler extends MethodHandler {
+    @Override
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {
+        function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
+    }
+}
+class GetClusterHandler extends MethodHandler {
+    private final GetClusterNewFunction function;
+    public GetClusterHandler(final GetClusterNewFunction function) {
+        this.function = function;
+    }
+    @Override
+    void got(Packet packet, AbstractFunction function) throws ProCoreException {
+        this.function.inflateSize = packet.header.inflateSize;
+        packet.bytes.position(20);
+        packet.bytes.limit(20 + packet.header.deflateSize);
+        this.function.deflated = packet.bytes.compact();
+        this.function.deflated.position(0);
+        this.function.deflated.limit(packet.header.deflateSize);
+    }
+}
+//class AckHandler extends MethodHandler {
+//    @Override
+//    void got(Packet packet, AbstractFunction function) throws ProCoreException {
+//        packet.first.position(20);
+//        NullFunction ack = new NullFunction();
+//        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
+//        Util.showDebug("ack message number=" + ack.messageNumber);
+//    }
+//}
+abstract class AbstractMethod {
+//  abstract ByteBuffer serialize(ByteOrder byteOrder);
+  protected final int requestNumber;
+  protected final int responseNumber;
+  protected int receivedNumber = 0;
+  boolean hasResponse() {
+      return responseNumber != 0;
+  }
+  private int token = 0;
+  void setToken(int token) {
+      this.token = token;
+  }
+  int getToken() {
+      return token;
+  }
+  AbstractMethod(int requestNumber, int responseNumber) {
+      this.requestNumber = requestNumber;
+      this.responseNumber = responseNumber;
+  }
+  void prepareForSendingRequest() {
+      receivedNumber = 0;
+  }
+}
+class Method extends AbstractMethod {
+    private final ExceptionHandler exceptionHandler = new ExceptionHandler();
+    private final AbstractFunction function;
+    private Packet packet = null;
+    private CountDownLatch gotPacket = new CountDownLatch(1);
+    private MethodHandler handler = new DefaultHandler();
+    private final EventHandler eventHandler;
+    Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
+        super(function.getRequestNumber(), function.getResponseNumber());
+        this.eventHandler = eventHandler;
+        if (null != handler)
+            this.handler = handler;
+        switch (function.getRequestNumber()) {
+            default:
+                break;
+            case MessageNumber.AcceptCommitRequest:
+            case MessageNumber.UpdateClusterRequest:
+                gotPacket.countDown();
+            break;
+        }
+        this.function = function;
+    } // MARK: waitforReply
+    void waitForReply(Connection c) throws ProCoreException, InterruptedException {
+        while (gotPacket.getCount() > 0) {
+            gotPacket.await(30, TimeUnit.SECONDS);
+            if (gotPacket.getCount() > 0) {
+                System.err.println("waitForReply timeout:");
+                System.err.println("wait token=" + getToken());
+                // MARK: c.onWait(getToken());
+            }
+        }
+        if (null != packet)
+            handler.got(packet, function);
+    }
+    EventHandler getEventHandler(EventHandler eventHandler) {
+        if (null != this.eventHandler)
+            return this.eventHandler;
+        else
+            return eventHandler;
+    }
+    ByteBuffer serialize(ByteOrder byteOrder) {
+        DataBuffer db = function.serialize(byteOrder);
+        return db.getByteBuffer();
+    }
+    void gotPacket(Packet packet) {
+        this.packet = packet;
+        gotPacket.countDown();
+    }
+    void gotException(String s) {
+        handler = new ErrorHandler(s);
+        gotPacket.countDown();
+    }
+    void gotException(Packet packet) {
+        this.packet = packet;
+        handler = exceptionHandler;
+        gotPacket.countDown();
+    }
+//    void gotAck(Packet packet) {
+//        this.packet = packet;
+//        handler = new AckHandler();
+//        gotPacket.countDown();
+//    }
+}
+class DebugPolicy {
+    static final boolean REPORT_DEBUG_EVENTS = false;
+    static final boolean REPORT_SERVER_EVENTS = false;
+    static final boolean REPORT_DEBUG_OPERATIONS = false;
+    static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
+    static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;
+
+    private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
+    private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
+    static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);
+
+    // Time to wait for an event (in milliseconds).
+    // Code logic should assume that this wait never expires.
+    // Should be big enough so that if this time is expired it indicates an error situation.
+    private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour
+    private static final boolean REPORT_WAIT_EVENTS = true;
+    private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;
+    /**
+     * Calls object.wait.
+     * @param waitMs time to wait in milliseconds.
+     * @param object to wait for (used for calling object.wait)
+     * @param string text used in debug prints.
+     * @see java.lang.Object.wait
+     */
+    static final void wait(long waitMs, Object object, String string) {
+        waitImpl(waitMs, object, string);
+    }
+    /**
+     * Calls object.wait.
+     * @param object to wait for (used for calling object.wait)
+     * @param string text used in debug prints.
+     * @see java.lang.Object.wait
+     */
+    static final void wait(Object object, String string) {
+        waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
+    }
+    private static final void report(String string, Object object) {
+        String msg = "DEBUG: " + string
+        + " " + object.getClass().getSimpleName()
+        + " hid=" + System.identityHashCode(object);
+        if (REPORT_WAIT_EVENTS_VERBOSE)
+            new Throwable(msg).printStackTrace();
+        else
+            System.err.println(msg);
+    }
+    private static final void waitImpl(long waitMs, Object object, String string) {
+        if (REPORT_WAIT_EVENTS)
+            report("Wait for event started.", object);
+        try {
+            object.wait(waitMs); // milliseconds
+        } catch (InterruptedException e) {
+            if (REPORT_WAIT_EVENTS)
+                report("Wait for event interrupted.", object);
+        } finally {
+            if (REPORT_WAIT_EVENTS)
+                report("Wait for event stopped.", object);
+        }
+    }
+}