--- /dev/null
+package org.simantics.db.server.internal;\r
+\r
+import java.io.IOException;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.ByteOrder;\r
+import java.nio.channels.CancelledKeyException;\r
+import java.nio.channels.ClosedChannelException;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.SocketChannel;\r
+import java.util.Collections;\r
+import java.util.Iterator;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.concurrent.CountDownLatch;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.server.protocol.AbstractFunction;\r
+import org.simantics.db.server.protocol.DataBuffer;\r
+import org.simantics.db.server.protocol.ExceptionFunction;\r
+import org.simantics.db.server.protocol.GetClusterNewFunction;\r
+import org.simantics.db.server.protocol.MessageNumber;\r
+\r
+import gnu.trove.iterator.TIntObjectIterator;\r
+import gnu.trove.map.hash.TIntObjectHashMap;\r
+\r
+public class ConnectionManager {\r
+ private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;\r
+ private static final ConnectionManager connectionManager = new ConnectionManager();\r
+ public static ConnectionManager getInstance() {\r
+ return connectionManager.instance();\r
+ }\r
+ private final Manager manager = new Manager();\r
+ private final Thread thread = new Thread(manager, "Connection Manager");\r
+ private boolean stop = false;\r
+ protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());\r
+ private ConnectionManager() {\r
+ Util.trace("ConnectionManager.new");\r
+ }\r
+ private ConnectionManager instance() {\r
+ if (!thread.isAlive())\r
+ start();\r
+ return this;\r
+ }\r
+ protected class Manager implements Runnable {\r
+ private Selector selector;\r
+ private CountDownLatch selectorOpen = new CountDownLatch(1);\r
+ Manager() {\r
+ }\r
+ public void run() {\r
+ try {\r
+ selector = Selector.open();\r
+ selectorOpen.countDown();\r
+ while (true) {\r
+ int n = selector.select();\r
+ try {\r
+ if (n > 0)\r
+ processSelected();\r
+ processConnectors();\r
+ if (stop) {\r
+ selector.close();\r
+ return;\r
+ }\r
+ } catch (CancelledKeyException e) {\r
+ Logger.defaultLogError("Cancelled key in select loop.", e);\r
+ }\r
+ }\r
+ } catch (Throwable x) {\r
+ Logger.defaultLogError("Select loop failed.", x);\r
+ }\r
+ }\r
+ void processConnectors() throws ClosedChannelException {\r
+ while (connections.size() > 0) {\r
+ Connection c = connections.remove(0);\r
+ c.register(selector);\r
+ }\r
+ }\r
+ void processSelected() {\r
+ Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
+ while (i.hasNext()) {\r
+ SelectionKey sk = (SelectionKey)i.next();\r
+ i.remove();\r
+ try {\r
+ if (!sk.isValid())\r
+ processDisconnect(sk);\r
+ else if (sk.isConnectable())\r
+ processConnect(sk);\r
+ else if (sk.isReadable())\r
+ processRead(sk);\r
+ } catch (CancelledKeyException e) {\r
+ processDisconnect(sk);\r
+ }\r
+ }\r
+ }\r
+ private void processDisconnect(SelectionKey key) {\r
+ Connection c = (Connection)key.attachment();\r
+ if (null == c)\r
+ return;\r
+ c.onDisconnect();\r
+ key.attach(null); // Just to make sure that reference to connection is released.\r
+ key.cancel();\r
+ }\r
+ private void processConnect(SelectionKey key) {\r
+ Connection connection = (Connection)key.attachment();\r
+ SocketChannel sc = (SocketChannel)key.channel();\r
+ try {\r
+ if (sc.finishConnect()) {\r
+ int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;\r
+ ops |= SelectionKey.OP_READ;\r
+ key.interestOps(ops);\r
+ Connection c = connection.onConnectSucceeded(key);\r
+ if (c == null) { // Connector refused to accept connection.\r
+ key.attach(null); // Just to make sure that reference to connector is released.\r
+ key.cancel();\r
+ sc.close();\r
+ } else\r
+ key.attach(c);\r
+ }\r
+ } catch (Throwable x) {\r
+ try {\r
+ if (key.isValid())\r
+ key.attach(null); // Just to make sure that reference to connector is released.\r
+ key.cancel();\r
+ try {\r
+ sc.close();\r
+ } catch (IOException e) {\r
+ if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.\r
+ Logger.defaultLogError("Failed to close connection.", e);\r
+ }\r
+ } finally {\r
+ connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));\r
+ }\r
+ }\r
+ }\r
+ }\r
+ private void processRead(SelectionKey key) {\r
+ Connection c = (Connection)key.attachment();\r
+ SocketChannel sc = (SocketChannel)key.channel();\r
+ try {\r
+ int ops = key.interestOps();\r
+ ops |= SelectionKey.OP_READ;\r
+ key.interestOps(ops);\r
+ c.onRead();\r
+ } catch (IOException x) {\r
+ if (key.isValid()) {\r
+ key.attach(null); // Just to make sure that reference to connection is released.\r
+ key.cancel();\r
+ }\r
+ if (sc.isOpen())\r
+ try {\r
+ sc.close();\r
+ } catch (IOException e) {\r
+ }\r
+ }\r
+ }\r
+ void connect(Connection connection) throws ProCoreException {\r
+ try {\r
+ if (connections.contains(connection))\r
+ throw new ProCoreException("Connection already registered. connection=" + connection);\r
+ connection.prepare4Connection(SocketChannel.open());\r
+ connections.add(connection);\r
+ manager.selector.wakeup();\r
+ connection.wait4Connection();\r
+ } catch(IOException e) {\r
+ throw new ConnectionException("Failed to connect to " + connection + ".", e);\r
+ } catch (InterruptedException e) {\r
+ if (!connection.isConnected())\r
+ throw new NotConnectedException("Connection interrupted to " + connection + ".");\r
+ }\r
+ }\r
+ private void start() {\r
+ thread.start();\r
+ boolean once = true;\r
+ while (manager.selectorOpen.getCount() > 0) {\r
+ try {\r
+ manager.selectorOpen.await();\r
+ } catch (InterruptedException e) {\r
+ if (once) {\r
+ once = false;\r
+ Util.logError("Wait for selector open interrupted. Continuing wait.", e);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ public void stop() {\r
+ stop = true;\r
+ manager.selector.wakeup();\r
+ }\r
+}\r
+//class MethodQueue {\r
+// private LinkedList<Method> methods = new LinkedList<Method>();\r
+// MethodQueue() {\r
+// }\r
+// public synchronized int size() {\r
+// return methods.size();\r
+// }\r
+// public synchronized void addFirst(Method method) {\r
+// methods.addFirst(method);\r
+// }\r
+// public synchronized void addLast(Method method) {\r
+// methods.addLast(method);\r
+// }\r
+// public synchronized Method removeFirst() {\r
+// try {\r
+// Method method = methods.removeFirst();\r
+// return method;\r
+// } catch (NoSuchElementException e) {\r
+// return null;\r
+// }\r
+// }\r
+// public synchronized void close() {\r
+// while (size() > 0) {\r
+// Method method = removeFirst();\r
+// method.setExceptionText("Closing.");\r
+// method.gotResponse();\r
+// }\r
+// }\r
+//\r
+//}\r
+//class RequestQueue {\r
+// private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();\r
+//\r
+// RequestQueue() {\r
+// }\r
+// public synchronized int size() {\r
+// return functions.size();\r
+// }\r
+// public synchronized void addFirst(AbstractFunction e) {\r
+// functions.addFirst(e);\r
+// }\r
+// public synchronized void addLast(AbstractFunction e) {\r
+// functions.addLast(e);\r
+// }\r
+// public synchronized AbstractFunction removeFirst() {\r
+// try {\r
+// AbstractFunction ret = functions.removeFirst();\r
+// return ret;\r
+// } catch (NoSuchElementException e) {\r
+// return null;\r
+// }\r
+// }\r
+//}\r
+class MethodQueue {\r
+ private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();\r
+ MethodQueue() {\r
+ }\r
+ public synchronized int size() {\r
+ return methodMap.size();\r
+ }\r
+ public synchronized boolean isEmpty() {\r
+ return 0 == size();\r
+ }\r
+ public synchronized void add(Method method) throws ProCoreException {\r
+ int token = method.getToken();\r
+ if (0 == token)\r
+ throw new ProCoreException("Illegal token number.");\r
+ Method oldMethod = methodMap.put(token, method);\r
+ if (null != oldMethod)\r
+ throw new ProCoreException("Duplicate token number.");\r
+ }\r
+ public synchronized Method remove(int token) {\r
+ Method method = methodMap.remove(token);\r
+ return method;\r
+ }\r
+ synchronized void close() {\r
+ try {\r
+ TIntObjectIterator<Method> iterator = methodMap.iterator();\r
+ while (iterator.hasNext()) {\r
+ iterator.advance();\r
+ Method method = iterator.value();\r
+ if (null == method)\r
+ Util.logError("Null function in method queue.");\r
+ method.gotException("Method queue closing.");\r
+ }\r
+ } finally {\r
+ methodMap.clear();\r
+ }\r
+ }\r
+// public synchronized Method removeAny() throws ProCoreException {\r
+// TIntObjectIterator<Method> iterator = methodMap.iterator();\r
+// if (!iterator.hasNext())\r
+// return null;\r
+// iterator.advance();\r
+// Method method = iterator.value();\r
+// if (null == method)\r
+// throw new ProCoreException("Null function in response queue.");\r
+// methodMap.remove(method.getToken());\r
+// return method;\r
+// }\r
+}\r
+class Channel {\r
+ private SelectionKey selectionKey = null;\r
+ private SocketChannel socketChannel = null;\r
+ Channel() {\r
+ }\r
+ void register(Selector selector, Connection connection)\r
+ throws ClosedChannelException {\r
+ if (null == socketChannel)\r
+ throw new ClosedChannelException();\r
+ socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);\r
+ }\r
+ void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)\r
+ throws ProCoreException, IOException {\r
+ if (null == socketChannel)\r
+ throw new NotConnectedException("Illegal argument. No socket channel to connect to.");\r
+ if (null == address)\r
+ throw new NotConnectedException("Illegal argument. No address to connect to.");\r
+ if (null != this.socketChannel)\r
+ throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);\r
+ socketChannel.configureBlocking(false);\r
+ boolean connected = socketChannel.connect(address);\r
+ if (connected)\r
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+ Util.showDebug("Connected to " + socketChannel + ".");\r
+ this.socketChannel = socketChannel; // Waiting for connect.\r
+ }\r
+// private static final int IPTOS_LOWDELAY = 0x10;\r
+ synchronized void connect(SelectionKey key) {\r
+ this.selectionKey = key;\r
+ this.socketChannel = (SocketChannel)key.channel();\r
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+ Util.showDebug("Connected to " + socketChannel + ".");\r
+// try {\r
+// this.socketChannel.socket().setKeepAlive(true);\r
+// this.socketChannel.socket().setSoLinger(true, 1);\r
+// } catch (SocketException e) {\r
+// e.printStackTrace();\r
+// }\r
+// try {\r
+// this.socketChannel.socket().setTcpNoDelay(true);\r
+ // Crashes Kalle!\r
+ // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);\r
+// } catch (SocketException e) {\r
+// e.printStackTrace();\r
+// }\r
+ }\r
+ synchronized void disconnect() {\r
+ if (selectionKey == null)\r
+ return;\r
+ try {\r
+ try {\r
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+ Util.showDebug("Disconnected from " + socketChannel);\r
+ selectionKey.interestOps(0);\r
+ selectionKey.cancel();\r
+ selectionKey.selector().wakeup();\r
+ } catch (CancelledKeyException e) {\r
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+ Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
+ } catch (IllegalArgumentException e) {\r
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+ Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
+ }\r
+ try { // Note that this must be after the selectionKey operations above.\r
+ socketChannel.socket().close();\r
+ } catch (IOException e) {\r
+ if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
+ Logger.defaultLogError("Failed to close socket cleanly.", e);\r
+ }\r
+ } finally {\r
+ selectionKey = null;\r
+ socketChannel = null;\r
+ }\r
+ }\r
+ synchronized boolean isConnected() {\r
+ boolean ret = selectionKey != null && socketChannel.isConnected();\r
+ return ret;\r
+ }\r
+ synchronized SocketChannel socket() throws IOException {\r
+ if (null == socketChannel)\r
+ throw new IOException("Socket channel is null.");\r
+ return socketChannel;\r
+ }\r
+ synchronized void listenReadEvents()\r
+ throws IOException {\r
+ if (null == selectionKey)\r
+ throw new IOException("Trying to listen connection which is not connected.");\r
+ int ops = selectionKey.interestOps() | SelectionKey.OP_READ;\r
+ selectionKey.interestOps(ops);\r
+ selectionKey.selector().wakeup();\r
+ }\r
+}\r
+class Connection {\r
+ private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;\r
+ private static int lastSentTokenNumber = 0;\r
+ private final ConnectionThread connectionThread;\r
+ private InetSocketAddress address;\r
+ private Channel channel;\r
+ private CountDownLatch connectionCreated;\r
+ private ProCoreException exception;\r
+ private ByteBuffer readBuffer;\r
+ Connection(ConnectionThread connectionThread) {\r
+ this.connectionThread = connectionThread;\r
+ init4Connection(InetSocketAddress.createUnresolved("", 0));\r
+ }\r
+ @Override\r
+ public String toString() {\r
+ return "Connection address=" + address + " token=" + lastSentTokenNumber;\r
+ }\r
+ boolean addressNotInitialized() {\r
+ return address.equals(InetSocketAddress.createUnresolved("", 0));\r
+ }\r
+ void register(Selector selector)\r
+ throws ClosedChannelException {\r
+ channel.register(selector, this);\r
+ }\r
+ InetSocketAddress getAddress() {\r
+ return address;\r
+ }\r
+ boolean equalsAddress(InetSocketAddress address) {\r
+ return address.equals(address);\r
+ }\r
+ void init4Connection(InetSocketAddress address) {\r
+ this.address = address;\r
+ if (null != channel && channel.isConnected())\r
+ channel.disconnect();\r
+ channel = new Channel();\r
+ connectionCreated = new CountDownLatch(1);\r
+ exception = null;\r
+ readBuffer = connectionThread.getInputBuffer();\r
+ }\r
+ void prepare4Connection(SocketChannel socketChannel)\r
+ throws ProCoreException, IOException {\r
+ if (isConnected())\r
+ throw new ProCoreException("Illegal state exception. Already connected.");\r
+ if (address.isUnresolved())\r
+ address = new InetSocketAddress(address.getHostName(), address.getPort());\r
+ if (address.getPort() == 0)\r
+ throw new ProCoreException("Port 0 not supported as connection address.");\r
+ channel.prepare4Connection(socketChannel, address);\r
+ }\r
+ void wait4Connection() throws InterruptedException {\r
+ connectionCreated.await();\r
+ }\r
+ synchronized void disconnect() {\r
+ if (!isConnected())\r
+ return;\r
+ channel.disconnect();\r
+ }\r
+ private void throwIOExceptionFromRead(IOException e)\r
+ throws IOException {\r
+ disconnect(); // This ensures that isConnected will be false after this.\r
+ if (null != e)\r
+ throw e;\r
+ throw new IOException("Failed to read.");\r
+ }\r
+ // Called by synchronized handleInput.\r
+ private void sendMessage(ByteBuffer byteBuffer)\r
+ throws IOException, InterruptedException {\r
+ int left = byteBuffer.remaining();\r
+ while (left > 0) {\r
+ ByteBuffer slice = byteBuffer.slice();\r
+ int size = Math.min(left, 1000000);\r
+ slice.limit(size);\r
+ sendBuffer(slice);\r
+ int position = byteBuffer.position() + size;\r
+ byteBuffer.position(position);\r
+ left -= size;\r
+ }\r
+ }\r
+ private void sendBuffer(ByteBuffer byteBuffer)\r
+ throws IOException, InterruptedException {\r
+ int left = byteBuffer.remaining();\r
+ while (left > 0) {\r
+ int n = channel.socket().write(byteBuffer);\r
+ if (n > 0) {\r
+ left -= n;\r
+ continue;\r
+ }\r
+ // System.err.println("############### YIELD ##############################");\r
+ Thread.yield();\r
+ if (left > 0)\r
+ Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()\r
+ + ", write return =" + n);\r
+ }\r
+ }\r
+ static public class MessageHeader {\r
+ public int token;\r
+ public int lastTokenIn;\r
+ public MessageHeader(int token, int lastTokenIn) {\r
+ this.token = token;\r
+ this.lastTokenIn = lastTokenIn;\r
+ }\r
+ }\r
+ // Called by synchronized call.\r
+ protected void call(MessageHeader messageHeader, Method method)\r
+ throws IOException, InterruptedException {\r
+ byte[] bytes = new byte[20];\r
+ ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);\r
+ header.order(ByteOrder.LITTLE_ENDIAN);\r
+ header.putInt(messageHeader.lastTokenIn);\r
+ header.putInt(messageHeader.token);\r
+ int messageNumber = method.requestNumber;\r
+ header.putInt(messageNumber);\r
+ header.putInt(0); // inflate size, zero if no compression\r
+ ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.\r
+ header.putInt(data.position());\r
+ header.clear();\r
+ synchronized (this) {\r
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+ Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);\r
+ // Receiver expects header and data to form continuous block.\r
+ sendMessage(header);\r
+ data.limit(data.position());\r
+ data.rewind();\r
+ sendMessage(data);\r
+ if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
+ Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);\r
+ }\r
+ }\r
+ void sendMethod(MethodQueue mq, Method m, int lastTokenIn)\r
+ throws ProCoreException, IOException, InterruptedException {\r
+ m.prepareForSendingRequest();\r
+ if (DEBUG)\r
+ Util.showDebug("Setting last token out=" + lastTokenIn);\r
+ synchronized (this) {\r
+ MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);\r
+ m.setToken(messageHeader.token);\r
+ mq.add(m);\r
+ call(messageHeader, m);\r
+ }\r
+ }\r
+ // Called by session manager.\r
+ void onRead() throws IOException {\r
+ if (readBuffer.remaining() < 1)\r
+ throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));\r
+ int n = -1;\r
+ try {\r
+ n = channel.socket().read(readBuffer);\r
+ } catch (IOException e) {\r
+ throwIOExceptionFromRead(e);\r
+ }\r
+ if (n < 0) // Assuming that this is same as broken connection.\r
+ throwIOExceptionFromRead(new IOException("Failed to read."));\r
+ else if (n < 1)\r
+ return; // No data, no error.\r
+ try {\r
+ readBuffer = connectionThread.handleInput(readBuffer, n);\r
+ } catch (Throwable t) {\r
+ if (t instanceof IOException)\r
+ throw (IOException)t;\r
+ else\r
+ throw new IOException("Throwable from handleInput.", t);\r
+ }\r
+ }\r
+ // Called by session manager.\r
+ void onConnectFailed(ProCoreException exception) {\r
+ if (null != exception)\r
+ this.exception = exception;\r
+ else\r
+ this.exception = new NotConnectedException("Failed to create connection.");\r
+ connectionCreated.countDown();\r
+ }\r
+ // Called by session manager.\r
+ Connection onConnectSucceeded(SelectionKey key) {\r
+ Connection ret = null;\r
+ try {\r
+ channel.connect(key);\r
+ ret = this;\r
+ } catch (Throwable t) {\r
+ exception = new NotConnectedException("Failed to finalize connection.", t);\r
+ // Connection refused.\r
+ }\r
+ connectionCreated.countDown();\r
+ return ret;\r
+ }\r
+ // Called by session manager.\r
+ void onDisconnect() {\r
+ disconnect();\r
+ }\r
+ boolean isConnected() {\r
+ return channel.isConnected();\r
+ }\r
+// boolean isStopping() {\r
+// }\r
+ boolean isExcepted() {\r
+ return null == exception;\r
+ }\r
+ boolean isOk() {\r
+ return isConnected() && !isExcepted();\r
+ }\r
+}\r
+class HeaderData {\r
+ int lastTokenIn;\r
+ int token;\r
+ int messageNumber;\r
+ int inflateSize;\r
+ int deflateSize;\r
+}\r
+class ReceiveData {\r
+ public int lastTokenIn;\r
+ public int token;\r
+ public int messageNumber;\r
+// public DataBuffer inflateBuffer;\r
+ public ByteBuffer buffer;\r
+ public int inflateSize;\r
+ ReceiveData(HeaderData headerData) {\r
+ lastTokenIn = headerData.lastTokenIn;\r
+ token = headerData.token;\r
+ messageNumber = headerData.messageNumber;\r
+ }\r
+}\r
+abstract class MethodHandler {\r
+ abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;\r
+ DataBuffer getDataBuffer(Packet packet) {\r
+ packet.bytes.position(20);\r
+ DataBuffer db = new DataBuffer(packet.bytes, 0);\r
+ return db;\r
+ }\r
+}\r
+class ErrorHandler extends MethodHandler {\r
+ private final ProCoreException exception;\r
+ ErrorHandler(String s) {\r
+ if (null == s || s == "")\r
+ s = "Ilegal argument. Missing description for remote call exception.";\r
+ this.exception = new ProCoreException(s);\r
+ }\r
+ @Override\r
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+ throw exception;\r
+ }\r
+}\r
+class ExceptionHandler extends MethodHandler {\r
+ @Override\r
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+ packet.bytes.position(20);\r
+ ExceptionFunction exception = new ExceptionFunction();\r
+ exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));\r
+ throw new ProCoreException(exception.what);\r
+ }\r
+}\r
+class DefaultHandler extends MethodHandler {\r
+ @Override\r
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+ function.deserialize(function.getResponseNumber(), getDataBuffer(packet));\r
+ }\r
+}\r
+class GetClusterHandler extends MethodHandler {\r
+ private final GetClusterNewFunction function;\r
+ public GetClusterHandler(final GetClusterNewFunction function) {\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+ this.function.inflateSize = packet.header.inflateSize;\r
+ packet.bytes.position(20);\r
+ packet.bytes.limit(20 + packet.header.deflateSize);\r
+ this.function.deflated = packet.bytes.compact();\r
+ this.function.deflated.position(0);\r
+ this.function.deflated.limit(packet.header.deflateSize);\r
+ }\r
+}\r
+//class AckHandler extends MethodHandler {\r
+// @Override\r
+// void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
+// packet.first.position(20);\r
+// NullFunction ack = new NullFunction();\r
+// ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));\r
+// Util.showDebug("ack message number=" + ack.messageNumber);\r
+// }\r
+//}\r
+abstract class AbstractMethod {\r
+// abstract ByteBuffer serialize(ByteOrder byteOrder);\r
+ protected final int requestNumber;\r
+ protected final int responseNumber;\r
+ protected int receivedNumber = 0;\r
+ boolean hasResponse() {\r
+ return responseNumber != 0;\r
+ }\r
+ private int token = 0;\r
+ void setToken(int token) {\r
+ this.token = token;\r
+ }\r
+ int getToken() {\r
+ return token;\r
+ }\r
+ AbstractMethod(int requestNumber, int responseNumber) {\r
+ this.requestNumber = requestNumber;\r
+ this.responseNumber = responseNumber;\r
+ }\r
+ void prepareForSendingRequest() {\r
+ receivedNumber = 0;\r
+ }\r
+}\r
+class Method extends AbstractMethod {\r
+ private final ExceptionHandler exceptionHandler = new ExceptionHandler();\r
+ private final AbstractFunction function;\r
+ private Packet packet = null;\r
+ private CountDownLatch gotPacket = new CountDownLatch(1);\r
+ private MethodHandler handler = new DefaultHandler();\r
+ private final EventHandler eventHandler;\r
+ Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {\r
+ super(function.getRequestNumber(), function.getResponseNumber());\r
+ this.eventHandler = eventHandler;\r
+ if (null != handler)\r
+ this.handler = handler;\r
+ switch (function.getRequestNumber()) {\r
+ default:\r
+ break;\r
+ case MessageNumber.AcceptCommitRequest:\r
+ case MessageNumber.UpdateClusterRequest:\r
+ gotPacket.countDown();\r
+ break;\r
+ }\r
+ this.function = function;\r
+ } // MARK: waitforReply\r
+ void waitForReply(Connection c) throws ProCoreException, InterruptedException {\r
+ while (gotPacket.getCount() > 0) {\r
+ gotPacket.await(30, TimeUnit.SECONDS);\r
+ if (gotPacket.getCount() > 0) {\r
+ System.err.println("waitForReply timeout:");\r
+ System.err.println("wait token=" + getToken());\r
+ // MARK: c.onWait(getToken());\r
+ }\r
+ }\r
+ if (null != packet)\r
+ handler.got(packet, function);\r
+ }\r
+ EventHandler getEventHandler(EventHandler eventHandler) {\r
+ if (null != this.eventHandler)\r
+ return this.eventHandler;\r
+ else\r
+ return eventHandler;\r
+ }\r
+ ByteBuffer serialize(ByteOrder byteOrder) {\r
+ DataBuffer db = function.serialize(byteOrder);\r
+ return db.getByteBuffer();\r
+ }\r
+ void gotPacket(Packet packet) {\r
+ this.packet = packet;\r
+ gotPacket.countDown();\r
+ }\r
+ void gotException(String s) {\r
+ handler = new ErrorHandler(s);\r
+ gotPacket.countDown();\r
+ }\r
+ void gotException(Packet packet) {\r
+ this.packet = packet;\r
+ handler = exceptionHandler;\r
+ gotPacket.countDown();\r
+ }\r
+// void gotAck(Packet packet) {\r
+// this.packet = packet;\r
+// handler = new AckHandler();\r
+// gotPacket.countDown();\r
+// }\r
+}\r
+class DebugPolicy {\r
+ static final boolean REPORT_DEBUG_EVENTS = false;\r
+ static final boolean REPORT_SERVER_EVENTS = false;\r
+ static final boolean REPORT_DEBUG_OPERATIONS = false;\r
+ static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;\r
+ static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;\r
+\r
+ private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;\r
+ private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;\r
+ static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);\r
+\r
+ // Time to wait for an event (in milliseconds).\r
+ // Code logic should assume that this wait never expires.\r
+ // Should be big enough so that if this time is expired it indicates an error situation.\r
+ private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour\r
+ private static final boolean REPORT_WAIT_EVENTS = true;\r
+ private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;\r
+ /**\r
+ * Calls object.wait.\r
+ * @param waitMs time to wait in milliseconds.\r
+ * @param object to wait for (used for calling object.wait)\r
+ * @param string text used in debug prints.\r
+ * @see java.lang.Object.wait\r
+ */\r
+ static final void wait(long waitMs, Object object, String string) {\r
+ waitImpl(waitMs, object, string);\r
+ }\r
+ /**\r
+ * Calls object.wait.\r
+ * @param object to wait for (used for calling object.wait)\r
+ * @param string text used in debug prints.\r
+ * @see java.lang.Object.wait\r
+ */\r
+ static final void wait(Object object, String string) {\r
+ waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);\r
+ }\r
+ private static final void report(String string, Object object) {\r
+ String msg = "DEBUG: " + string\r
+ + " " + object.getClass().getSimpleName()\r
+ + " hid=" + System.identityHashCode(object);\r
+ if (REPORT_WAIT_EVENTS_VERBOSE)\r
+ new Throwable(msg).printStackTrace();\r
+ else\r
+ System.err.println(msg);\r
+ }\r
+ private static final void waitImpl(long waitMs, Object object, String string) {\r
+ if (REPORT_WAIT_EVENTS)\r
+ report("Wait for event started.", object);\r
+ try {\r
+ object.wait(waitMs); // milliseconds\r
+ } catch (InterruptedException e) {\r
+ if (REPORT_WAIT_EVENTS)\r
+ report("Wait for event interrupted.", object);\r
+ } finally {\r
+ if (REPORT_WAIT_EVENTS)\r
+ report("Wait for event stopped.", object);\r
+ }\r
+ }\r
+}\r