-package org.simantics.db.server.internal;\r
-\r
-import java.io.IOException;\r
-import java.net.InetSocketAddress;\r
-import java.nio.ByteBuffer;\r
-import java.nio.ByteOrder;\r
-import java.nio.channels.CancelledKeyException;\r
-import java.nio.channels.ClosedChannelException;\r
-import java.nio.channels.SelectionKey;\r
-import java.nio.channels.Selector;\r
-import java.nio.channels.SocketChannel;\r
-import java.util.Collections;\r
-import java.util.Iterator;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-import java.util.concurrent.CountDownLatch;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.server.protocol.AbstractFunction;\r
-import org.simantics.db.server.protocol.DataBuffer;\r
-import org.simantics.db.server.protocol.ExceptionFunction;\r
-import org.simantics.db.server.protocol.GetClusterNewFunction;\r
-import org.simantics.db.server.protocol.MessageNumber;\r
-\r
-import gnu.trove.iterator.TIntObjectIterator;\r
-import gnu.trove.map.hash.TIntObjectHashMap;\r
-\r
-public class ConnectionManager {\r
- private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;\r
- private static final ConnectionManager connectionManager = new ConnectionManager();\r
- public static ConnectionManager getInstance() {\r
- return connectionManager.instance();\r
- }\r
- private final Manager manager = new Manager();\r
- private final Thread thread = new Thread(manager, "Connection Manager");\r
- private boolean stop = false;\r
- protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());\r
- private ConnectionManager() {\r
- Util.trace("ConnectionManager.new");\r
- }\r
- private ConnectionManager instance() {\r
- if (!thread.isAlive())\r
- start();\r
- return this;\r
- }\r
- protected class Manager implements Runnable {\r
- private Selector selector;\r
- private CountDownLatch selectorOpen = new CountDownLatch(1);\r
- Manager() {\r
- }\r
- public void run() {\r
- try {\r
- selector = Selector.open();\r
- selectorOpen.countDown();\r
- while (true) {\r
- int n = selector.select();\r
- try {\r
- if (n > 0)\r
- processSelected();\r
- processConnectors();\r
- if (stop) {\r
- selector.close();\r
- return;\r
- }\r
- } catch (CancelledKeyException e) {\r
- Logger.defaultLogError("Cancelled key in select loop.", e);\r
- }\r
- }\r
- } catch (Throwable x) {\r
- Logger.defaultLogError("Select loop failed.", x);\r
- }\r
- }\r
- void processConnectors() throws ClosedChannelException {\r
- while (connections.size() > 0) {\r
- Connection c = connections.remove(0);\r
- c.register(selector);\r
- }\r
- }\r
- void processSelected() {\r
- Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
- while (i.hasNext()) {\r
- SelectionKey sk = (SelectionKey)i.next();\r
- i.remove();\r
- try {\r
- if (!sk.isValid())\r
- processDisconnect(sk);\r
- else if (sk.isConnectable())\r
- processConnect(sk);\r
- else if (sk.isReadable())\r
- processRead(sk);\r
- } catch (CancelledKeyException e) {\r
- processDisconnect(sk);\r
- }\r
- }\r
- }\r
- private void processDisconnect(SelectionKey key) {\r
- Connection c = (Connection)key.attachment();\r
- if (null == c)\r
- return;\r
- c.onDisconnect();\r
- key.attach(null); // Just to make sure that reference to connection is released.\r
- key.cancel();\r
- }\r
- private void processConnect(SelectionKey key) {\r
- Connection connection = (Connection)key.attachment();\r
- SocketChannel sc = (SocketChannel)key.channel();\r
- try {\r
- if (sc.finishConnect()) {\r
- int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;\r
- ops |= SelectionKey.OP_READ;\r
- key.interestOps(ops);\r
- Connection c = connection.onConnectSucceeded(key);\r
- if (c == null) { // Connector refused to accept connection.\r
- key.attach(null); // Just to make sure that reference to connector is released.\r
- key.cancel();\r
- sc.close();\r
- } else\r
- key.attach(c);\r
- }\r
- } catch (Throwable x) {\r
- try {\r
- if (key.isValid())\r
- key.attach(null); // Just to make sure that reference to connector is released.\r
- key.cancel();\r
- try {\r
- sc.close();\r
- } catch (IOException e) {\r
- if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.\r
- Logger.defaultLogError("Failed to close connection.", e);\r
- }\r
- } finally {\r
- connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));\r
- }\r
- }\r
- }\r
- }\r
- private void processRead(SelectionKey key) {\r
- Connection c = (Connection)key.attachment();\r
- SocketChannel sc = (SocketChannel)key.channel();\r
- try {\r
- int ops = key.interestOps();\r
- ops |= SelectionKey.OP_READ;\r
- key.interestOps(ops);\r
- c.onRead();\r
- } catch (IOException x) {\r
- if (key.isValid()) {\r
- key.attach(null); // Just to make sure that reference to connection is released.\r
- key.cancel();\r
- }\r
- if (sc.isOpen())\r
- try {\r
- sc.close();\r
- } catch (IOException e) {\r
- }\r
- }\r
- }\r
- void connect(Connection connection) throws ProCoreException {\r
- try {\r
- if (connections.contains(connection))\r
- throw new ProCoreException("Connection already registered. connection=" + connection);\r
- connection.prepare4Connection(SocketChannel.open());\r
- connections.add(connection);\r
- manager.selector.wakeup();\r
- connection.wait4Connection();\r
- } catch(IOException e) {\r
- throw new ConnectionException("Failed to connect to " + connection + ".", e);\r
- } catch (InterruptedException e) {\r
- if (!connection.isConnected())\r
- throw new NotConnectedException("Connection interrupted to " + connection + ".");\r
- }\r
- }\r
- private void start() {\r
- thread.start();\r
- boolean once = true;\r
- while (manager.selectorOpen.getCount() > 0) {\r
- try {\r
- manager.selectorOpen.await();\r
- } catch (InterruptedException e) {\r
- if (once) {\r
- once = false;\r
- Util.logError("Wait for selector open interrupted. Continuing wait.", e);\r
- }\r
- }\r
- }\r
- }\r
- public void stop() {\r
- stop = true;\r
- manager.selector.wakeup();\r
- }\r
-}\r
-//class MethodQueue {\r
-// private LinkedList<Method> methods = new LinkedList<Method>();\r
-// MethodQueue() {\r
-// }\r
-// public synchronized int size() {\r
-// return methods.size();\r
-// }\r
-// public synchronized void addFirst(Method method) {\r
-// methods.addFirst(method);\r
-// }\r
-// public synchronized void addLast(Method method) {\r
-// methods.addLast(method);\r
-// }\r
-// public synchronized Method removeFirst() {\r
-// try {\r
-// Method method = methods.removeFirst();\r
-// return method;\r
-// } catch (NoSuchElementException e) {\r
-// return null;\r
-// }\r
-// }\r
-// public synchronized void close() {\r
-// while (size() > 0) {\r
-// Method method = removeFirst();\r
-// method.setExceptionText("Closing.");\r
-// method.gotResponse();\r
-// }\r
-// }\r
-//\r
-//}\r
-//class RequestQueue {\r
-// private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();\r
-//\r
-// RequestQueue() {\r
-// }\r
-// public synchronized int size() {\r
-// return functions.size();\r
-// }\r
-// public synchronized void addFirst(AbstractFunction e) {\r
-// functions.addFirst(e);\r
-// }\r
-// public synchronized void addLast(AbstractFunction e) {\r
-// functions.addLast(e);\r
-// }\r
-// public synchronized AbstractFunction removeFirst() {\r
-// try {\r
-// AbstractFunction ret = functions.removeFirst();\r
-// return ret;\r
-// } catch (NoSuchElementException e) {\r
-// return null;\r
-// }\r
-// }\r
-//}\r
-class MethodQueue {\r
- private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();\r
- MethodQueue() {\r
- }\r
- public synchronized int size() {\r
- return methodMap.size();\r
- }\r
- public synchronized boolean isEmpty() {\r
- return 0 == size();\r
- }\r
- public synchronized void add(Method method) throws ProCoreException {\r
- int token = method.getToken();\r
- if (0 == token)\r
- throw new ProCoreException("Illegal token number.");\r
- Method oldMethod = methodMap.put(token, method);\r
- if (null != oldMethod)\r
- throw new ProCoreException("Duplicate token number.");\r
- }\r
- public synchronized Method remove(int token) {\r
- Method method = methodMap.remove(token);\r
- return method;\r
- }\r
- synchronized void close() {\r
- try {\r
- TIntObjectIterator<Method> iterator = methodMap.iterator();\r
- while (iterator.hasNext()) {\r
- iterator.advance();\r
- Method method = iterator.value();\r
- if (null == method)\r
- Util.logError("Null function in method queue.");\r
- method.gotException("Method queue closing.");\r
- }\r
- } finally {\r
- methodMap.clear();\r
- }\r
- }\r
-// public synchronized Method removeAny() throws ProCoreException {\r
-// TIntObjectIterator<Method> iterator = methodMap.iterator();\r
-// if (!iterator.hasNext())\r
-// return null;\r
-// iterator.advance();\r
-// Method method = iterator.value();\r
-// if (null == method)\r
-// throw new ProCoreException("Null function in response queue.");\r
-// methodMap.remove(method.getToken());\r
-// return method;\r
-// }\r
-}\r
-class Channel {\r
- private SelectionKey selectionKey = null;\r
- private SocketChannel socketChannel = null;\r
- Channel() {\r
- }\r
- void register(Selector selector, Connection connection)\r
- throws ClosedChannelException {\r
- if (null == socketChannel)\r
- throw new ClosedChannelException();\r
- socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);\r
- }\r
- void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)\r
- throws ProCoreException, IOException {\r
- if (null == socketChannel)\r
- throw new NotConnectedException("Illegal argument. No socket channel to connect to.");\r
- if (null == address)\r
- throw new NotConnectedException("Illegal argument. No address to connect to.");\r
- if (null != this.socketChannel)\r
- throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);\r
- socketChannel.configureBlocking(false);\r
- boolean connected = socketChannel.connect(address);\r
- if (connected)\r
- if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
- Util.showDebug("Connected to " + socketChannel + ".");\r
- this.socketChannel = socketChannel; // Waiting for connect.\r
- }\r
-// private static final int IPTOS_LOWDELAY = 0x10;\r
- synchronized void connect(SelectionKey key) {\r
- this.selectionKey = key;\r
- this.socketChannel = (SocketChannel)key.channel();\r
- if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
- Util.showDebug("Connected to " + socketChannel + ".");\r
-// try {\r
-// this.socketChannel.socket().setKeepAlive(true);\r
-// this.socketChannel.socket().setSoLinger(true, 1);\r
-// } catch (SocketException e) {\r
-// e.printStackTrace();\r
-// }\r
-// try {\r
-// this.socketChannel.socket().setTcpNoDelay(true);\r
- // Crashes Kalle!\r
- // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);\r
-// } catch (SocketException e) {\r
-// e.printStackTrace();\r
-// }\r
- }\r
- synchronized void disconnect() {\r
- if (selectionKey == null)\r
- return;\r
- try {\r
- try {\r
- if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
- Util.showDebug("Disconnected from " + socketChannel);\r
- selectionKey.interestOps(0);\r
- selectionKey.cancel();\r
- selectionKey.selector().wakeup();\r
- } catch (CancelledKeyException e) {\r
- if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
- Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
- } catch (IllegalArgumentException e) {\r
- if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
- Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
- }\r
- try { // Note that this must be after the selectionKey operations above.\r
- socketChannel.socket().close();\r
- } catch (IOException e) {\r
- if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
- Logger.defaultLogError("Failed to close socket cleanly.", e);\r
- }\r
- } finally {\r
- selectionKey = null;\r
- socketChannel = null;\r
- }\r
- }\r
- synchronized boolean isConnected() {\r
- boolean ret = selectionKey != null && socketChannel.isConnected();\r
- return ret;\r
- }\r
- synchronized SocketChannel socket() throws IOException {\r
- if (null == socketChannel)\r
- throw new IOException("Socket channel is null.");\r
- return socketChannel;\r
- }\r
- synchronized void listenReadEvents()\r
- throws IOException {\r
- if (null == selectionKey)\r
- throw new IOException("Trying to listen connection which is not connected.");\r
- int ops = selectionKey.interestOps() | SelectionKey.OP_READ;\r
- selectionKey.interestOps(ops);\r
- selectionKey.selector().wakeup();\r
- }\r
-}\r
-class Connection {\r
- private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;\r
- private static int lastSentTokenNumber = 0;\r
- private final ConnectionThread connectionThread;\r
- private InetSocketAddress address;\r
- private Channel channel;\r
- private CountDownLatch connectionCreated;\r
- private ProCoreException exception;\r
- private ByteBuffer readBuffer;\r
- Connection(ConnectionThread connectionThread) {\r
- this.connectionThread = connectionThread;\r
- init4Connection(InetSocketAddress.createUnresolved("", 0));\r
- }\r
- @Override\r
- public String toString() {\r
- return "Connection address=" + address + " token=" + lastSentTokenNumber;\r
- }\r
- boolean addressNotInitialized() {\r
- return address.equals(InetSocketAddress.createUnresolved("", 0));\r
- }\r
- void register(Selector selector)\r
- throws ClosedChannelException {\r
- channel.register(selector, this);\r
- }\r
- InetSocketAddress getAddress() {\r
- return address;\r
- }\r
- boolean equalsAddress(InetSocketAddress address) {\r
- return address.equals(address);\r
- }\r
- void init4Connection(InetSocketAddress address) {\r
- this.address = address;\r
- if (null != channel && channel.isConnected())\r
- channel.disconnect();\r
- channel = new Channel();\r
- connectionCreated = new CountDownLatch(1);\r
- exception = null;\r
- readBuffer = connectionThread.getInputBuffer();\r
- }\r
- void prepare4Connection(SocketChannel socketChannel)\r
- throws ProCoreException, IOException {\r
- if (isConnected())\r
- throw new ProCoreException("Illegal state exception. Already connected.");\r
- if (address.isUnresolved())\r
- address = new InetSocketAddress(address.getHostName(), address.getPort());\r
- if (address.getPort() == 0)\r
- throw new ProCoreException("Port 0 not supported as connection address.");\r
- channel.prepare4Connection(socketChannel, address);\r
- }\r
- void wait4Connection() throws InterruptedException {\r
- connectionCreated.await();\r
- }\r
- synchronized void disconnect() {\r
- if (!isConnected())\r
- return;\r
- channel.disconnect();\r
- }\r
- private void throwIOExceptionFromRead(IOException e)\r
- throws IOException {\r
- disconnect(); // This ensures that isConnected will be false after this.\r
- if (null != e)\r
- throw e;\r
- throw new IOException("Failed to read.");\r
- }\r
- // Called by synchronized handleInput.\r
- private void sendMessage(ByteBuffer byteBuffer)\r
- throws IOException, InterruptedException {\r
- int left = byteBuffer.remaining();\r
- while (left > 0) {\r
- ByteBuffer slice = byteBuffer.slice();\r
- int size = Math.min(left, 1000000);\r
- slice.limit(size);\r
- sendBuffer(slice);\r
- int position = byteBuffer.position() + size;\r
- byteBuffer.position(position);\r
- left -= size;\r
- }\r
- }\r
- private void sendBuffer(ByteBuffer byteBuffer)\r
- throws IOException, InterruptedException {\r
- int left = byteBuffer.remaining();\r
- while (left > 0) {\r
- int n = channel.socket().write(byteBuffer);\r
- if (n > 0) {\r
- left -= n;\r
- continue;\r
- }\r
- // System.err.println("############### YIELD ##############################");\r
- Thread.yield();\r
- if (left > 0)\r
- Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()\r
- + ", write return =" + n);\r
- }\r
- }\r
- static public class MessageHeader {\r
- public int token;\r
- public int lastTokenIn;\r
- public MessageHeader(int token, int lastTokenIn) {\r
- this.token = token;\r
- this.lastTokenIn = lastTokenIn;\r
- }\r
- }\r
- // Called by synchronized call.\r
- protected void call(MessageHeader messageHeader, Method method)\r
- throws IOException, InterruptedException {\r
- byte[] bytes = new byte[20];\r
- ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);\r
- header.order(ByteOrder.LITTLE_ENDIAN);\r
- header.putInt(messageHeader.lastTokenIn);\r
- header.putInt(messageHeader.token);\r
- int messageNumber = method.requestNumber;\r
- header.putInt(messageNumber);\r
- header.putInt(0); // inflate size, zero if no compression\r
- ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.\r
- header.putInt(data.position());\r
- header.clear();\r
- synchronized (this) {\r
- if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
- Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);\r
- // Receiver expects header and data to form continuous block.\r
- sendMessage(header);\r
- data.limit(data.position());\r
- data.rewind();\r
- sendMessage(data);\r
- if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
- Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);\r
- }\r
- }\r
- void sendMethod(MethodQueue mq, Method m, int lastTokenIn)\r
- throws ProCoreException, IOException, InterruptedException {\r
- m.prepareForSendingRequest();\r
- if (DEBUG)\r
- Util.showDebug("Setting last token out=" + lastTokenIn);\r
- synchronized (this) {\r
- MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);\r
- m.setToken(messageHeader.token);\r
- mq.add(m);\r
- call(messageHeader, m);\r
- }\r
- }\r
- // Called by session manager.\r
- void onRead() throws IOException {\r
- if (readBuffer.remaining() < 1)\r
- throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));\r
- int n = -1;\r
- try {\r
- n = channel.socket().read(readBuffer);\r
- } catch (IOException e) {\r
- throwIOExceptionFromRead(e);\r
- }\r
- if (n < 0) // Assuming that this is same as broken connection.\r
- throwIOExceptionFromRead(new IOException("Failed to read."));\r
- else if (n < 1)\r
- return; // No data, no error.\r
- try {\r
- readBuffer = connectionThread.handleInput(readBuffer, n);\r
- } catch (Throwable t) {\r
- if (t instanceof IOException)\r
- throw (IOException)t;\r
- else\r
- throw new IOException("Throwable from handleInput.", t);\r
- }\r
- }\r
- // Called by session manager.\r
- void onConnectFailed(ProCoreException exception) {\r
- if (null != exception)\r
- this.exception = exception;\r
- else\r
- this.exception = new NotConnectedException("Failed to create connection.");\r
- connectionCreated.countDown();\r
- }\r
- // Called by session manager.\r
- Connection onConnectSucceeded(SelectionKey key) {\r
- Connection ret = null;\r
- try {\r
- channel.connect(key);\r
- ret = this;\r
- } catch (Throwable t) {\r
- exception = new NotConnectedException("Failed to finalize connection.", t);\r
- // Connection refused.\r
- }\r
- connectionCreated.countDown();\r
- return ret;\r
- }\r
- // Called by session manager.\r
- void onDisconnect() {\r
- disconnect();\r
- }\r
- boolean isConnected() {\r
- return channel.isConnected();\r
- }\r
-// boolean isStopping() {\r
-// }\r
- boolean isExcepted() {\r
- return null == exception;\r
- }\r
- boolean isOk() {\r
- return isConnected() && !isExcepted();\r
- }\r
-}\r
-class HeaderData {\r
- int lastTokenIn;\r
- int token;\r
- int messageNumber;\r
- int inflateSize;\r
- int deflateSize;\r
-}\r
-class ReceiveData {\r
- public int lastTokenIn;\r
- public int token;\r
- public int messageNumber;\r
-// public DataBuffer inflateBuffer;\r
- public ByteBuffer buffer;\r
- public int inflateSize;\r
- ReceiveData(HeaderData headerData) {\r
- lastTokenIn = headerData.lastTokenIn;\r
- token = headerData.token;\r
- messageNumber = headerData.messageNumber;\r
- }\r
-}\r
-abstract class MethodHandler {\r
- abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;\r
- DataBuffer getDataBuffer(Packet packet) {\r
- packet.bytes.position(20);\r
- DataBuffer db = new DataBuffer(packet.bytes, 0);\r
- return db;\r
- }\r
-}\r
-class ErrorHandler extends MethodHandler {\r
- private final ProCoreException exception;\r
- ErrorHandler(String s) {\r
- if (null == s || s == "")\r
- s = "Ilegal argument. Missing description for remote call exception.";\r
- this.exception = new ProCoreException(s);\r
- }\r
- @Override\r
- void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
- throw exception;\r
- }\r
-}\r
-class ExceptionHandler extends MethodHandler {\r
- @Override\r
- void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
- packet.bytes.position(20);\r
- ExceptionFunction exception = new ExceptionFunction();\r
- exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));\r
- throw new ProCoreException(exception.what);\r
- }\r
-}\r
-class DefaultHandler extends MethodHandler {\r
- @Override\r
- void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
- function.deserialize(function.getResponseNumber(), getDataBuffer(packet));\r
- }\r
-}\r
-class GetClusterHandler extends MethodHandler {\r
- private final GetClusterNewFunction function;\r
- public GetClusterHandler(final GetClusterNewFunction function) {\r
- this.function = function;\r
- }\r
- @Override\r
- void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
- this.function.inflateSize = packet.header.inflateSize;\r
- packet.bytes.position(20);\r
- packet.bytes.limit(20 + packet.header.deflateSize);\r
- this.function.deflated = packet.bytes.compact();\r
- this.function.deflated.position(0);\r
- this.function.deflated.limit(packet.header.deflateSize);\r
- }\r
-}\r
-//class AckHandler extends MethodHandler {\r
-// @Override\r
-// void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
-// packet.first.position(20);\r
-// NullFunction ack = new NullFunction();\r
-// ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));\r
-// Util.showDebug("ack message number=" + ack.messageNumber);\r
-// }\r
-//}\r
-abstract class AbstractMethod {\r
-// abstract ByteBuffer serialize(ByteOrder byteOrder);\r
- protected final int requestNumber;\r
- protected final int responseNumber;\r
- protected int receivedNumber = 0;\r
- boolean hasResponse() {\r
- return responseNumber != 0;\r
- }\r
- private int token = 0;\r
- void setToken(int token) {\r
- this.token = token;\r
- }\r
- int getToken() {\r
- return token;\r
- }\r
- AbstractMethod(int requestNumber, int responseNumber) {\r
- this.requestNumber = requestNumber;\r
- this.responseNumber = responseNumber;\r
- }\r
- void prepareForSendingRequest() {\r
- receivedNumber = 0;\r
- }\r
-}\r
-class Method extends AbstractMethod {\r
- private final ExceptionHandler exceptionHandler = new ExceptionHandler();\r
- private final AbstractFunction function;\r
- private Packet packet = null;\r
- private CountDownLatch gotPacket = new CountDownLatch(1);\r
- private MethodHandler handler = new DefaultHandler();\r
- private final EventHandler eventHandler;\r
- Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {\r
- super(function.getRequestNumber(), function.getResponseNumber());\r
- this.eventHandler = eventHandler;\r
- if (null != handler)\r
- this.handler = handler;\r
- switch (function.getRequestNumber()) {\r
- default:\r
- break;\r
- case MessageNumber.AcceptCommitRequest:\r
- case MessageNumber.UpdateClusterRequest:\r
- gotPacket.countDown();\r
- break;\r
- }\r
- this.function = function;\r
- } // MARK: waitforReply\r
- void waitForReply(Connection c) throws ProCoreException, InterruptedException {\r
- while (gotPacket.getCount() > 0) {\r
- gotPacket.await(30, TimeUnit.SECONDS);\r
- if (gotPacket.getCount() > 0) {\r
- System.err.println("waitForReply timeout:");\r
- System.err.println("wait token=" + getToken());\r
- // MARK: c.onWait(getToken());\r
- }\r
- }\r
- if (null != packet)\r
- handler.got(packet, function);\r
- }\r
- EventHandler getEventHandler(EventHandler eventHandler) {\r
- if (null != this.eventHandler)\r
- return this.eventHandler;\r
- else\r
- return eventHandler;\r
- }\r
- ByteBuffer serialize(ByteOrder byteOrder) {\r
- DataBuffer db = function.serialize(byteOrder);\r
- return db.getByteBuffer();\r
- }\r
- void gotPacket(Packet packet) {\r
- this.packet = packet;\r
- gotPacket.countDown();\r
- }\r
- void gotException(String s) {\r
- handler = new ErrorHandler(s);\r
- gotPacket.countDown();\r
- }\r
- void gotException(Packet packet) {\r
- this.packet = packet;\r
- handler = exceptionHandler;\r
- gotPacket.countDown();\r
- }\r
-// void gotAck(Packet packet) {\r
-// this.packet = packet;\r
-// handler = new AckHandler();\r
-// gotPacket.countDown();\r
-// }\r
-}\r
-class DebugPolicy {\r
- static final boolean REPORT_DEBUG_EVENTS = false;\r
- static final boolean REPORT_SERVER_EVENTS = false;\r
- static final boolean REPORT_DEBUG_OPERATIONS = false;\r
- static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;\r
- static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;\r
-\r
- private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;\r
- private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;\r
- static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);\r
-\r
- // Time to wait for an event (in milliseconds).\r
- // Code logic should assume that this wait never expires.\r
- // Should be big enough so that if this time is expired it indicates an error situation.\r
- private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour\r
- private static final boolean REPORT_WAIT_EVENTS = true;\r
- private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;\r
- /**\r
- * Calls object.wait.\r
- * @param waitMs time to wait in milliseconds.\r
- * @param object to wait for (used for calling object.wait)\r
- * @param string text used in debug prints.\r
- * @see java.lang.Object.wait\r
- */\r
- static final void wait(long waitMs, Object object, String string) {\r
- waitImpl(waitMs, object, string);\r
- }\r
- /**\r
- * Calls object.wait.\r
- * @param object to wait for (used for calling object.wait)\r
- * @param string text used in debug prints.\r
- * @see java.lang.Object.wait\r
- */\r
- static final void wait(Object object, String string) {\r
- waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);\r
- }\r
- private static final void report(String string, Object object) {\r
- String msg = "DEBUG: " + string\r
- + " " + object.getClass().getSimpleName()\r
- + " hid=" + System.identityHashCode(object);\r
- if (REPORT_WAIT_EVENTS_VERBOSE)\r
- new Throwable(msg).printStackTrace();\r
- else\r
- System.err.println(msg);\r
- }\r
- private static final void waitImpl(long waitMs, Object object, String string) {\r
- if (REPORT_WAIT_EVENTS)\r
- report("Wait for event started.", object);\r
- try {\r
- object.wait(waitMs); // milliseconds\r
- } catch (InterruptedException e) {\r
- if (REPORT_WAIT_EVENTS)\r
- report("Wait for event interrupted.", object);\r
- } finally {\r
- if (REPORT_WAIT_EVENTS)\r
- report("Wait for event stopped.", object);\r
- }\r
- }\r
-}\r
+package org.simantics.db.server.internal;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.server.protocol.AbstractFunction;
+import org.simantics.db.server.protocol.DataBuffer;
+import org.simantics.db.server.protocol.ExceptionFunction;
+import org.simantics.db.server.protocol.GetClusterNewFunction;
+import org.simantics.db.server.protocol.MessageNumber;
+
+import gnu.trove.iterator.TIntObjectIterator;
+import gnu.trove.map.hash.TIntObjectHashMap;
+
+public class ConnectionManager {
+ private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
+ private static final ConnectionManager connectionManager = new ConnectionManager();
+ public static ConnectionManager getInstance() {
+ return connectionManager.instance();
+ }
+ private final Manager manager = new Manager();
+ private final Thread thread = new Thread(manager, "Connection Manager");
+ private boolean stop = false;
+ protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());
+ private ConnectionManager() {
+ Util.trace("ConnectionManager.new");
+ }
+ private ConnectionManager instance() {
+ if (!thread.isAlive())
+ start();
+ return this;
+ }
+ protected class Manager implements Runnable {
+ private Selector selector;
+ private CountDownLatch selectorOpen = new CountDownLatch(1);
+ Manager() {
+ }
+ public void run() {
+ try {
+ selector = Selector.open();
+ selectorOpen.countDown();
+ while (true) {
+ int n = selector.select();
+ try {
+ if (n > 0)
+ processSelected();
+ processConnectors();
+ if (stop) {
+ selector.close();
+ return;
+ }
+ } catch (CancelledKeyException e) {
+ Logger.defaultLogError("Cancelled key in select loop.", e);
+ }
+ }
+ } catch (Throwable x) {
+ Logger.defaultLogError("Select loop failed.", x);
+ }
+ }
+ void processConnectors() throws ClosedChannelException {
+ while (connections.size() > 0) {
+ Connection c = connections.remove(0);
+ c.register(selector);
+ }
+ }
+ void processSelected() {
+ Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+ while (i.hasNext()) {
+ SelectionKey sk = (SelectionKey)i.next();
+ i.remove();
+ try {
+ if (!sk.isValid())
+ processDisconnect(sk);
+ else if (sk.isConnectable())
+ processConnect(sk);
+ else if (sk.isReadable())
+ processRead(sk);
+ } catch (CancelledKeyException e) {
+ processDisconnect(sk);
+ }
+ }
+ }
+ private void processDisconnect(SelectionKey key) {
+ Connection c = (Connection)key.attachment();
+ if (null == c)
+ return;
+ c.onDisconnect();
+ key.attach(null); // Just to make sure that reference to connection is released.
+ key.cancel();
+ }
+ private void processConnect(SelectionKey key) {
+ Connection connection = (Connection)key.attachment();
+ SocketChannel sc = (SocketChannel)key.channel();
+ try {
+ if (sc.finishConnect()) {
+ int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
+ ops |= SelectionKey.OP_READ;
+ key.interestOps(ops);
+ Connection c = connection.onConnectSucceeded(key);
+ if (c == null) { // Connector refused to accept connection.
+ key.attach(null); // Just to make sure that reference to connector is released.
+ key.cancel();
+ sc.close();
+ } else
+ key.attach(c);
+ }
+ } catch (Throwable x) {
+ try {
+ if (key.isValid())
+ key.attach(null); // Just to make sure that reference to connector is released.
+ key.cancel();
+ try {
+ sc.close();
+ } catch (IOException e) {
+ if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
+ Logger.defaultLogError("Failed to close connection.", e);
+ }
+ } finally {
+ connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
+ }
+ }
+ }
+ }
+ private void processRead(SelectionKey key) {
+ Connection c = (Connection)key.attachment();
+ SocketChannel sc = (SocketChannel)key.channel();
+ try {
+ int ops = key.interestOps();
+ ops |= SelectionKey.OP_READ;
+ key.interestOps(ops);
+ c.onRead();
+ } catch (IOException x) {
+ if (key.isValid()) {
+ key.attach(null); // Just to make sure that reference to connection is released.
+ key.cancel();
+ }
+ if (sc.isOpen())
+ try {
+ sc.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ void connect(Connection connection) throws ProCoreException {
+ try {
+ if (connections.contains(connection))
+ throw new ProCoreException("Connection already registered. connection=" + connection);
+ connection.prepare4Connection(SocketChannel.open());
+ connections.add(connection);
+ manager.selector.wakeup();
+ connection.wait4Connection();
+ } catch(IOException e) {
+ throw new ConnectionException("Failed to connect to " + connection + ".", e);
+ } catch (InterruptedException e) {
+ if (!connection.isConnected())
+ throw new NotConnectedException("Connection interrupted to " + connection + ".");
+ }
+ }
+ private void start() {
+ thread.start();
+ boolean once = true;
+ while (manager.selectorOpen.getCount() > 0) {
+ try {
+ manager.selectorOpen.await();
+ } catch (InterruptedException e) {
+ if (once) {
+ once = false;
+ Util.logError("Wait for selector open interrupted. Continuing wait.", e);
+ }
+ }
+ }
+ }
+ public void stop() {
+ stop = true;
+ manager.selector.wakeup();
+ }
+}
+//class MethodQueue {
+// private LinkedList<Method> methods = new LinkedList<Method>();
+// MethodQueue() {
+// }
+// public synchronized int size() {
+// return methods.size();
+// }
+// public synchronized void addFirst(Method method) {
+// methods.addFirst(method);
+// }
+// public synchronized void addLast(Method method) {
+// methods.addLast(method);
+// }
+// public synchronized Method removeFirst() {
+// try {
+// Method method = methods.removeFirst();
+// return method;
+// } catch (NoSuchElementException e) {
+// return null;
+// }
+// }
+// public synchronized void close() {
+// while (size() > 0) {
+// Method method = removeFirst();
+// method.setExceptionText("Closing.");
+// method.gotResponse();
+// }
+// }
+//
+//}
+//class RequestQueue {
+// private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();
+//
+// RequestQueue() {
+// }
+// public synchronized int size() {
+// return functions.size();
+// }
+// public synchronized void addFirst(AbstractFunction e) {
+// functions.addFirst(e);
+// }
+// public synchronized void addLast(AbstractFunction e) {
+// functions.addLast(e);
+// }
+// public synchronized AbstractFunction removeFirst() {
+// try {
+// AbstractFunction ret = functions.removeFirst();
+// return ret;
+// } catch (NoSuchElementException e) {
+// return null;
+// }
+// }
+//}
+class MethodQueue {
+ private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();
+ MethodQueue() {
+ }
+ public synchronized int size() {
+ return methodMap.size();
+ }
+ public synchronized boolean isEmpty() {
+ return 0 == size();
+ }
+ public synchronized void add(Method method) throws ProCoreException {
+ int token = method.getToken();
+ if (0 == token)
+ throw new ProCoreException("Illegal token number.");
+ Method oldMethod = methodMap.put(token, method);
+ if (null != oldMethod)
+ throw new ProCoreException("Duplicate token number.");
+ }
+ public synchronized Method remove(int token) {
+ Method method = methodMap.remove(token);
+ return method;
+ }
+ synchronized void close() {
+ try {
+ TIntObjectIterator<Method> iterator = methodMap.iterator();
+ while (iterator.hasNext()) {
+ iterator.advance();
+ Method method = iterator.value();
+ if (null == method)
+ Util.logError("Null function in method queue.");
+ method.gotException("Method queue closing.");
+ }
+ } finally {
+ methodMap.clear();
+ }
+ }
+// public synchronized Method removeAny() throws ProCoreException {
+// TIntObjectIterator<Method> iterator = methodMap.iterator();
+// if (!iterator.hasNext())
+// return null;
+// iterator.advance();
+// Method method = iterator.value();
+// if (null == method)
+// throw new ProCoreException("Null function in response queue.");
+// methodMap.remove(method.getToken());
+// return method;
+// }
+}
+class Channel {
+ private SelectionKey selectionKey = null;
+ private SocketChannel socketChannel = null;
+ Channel() {
+ }
+ void register(Selector selector, Connection connection)
+ throws ClosedChannelException {
+ if (null == socketChannel)
+ throw new ClosedChannelException();
+ socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
+ }
+ void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
+ throws ProCoreException, IOException {
+ if (null == socketChannel)
+ throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
+ if (null == address)
+ throw new NotConnectedException("Illegal argument. No address to connect to.");
+ if (null != this.socketChannel)
+ throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
+ socketChannel.configureBlocking(false);
+ boolean connected = socketChannel.connect(address);
+ if (connected)
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)
+ Util.showDebug("Connected to " + socketChannel + ".");
+ this.socketChannel = socketChannel; // Waiting for connect.
+ }
+// private static final int IPTOS_LOWDELAY = 0x10;
+ synchronized void connect(SelectionKey key) {
+ this.selectionKey = key;
+ this.socketChannel = (SocketChannel)key.channel();
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)
+ Util.showDebug("Connected to " + socketChannel + ".");
+// try {
+// this.socketChannel.socket().setKeepAlive(true);
+// this.socketChannel.socket().setSoLinger(true, 1);
+// } catch (SocketException e) {
+// e.printStackTrace();
+// }
+// try {
+// this.socketChannel.socket().setTcpNoDelay(true);
+ // Crashes Kalle!
+ // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
+// } catch (SocketException e) {
+// e.printStackTrace();
+// }
+ }
+ synchronized void disconnect() {
+ if (selectionKey == null)
+ return;
+ try {
+ try {
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)
+ Util.showDebug("Disconnected from " + socketChannel);
+ selectionKey.interestOps(0);
+ selectionKey.cancel();
+ selectionKey.selector().wakeup();
+ } catch (CancelledKeyException e) {
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+ Logger.defaultLogError("Failed to disconnect cleanly.", e);
+ } catch (IllegalArgumentException e) {
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+ Logger.defaultLogError("Failed to disconnect cleanly.", e);
+ }
+ try { // Note that this must be after the selectionKey operations above.
+ socketChannel.socket().close();
+ } catch (IOException e) {
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
+ Logger.defaultLogError("Failed to close socket cleanly.", e);
+ }
+ } finally {
+ selectionKey = null;
+ socketChannel = null;
+ }
+ }
+ synchronized boolean isConnected() {
+ boolean ret = selectionKey != null && socketChannel.isConnected();
+ return ret;
+ }
+ synchronized SocketChannel socket() throws IOException {
+ if (null == socketChannel)
+ throw new IOException("Socket channel is null.");
+ return socketChannel;
+ }
+ synchronized void listenReadEvents()
+ throws IOException {
+ if (null == selectionKey)
+ throw new IOException("Trying to listen connection which is not connected.");
+ int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
+ selectionKey.interestOps(ops);
+ selectionKey.selector().wakeup();
+ }
+}
+class Connection {
+ private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
+ private static int lastSentTokenNumber = 0;
+ private final ConnectionThread connectionThread;
+ private InetSocketAddress address;
+ private Channel channel;
+ private CountDownLatch connectionCreated;
+ private ProCoreException exception;
+ private ByteBuffer readBuffer;
+ Connection(ConnectionThread connectionThread) {
+ this.connectionThread = connectionThread;
+ init4Connection(InetSocketAddress.createUnresolved("", 0));
+ }
+ @Override
+ public String toString() {
+ return "Connection address=" + address + " token=" + lastSentTokenNumber;
+ }
+ boolean addressNotInitialized() {
+ return address.equals(InetSocketAddress.createUnresolved("", 0));
+ }
+ void register(Selector selector)
+ throws ClosedChannelException {
+ channel.register(selector, this);
+ }
+ InetSocketAddress getAddress() {
+ return address;
+ }
+ boolean equalsAddress(InetSocketAddress address) {
+ return address.equals(address);
+ }
+ void init4Connection(InetSocketAddress address) {
+ this.address = address;
+ if (null != channel && channel.isConnected())
+ channel.disconnect();
+ channel = new Channel();
+ connectionCreated = new CountDownLatch(1);
+ exception = null;
+ readBuffer = connectionThread.getInputBuffer();
+ }
+ void prepare4Connection(SocketChannel socketChannel)
+ throws ProCoreException, IOException {
+ if (isConnected())
+ throw new ProCoreException("Illegal state exception. Already connected.");
+ if (address.isUnresolved())
+ address = new InetSocketAddress(address.getHostName(), address.getPort());
+ if (address.getPort() == 0)
+ throw new ProCoreException("Port 0 not supported as connection address.");
+ channel.prepare4Connection(socketChannel, address);
+ }
+ void wait4Connection() throws InterruptedException {
+ connectionCreated.await();
+ }
+ synchronized void disconnect() {
+ if (!isConnected())
+ return;
+ channel.disconnect();
+ }
+ private void throwIOExceptionFromRead(IOException e)
+ throws IOException {
+ disconnect(); // This ensures that isConnected will be false after this.
+ if (null != e)
+ throw e;
+ throw new IOException("Failed to read.");
+ }
+ // Called by synchronized handleInput.
+ private void sendMessage(ByteBuffer byteBuffer)
+ throws IOException, InterruptedException {
+ int left = byteBuffer.remaining();
+ while (left > 0) {
+ ByteBuffer slice = byteBuffer.slice();
+ int size = Math.min(left, 1000000);
+ slice.limit(size);
+ sendBuffer(slice);
+ int position = byteBuffer.position() + size;
+ byteBuffer.position(position);
+ left -= size;
+ }
+ }
+ private void sendBuffer(ByteBuffer byteBuffer)
+ throws IOException, InterruptedException {
+ int left = byteBuffer.remaining();
+ while (left > 0) {
+ int n = channel.socket().write(byteBuffer);
+ if (n > 0) {
+ left -= n;
+ continue;
+ }
+ // System.err.println("############### YIELD ##############################");
+ Thread.yield();
+ if (left > 0)
+ Logger.defaultLogInfo("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()
+ + ", write return =" + n);
+ }
+ }
+ static public class MessageHeader {
+ public int token;
+ public int lastTokenIn;
+ public MessageHeader(int token, int lastTokenIn) {
+ this.token = token;
+ this.lastTokenIn = lastTokenIn;
+ }
+ }
+ // Called by synchronized call.
+ protected void call(MessageHeader messageHeader, Method method)
+ throws IOException, InterruptedException {
+ byte[] bytes = new byte[20];
+ ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
+ header.order(ByteOrder.LITTLE_ENDIAN);
+ header.putInt(messageHeader.lastTokenIn);
+ header.putInt(messageHeader.token);
+ int messageNumber = method.requestNumber;
+ header.putInt(messageNumber);
+ header.putInt(0); // inflate size, zero if no compression
+ ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
+ header.putInt(data.position());
+ header.clear();
+ synchronized (this) {
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)
+ Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
+ // Receiver expects header and data to form continuous block.
+ sendMessage(header);
+ data.limit(data.position());
+ data.rewind();
+ sendMessage(data);
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)
+ Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
+ }
+ }
+ void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
+ throws ProCoreException, IOException, InterruptedException {
+ m.prepareForSendingRequest();
+ if (DEBUG)
+ Util.showDebug("Setting last token out=" + lastTokenIn);
+ synchronized (this) {
+ MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
+ m.setToken(messageHeader.token);
+ mq.add(m);
+ call(messageHeader, m);
+ }
+ }
+ // Called by session manager.
+ void onRead() throws IOException {
+ if (readBuffer.remaining() < 1)
+ throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
+ int n = -1;
+ try {
+ n = channel.socket().read(readBuffer);
+ } catch (IOException e) {
+ throwIOExceptionFromRead(e);
+ }
+ if (n < 0) // Assuming that this is same as broken connection.
+ throwIOExceptionFromRead(new IOException("Failed to read."));
+ else if (n < 1)
+ return; // No data, no error.
+ try {
+ readBuffer = connectionThread.handleInput(readBuffer, n);
+ } catch (Throwable t) {
+ if (t instanceof IOException)
+ throw (IOException)t;
+ else
+ throw new IOException("Throwable from handleInput.", t);
+ }
+ }
+ // Called by session manager.
+ void onConnectFailed(ProCoreException exception) {
+ if (null != exception)
+ this.exception = exception;
+ else
+ this.exception = new NotConnectedException("Failed to create connection.");
+ connectionCreated.countDown();
+ }
+ // Called by session manager.
+ Connection onConnectSucceeded(SelectionKey key) {
+ Connection ret = null;
+ try {
+ channel.connect(key);
+ ret = this;
+ } catch (Throwable t) {
+ exception = new NotConnectedException("Failed to finalize connection.", t);
+ // Connection refused.
+ }
+ connectionCreated.countDown();
+ return ret;
+ }
+ // Called by session manager.
+ void onDisconnect() {
+ disconnect();
+ }
+ boolean isConnected() {
+ return channel.isConnected();
+ }
+// boolean isStopping() {
+// }
+ boolean isExcepted() {
+ return null == exception;
+ }
+ boolean isOk() {
+ return isConnected() && !isExcepted();
+ }
+}
+class HeaderData {
+ int lastTokenIn;
+ int token;
+ int messageNumber;
+ int inflateSize;
+ int deflateSize;
+}
+class ReceiveData {
+ public int lastTokenIn;
+ public int token;
+ public int messageNumber;
+// public DataBuffer inflateBuffer;
+ public ByteBuffer buffer;
+ public int inflateSize;
+ ReceiveData(HeaderData headerData) {
+ lastTokenIn = headerData.lastTokenIn;
+ token = headerData.token;
+ messageNumber = headerData.messageNumber;
+ }
+}
+abstract class MethodHandler {
+ abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;
+ DataBuffer getDataBuffer(Packet packet) {
+ packet.bytes.position(20);
+ DataBuffer db = new DataBuffer(packet.bytes, 0);
+ return db;
+ }
+}
+class ErrorHandler extends MethodHandler {
+ private final ProCoreException exception;
+ ErrorHandler(String s) {
+ if (null == s || s == "")
+ s = "Ilegal argument. Missing description for remote call exception.";
+ this.exception = new ProCoreException(s);
+ }
+ @Override
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {
+ throw exception;
+ }
+}
+class ExceptionHandler extends MethodHandler {
+ @Override
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {
+ packet.bytes.position(20);
+ ExceptionFunction exception = new ExceptionFunction();
+ exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
+ throw new ProCoreException(exception.what);
+ }
+}
+class DefaultHandler extends MethodHandler {
+ @Override
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {
+ function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
+ }
+}
+class GetClusterHandler extends MethodHandler {
+ private final GetClusterNewFunction function;
+ public GetClusterHandler(final GetClusterNewFunction function) {
+ this.function = function;
+ }
+ @Override
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {
+ this.function.inflateSize = packet.header.inflateSize;
+ packet.bytes.position(20);
+ packet.bytes.limit(20 + packet.header.deflateSize);
+ this.function.deflated = packet.bytes.compact();
+ this.function.deflated.position(0);
+ this.function.deflated.limit(packet.header.deflateSize);
+ }
+}
+//class AckHandler extends MethodHandler {
+// @Override
+// void got(Packet packet, AbstractFunction function) throws ProCoreException {
+// packet.first.position(20);
+// NullFunction ack = new NullFunction();
+// ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
+// Util.showDebug("ack message number=" + ack.messageNumber);
+// }
+//}
+abstract class AbstractMethod {
+// abstract ByteBuffer serialize(ByteOrder byteOrder);
+ protected final int requestNumber;
+ protected final int responseNumber;
+ protected int receivedNumber = 0;
+ boolean hasResponse() {
+ return responseNumber != 0;
+ }
+ private int token = 0;
+ void setToken(int token) {
+ this.token = token;
+ }
+ int getToken() {
+ return token;
+ }
+ AbstractMethod(int requestNumber, int responseNumber) {
+ this.requestNumber = requestNumber;
+ this.responseNumber = responseNumber;
+ }
+ void prepareForSendingRequest() {
+ receivedNumber = 0;
+ }
+}
+class Method extends AbstractMethod {
+ private final ExceptionHandler exceptionHandler = new ExceptionHandler();
+ private final AbstractFunction function;
+ private Packet packet = null;
+ private CountDownLatch gotPacket = new CountDownLatch(1);
+ private MethodHandler handler = new DefaultHandler();
+ private final EventHandler eventHandler;
+ Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
+ super(function.getRequestNumber(), function.getResponseNumber());
+ this.eventHandler = eventHandler;
+ if (null != handler)
+ this.handler = handler;
+ switch (function.getRequestNumber()) {
+ default:
+ break;
+ case MessageNumber.AcceptCommitRequest:
+ case MessageNumber.UpdateClusterRequest:
+ gotPacket.countDown();
+ break;
+ }
+ this.function = function;
+ } // MARK: waitforReply
+ void waitForReply(Connection c) throws ProCoreException, InterruptedException {
+ while (gotPacket.getCount() > 0) {
+ gotPacket.await(30, TimeUnit.SECONDS);
+ if (gotPacket.getCount() > 0) {
+ System.err.println("waitForReply timeout:");
+ System.err.println("wait token=" + getToken());
+ // MARK: c.onWait(getToken());
+ }
+ }
+ if (null != packet)
+ handler.got(packet, function);
+ }
+ EventHandler getEventHandler(EventHandler eventHandler) {
+ if (null != this.eventHandler)
+ return this.eventHandler;
+ else
+ return eventHandler;
+ }
+ ByteBuffer serialize(ByteOrder byteOrder) {
+ DataBuffer db = function.serialize(byteOrder);
+ return db.getByteBuffer();
+ }
+ void gotPacket(Packet packet) {
+ this.packet = packet;
+ gotPacket.countDown();
+ }
+ void gotException(String s) {
+ handler = new ErrorHandler(s);
+ gotPacket.countDown();
+ }
+ void gotException(Packet packet) {
+ this.packet = packet;
+ handler = exceptionHandler;
+ gotPacket.countDown();
+ }
+// void gotAck(Packet packet) {
+// this.packet = packet;
+// handler = new AckHandler();
+// gotPacket.countDown();
+// }
+}
+class DebugPolicy {
+ static final boolean REPORT_DEBUG_EVENTS = false;
+ static final boolean REPORT_SERVER_EVENTS = false;
+ static final boolean REPORT_DEBUG_OPERATIONS = false;
+ static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
+ static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;
+
+ private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
+ private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
+ static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);
+
+ // Time to wait for an event (in milliseconds).
+ // Code logic should assume that this wait never expires.
+ // Should be big enough so that if this time is expired it indicates an error situation.
+ private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour
+ private static final boolean REPORT_WAIT_EVENTS = true;
+ private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;
+ /**
+ * Calls object.wait.
+ * @param waitMs time to wait in milliseconds.
+ * @param object to wait for (used for calling object.wait)
+ * @param string text used in debug prints.
+ * @see java.lang.Object.wait
+ */
+ static final void wait(long waitMs, Object object, String string) {
+ waitImpl(waitMs, object, string);
+ }
+ /**
+ * Calls object.wait.
+ * @param object to wait for (used for calling object.wait)
+ * @param string text used in debug prints.
+ * @see java.lang.Object.wait
+ */
+ static final void wait(Object object, String string) {
+ waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
+ }
+ private static final void report(String string, Object object) {
+ String msg = "DEBUG: " + string
+ + " " + object.getClass().getSimpleName()
+ + " hid=" + System.identityHashCode(object);
+ if (REPORT_WAIT_EVENTS_VERBOSE)
+ new Throwable(msg).printStackTrace();
+ else
+ System.err.println(msg);
+ }
+ private static final void waitImpl(long waitMs, Object object, String string) {
+ if (REPORT_WAIT_EVENTS)
+ report("Wait for event started.", object);
+ try {
+ object.wait(waitMs); // milliseconds
+ } catch (InterruptedException e) {
+ if (REPORT_WAIT_EVENTS)
+ report("Wait for event interrupted.", object);
+ } finally {
+ if (REPORT_WAIT_EVENTS)
+ report("Wait for event stopped.", object);
+ }
+ }
+}