package org.simantics.db.server.internal;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.simantics.db.common.utils.Logger;
import org.simantics.db.server.ProCoreException;
import org.simantics.db.server.protocol.AbstractFunction;
import org.simantics.db.server.protocol.DataBuffer;
import org.simantics.db.server.protocol.ExceptionFunction;
import org.simantics.db.server.protocol.GetClusterNewFunction;
import org.simantics.db.server.protocol.MessageNumber;

import gnu.trove.iterator.TIntObjectIterator;
import gnu.trove.map.hash.TIntObjectHashMap;

/**
 * Singleton that owns the NIO selector thread ("Connection Manager").
 * <p>
 * Connections are queued with {@link #connect(Connection)}; the selector thread
 * registers them, completes the non-blocking connect and dispatches read events
 * to {@link Connection#onRead()}.
 */
public class ConnectionManager {
    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
    private static final ConnectionManager connectionManager = new ConnectionManager();

    /**
     * Returns the singleton, starting the selector thread if it is not alive.
     *
     * @return the shared connection manager.
     */
    public static ConnectionManager getInstance() {
        return connectionManager.instance();
    }

    private final Manager manager = new Manager();
    private final Thread thread = new Thread(manager, "Connection Manager");
    // Written by stop() on an arbitrary thread and read by the selector thread,
    // hence volatile so that the selector thread is guaranteed to see the update.
    private volatile boolean stop = false;
    /** Connections waiting to be registered with the selector by the selector thread. */
    protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());

    private ConnectionManager() {
        Util.trace("ConnectionManager.new");
    }

    private ConnectionManager instance() {
        if (!thread.isAlive())
            start();
        return this;
    }

    /** The select loop executed by the "Connection Manager" thread. */
    protected class Manager implements Runnable {
        private Selector selector;
        // Released once the selector has been opened; start() waits for this.
        private CountDownLatch selectorOpen = new CountDownLatch(1);

        Manager() {
        }

        /**
         * Opens the selector and loops: select, dispatch selected keys, register
         * pending connections, and exit (closing the selector) once stop is set.
         * Any throwable other than a CancelledKeyException raised inside the loop
         * is logged and terminates the loop.
         */
        public void run() {
            try {
                selector = Selector.open();
                selectorOpen.countDown();
                while (true) {
                    int n = selector.select();
                    try {
                        if (n > 0)
                            processSelected();
                        processConnectors();
                        if (stop) {
                            selector.close();
                            return;
                        }
                    } catch (CancelledKeyException e) {
                        Logger.defaultLogError("Cancelled key in select loop.", e);
                    }
                }
            } catch (Throwable x) {
                Logger.defaultLogError("Select loop failed.", x);
            }
        }

        /**
         * Registers every queued connection with the selector.
         * <p>
         * A connection whose channel cannot be registered is told that the connect
         * failed (which releases the thread blocked in wait4Connection) instead of
         * letting the exception escape and terminate the select loop for everyone.
         *
         * @throws ClosedChannelException kept in the signature for compatibility;
         *         registration failures are now reported to the failing connection.
         */
        void processConnectors() throws ClosedChannelException {
            while (connections.size() > 0) {
                Connection c = connections.remove(0);
                try {
                    c.register(selector);
                } catch (ClosedChannelException e) {
                    c.onConnectFailed(new NotConnectedException("Failed to register connection.", e));
                }
            }
        }

        /** Dispatches each selected key to the disconnect, connect or read handler. */
        void processSelected() {
            Iterator<SelectionKey> i = selector.selectedKeys().iterator();
            while (i.hasNext()) {
                SelectionKey sk = i.next();
                i.remove();
                try {
                    if (!sk.isValid())
                        processDisconnect(sk);
                    else if (sk.isConnectable())
                        processConnect(sk);
                    else if (sk.isReadable())
                        processRead(sk);
                } catch (CancelledKeyException e) {
                    processDisconnect(sk);
                }
            }
        }

        /** Notifies the attached connection (if any) and cancels the key. */
        private void processDisconnect(SelectionKey key) {
            Connection c = (Connection)key.attachment();
            if (null == c)
                return;
            c.onDisconnect();
            key.attach(null); // Just to make sure that reference to connection is released.
            key.cancel();
        }

        /**
         * Finishes a pending non-blocking connect. On success the key switches from
         * OP_CONNECT to OP_READ; on any failure the key is cancelled, the channel is
         * closed and the connection is informed through onConnectFailed.
         */
        private void processConnect(SelectionKey key) {
            Connection connection = (Connection)key.attachment();
            SocketChannel sc = (SocketChannel)key.channel();
            try {
                if (sc.finishConnect()) {
                    int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
                    ops |= SelectionKey.OP_READ;
                    key.interestOps(ops);
                    Connection c = connection.onConnectSucceeded(key);
                    if (c == null) { // Connector refused to accept connection.
                        key.attach(null); // Just to make sure that reference to connector is released.
                        key.cancel();
                        sc.close();
                    } else
                        key.attach(c);
                }
            } catch (Throwable x) {
                try {
                    if (key.isValid())
                        key.attach(null); // Just to make sure that reference to connector is released.
                    key.cancel();
                    try {
                        sc.close();
                    } catch (IOException e) {
                        if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
                            Logger.defaultLogError("Failed to close connection.", e);
                    }
                } finally {
                    connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
                }
            }
        }
    }

    /**
     * Lets the attached connection read from its channel. If reading fails the
     * key is cancelled and the channel is closed.
     */
    private void processRead(SelectionKey key) {
        Connection c = (Connection)key.attachment();
        SocketChannel sc = (SocketChannel)key.channel();
        try {
            int ops = key.interestOps();
            ops |= SelectionKey.OP_READ;
            key.interestOps(ops);
            c.onRead();
        } catch (IOException x) {
            if (key.isValid()) {
                key.attach(null); // Just to make sure that reference to connection is released.
                key.cancel();
            }
            if (sc.isOpen())
                try {
                    sc.close();
                } catch (IOException e) {
                    // Best-effort close of an already failed channel; nothing more to do.
                }
        }
    }

    /**
     * Queues the connection for the selector thread and blocks until the connect
     * attempt has finished (successfully or not).
     *
     * @param connection the connection to establish.
     * @throws ProCoreException if the connection is already queued, cannot be
     *         prepared, or the wait is interrupted before it is connected.
     */
    void connect(Connection connection) throws ProCoreException {
        try {
            if (connections.contains(connection))
                throw new ProCoreException("Connection already registered. connection=" + connection);
            SocketChannel socketChannel = SocketChannel.open();
            boolean prepared = false;
            try {
                connection.prepare4Connection(socketChannel);
                prepared = true;
            } finally {
                // The channel is only retained by the connection when preparation
                // succeeds; otherwise close it here so it does not leak.
                if (!prepared) {
                    try {
                        socketChannel.close();
                    } catch (IOException ignored) {
                        // Best-effort cleanup; the original failure is the one reported.
                    }
                }
            }
            connections.add(connection);
            manager.selector.wakeup();
            connection.wait4Connection();
        } catch(IOException e) {
            throw new ConnectionException("Failed to connect to " + connection + ".", e);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt is intentionally not restored here; an
            // interrupted thread may cause NIO to close the channel on its next I/O call.
            if (!connection.isConnected())
                throw new NotConnectedException("Connection interrupted to " + connection + ".");
        }
    }

    /**
     * Starts the selector thread and waits until the selector has been opened.
     * Interrupts during the wait are logged once and otherwise ignored.
     */
    private void start() {
        thread.start();
        boolean once = true;
        while (manager.selectorOpen.getCount() > 0) {
            try {
                manager.selectorOpen.await();
            } catch (InterruptedException e) {
                if (once) {
                    once = false;
                    Util.logError("Wait for selector open interrupted. Continuing wait.", e);
                }
            }
        }
    }

    /** Requests the select loop to close the selector and exit. */
    public void stop() {
        stop = true;
        manager.selector.wakeup();
    }
}

//class MethodQueue {
//    private LinkedList methods = new LinkedList();
//    MethodQueue() {
//    }
//    public synchronized int size() {
//        return methods.size();
//    }
//    public synchronized void addFirst(Method method) {
//        methods.addFirst(method);
//    }
//    public synchronized void addLast(Method method) {
//        methods.addLast(method);
//    }
//    public synchronized Method removeFirst() {
//        try {
//            Method method = methods.removeFirst();
//            return method;
//        } catch (NoSuchElementException e) {
//            return null;
//        }
//    }
//    public synchronized void close() {
//        while (size() > 0) {
//            Method method = removeFirst();
//            method.setExceptionText("Closing.");
//            method.gotResponse();
//        }
//    }
//
//}
//class RequestQueue {
//    private LinkedList functions = new LinkedList();
//
//    RequestQueue() {
//    }
//    public synchronized int size() {
//        return functions.size();
//    }
//    public synchronized void addFirst(AbstractFunction e) {
//        functions.addFirst(e);
//    }
//    public synchronized void addLast(AbstractFunction e) {
//        functions.addLast(e);
//    }
//    public synchronized AbstractFunction removeFirst() {
//        try {
//            AbstractFunction ret = functions.removeFirst();
//            return ret;
//        } catch (NoSuchElementException e) {
//            return null;
//        }
//    }
//}

/** Synchronized map of pending methods keyed by their (non-zero) token. */
class MethodQueue {
    private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();

    MethodQueue() {
    }

    public synchronized int size() {
        return methodMap.size();
    }

    public synchronized boolean isEmpty() {
        return 0 == size();
    }

    /**
     * Adds a method under its token.
     *
     * @throws ProCoreException if the token is zero or was already present
     *         (in the latter case the new method has replaced the old one).
     */
    public synchronized void add(Method method) throws ProCoreException {
        int token = method.getToken();
        if (0 == token)
            throw new ProCoreException("Illegal token number.");
        Method oldMethod = methodMap.put(token, method);
        if (null != oldMethod)
            throw new ProCoreException("Duplicate token number.");
    }

    /** @return the method removed for the token; see Trove's map documentation for the absent-key value. */
    public synchronized Method remove(int token) {
        Method method = methodMap.remove(token);
        return method;
    }

    /**
     * Marks every pending method as failed with "Method queue closing." and
     * empties the map. Null entries are logged and skipped.
     */
    synchronized void close() {
        try {
            TIntObjectIterator<Method> iterator = methodMap.iterator();
            while (iterator.hasNext()) {
                iterator.advance();
                Method method = iterator.value();
                if (null == method) {
                    Util.logError("Null function in method queue.");
                    continue; // Previously fell through and dereferenced null.
                }
                method.gotException("Method queue closing.");
            }
        } finally {
            methodMap.clear();
        }
    }
//    public synchronized Method removeAny() throws ProCoreException {
//        TIntObjectIterator iterator = methodMap.iterator();
//        if (!iterator.hasNext())
//            return null;
//        iterator.advance();
//        Method method = iterator.value();
//        if (null == method)
//            throw new ProCoreException("Null function in response queue.");
//        methodMap.remove(method.getToken());
//        return method;
//    }
}

/** Holds the socket channel and, once connected, the selection key of one connection. */
class Channel {
    private SelectionKey selectionKey = null;
    private SocketChannel socketChannel = null;

    Channel() {
    }

    /**
     * Registers the prepared socket channel for OP_CONNECT with the connection as attachment.
     *
     * @throws ClosedChannelException if no channel has been prepared or the channel is closed.
     */
    void register(Selector selector, Connection connection) throws ClosedChannelException {
        if (null == socketChannel)
            throw new ClosedChannelException();
        socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
    }

    /**
     * Switches the given channel to non-blocking mode and initiates the connect.
     * The channel is stored only after the connect has been initiated without error.
     */
    void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
    throws ProCoreException, IOException {
        if (null == socketChannel)
            throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
        if (null == address)
            throw new NotConnectedException("Illegal argument. No address to connect to.");
        if (null != this.socketChannel)
            throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
        socketChannel.configureBlocking(false);
        boolean connected = socketChannel.connect(address);
        if (connected)
            if (DebugPolicy.REPORT_DEBUG_EVENTS)
                Util.showDebug("Connected to " + socketChannel + ".");
        this.socketChannel = socketChannel; // Waiting for connect.
    }

//    private static final int IPTOS_LOWDELAY = 0x10;
    /** Records the selection key (and its channel) of the now established connection. */
    synchronized void connect(SelectionKey key) {
        this.selectionKey = key;
        this.socketChannel = (SocketChannel)key.channel();
        if (DebugPolicy.REPORT_DEBUG_EVENTS)
            Util.showDebug("Connected to " + socketChannel + ".");
//        try {
//            this.socketChannel.socket().setKeepAlive(true);
//            this.socketChannel.socket().setSoLinger(true, 1);
//        } catch (SocketException e) {
//            e.printStackTrace();
//        }
//        try {
//            this.socketChannel.socket().setTcpNoDelay(true); // Crashes Kalle!
//            this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
//        } catch (SocketException e) {
//            e.printStackTrace();
//        }
    }

    /** Cancels the key, closes the socket and forgets both. No-op when there is no key. */
    synchronized void disconnect() {
        if (selectionKey == null)
            return;
        try {
            try {
                if (DebugPolicy.REPORT_DEBUG_EVENTS)
                    Util.showDebug("Disconnected from " + socketChannel);
                selectionKey.interestOps(0);
                selectionKey.cancel();
                selectionKey.selector().wakeup();
            } catch (CancelledKeyException e) {
                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
                    Logger.defaultLogError("Failed to disconnect cleanly.", e);
            } catch (IllegalArgumentException e) {
                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
                    Logger.defaultLogError("Failed to disconnect cleanly.", e);
            }
            try { // Note that this must be after the selectionKey operations above.
                socketChannel.socket().close();
            } catch (IOException e) {
                if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
                    Logger.defaultLogError("Failed to close socket cleanly.", e);
            }
        } finally {
            selectionKey = null;
            socketChannel = null;
        }
    }

    synchronized boolean isConnected() {
        boolean ret = selectionKey != null && socketChannel.isConnected();
        return ret;
    }

    /** @throws IOException if there is no socket channel. */
    synchronized SocketChannel socket() throws IOException {
        if (null == socketChannel)
            throw new IOException("Socket channel is null.");
        return socketChannel;
    }

    /** Adds OP_READ to the key's interest set and wakes up the selector. */
    synchronized void listenReadEvents() throws IOException {
        if (null == selectionKey)
            throw new IOException("Trying to listen connection which is not connected.");
        int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
        selectionKey.interestOps(ops);
        selectionKey.selector().wakeup();
    }
}

/**
 * One client connection: address, channel, connect latch and the buffer that
 * incoming data is read into before being handed to the connection thread.
 */
class Connection {
    private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
    // NOTE(review): static counter, but it is only incremented under the instance
    // lock in sendMethod; confirm that at most one Connection sends at a time.
    private static int lastSentTokenNumber = 0;
    private final ConnectionThread connectionThread;
    private InetSocketAddress address;
    private Channel channel;
    private CountDownLatch connectionCreated;
    private ProCoreException exception;
    private ByteBuffer readBuffer;

    Connection(ConnectionThread connectionThread) {
        this.connectionThread = connectionThread;
        init4Connection(InetSocketAddress.createUnresolved("", 0));
    }

    @Override
    public String toString() {
        return "Connection address=" + address + " token=" + lastSentTokenNumber;
    }

    boolean addressNotInitialized() {
        return address.equals(InetSocketAddress.createUnresolved("", 0));
    }

    void register(Selector selector) throws ClosedChannelException {
        channel.register(selector, this);
    }

    InetSocketAddress getAddress() {
        return address;
    }

    /**
     * @param address the address to compare with.
     * @return true if this connection's address equals the given address.
     */
    boolean equalsAddress(InetSocketAddress address) {
        // Compare the field with the argument; the parameter shadows the field
        // and was previously compared with itself (always true).
        return this.address.equals(address);
    }

    /** Resets the connection state for a (new) connect attempt to the given address. */
    void init4Connection(InetSocketAddress address) {
        this.address = address;
        if (null != channel && channel.isConnected())
            channel.disconnect();
        channel = new Channel();
        connectionCreated = new CountDownLatch(1);
        exception = null;
        readBuffer = connectionThread.getInputBuffer();
    }

    /**
     * Resolves the address if needed and initiates the non-blocking connect.
     *
     * @throws ProCoreException if already connected or the port is 0.
     */
    void prepare4Connection(SocketChannel socketChannel) throws ProCoreException, IOException {
        if (isConnected())
            throw new ProCoreException("Illegal state exception. Already connected.");
        if (address.isUnresolved())
            address = new InetSocketAddress(address.getHostName(), address.getPort());
        if (address.getPort() == 0)
            throw new ProCoreException("Port 0 not supported as connection address.");
        channel.prepare4Connection(socketChannel, address);
    }

    /** Blocks until onConnectSucceeded or onConnectFailed has been called. */
    void wait4Connection() throws InterruptedException {
        connectionCreated.await();
    }

    synchronized void disconnect() {
        if (!isConnected())
            return;
        channel.disconnect();
    }

    private void throwIOExceptionFromRead(IOException e) throws IOException {
        disconnect(); // This ensures that isConnected will be false after this.
        if (null != e)
            throw e;
        throw new IOException("Failed to read.");
    }

    // Called by synchronized handleInput.
    /** Sends the remaining bytes of the buffer in slices of at most 1000000 bytes. */
    private void sendMessage(ByteBuffer byteBuffer) throws IOException, InterruptedException {
        int left = byteBuffer.remaining();
        while (left > 0) {
            ByteBuffer slice = byteBuffer.slice();
            int size = Math.min(left, 1000000);
            slice.limit(size);
            sendBuffer(slice);
            int position = byteBuffer.position() + size;
            byteBuffer.position(position);
            left -= size;
        }
    }

    /** Writes the whole buffer, yielding (and logging) whenever the channel accepts no bytes. */
    private void sendBuffer(ByteBuffer byteBuffer) throws IOException, InterruptedException {
        int left = byteBuffer.remaining();
        while (left > 0) {
            int n = channel.socket().write(byteBuffer);
            if (n > 0) {
                left -= n;
                continue;
            }
            // System.err.println("############### YIELD ##############################");
            Thread.yield();
            if (left > 0)
                Logger.defaultLogInfo("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining() + ", write return =" + n);
        }
    }

    static public class MessageHeader {
        public int token;
        public int lastTokenIn;
        public MessageHeader(int token, int lastTokenIn) {
            this.token = token;
            this.lastTokenIn = lastTokenIn;
        }
    }

    // Called by synchronized call.
    /**
     * Sends a 20-byte little-endian header (last token in, token, message number,
     * inflate size, data size) followed by the serialized method data.
     */
    protected void call(MessageHeader messageHeader, Method method) throws IOException, InterruptedException {
        byte[] bytes = new byte[20];
        ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
        header.order(ByteOrder.LITTLE_ENDIAN);
        header.putInt(messageHeader.lastTokenIn);
        header.putInt(messageHeader.token);
        int messageNumber = method.requestNumber;
        header.putInt(messageNumber);
        header.putInt(0); // inflate size, zero if no compression
        ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
        header.putInt(data.position());
        header.clear();
        synchronized (this) {
            if (DebugPolicy.REPORT_DEBUG_EVENTS)
                Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
            // Receiver expects header and data to form continuous block.
            sendMessage(header);
            data.limit(data.position());
            data.rewind();
            sendMessage(data);
            if (DebugPolicy.REPORT_DEBUG_EVENTS)
                Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
        }
    }

    /** Assigns the next token to the method, queues it for its reply and sends it. */
    void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
    throws ProCoreException, IOException, InterruptedException {
        m.prepareForSendingRequest();
        if (DEBUG)
            Util.showDebug("Setting last token out=" + lastTokenIn);
        synchronized (this) {
            MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
            m.setToken(messageHeader.token);
            mq.add(m);
            call(messageHeader, m);
        }
    }

    // Called by session manager.
    /**
     * Reads available bytes into the read buffer and passes them to the connection thread.
     *
     * @throws IOException if the buffer is full, the read fails or reaches end of
     *         stream (the connection is disconnected first), or handleInput throws.
     */
    void onRead() throws IOException {
        if (readBuffer.remaining() < 1)
            throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
        int n = -1;
        try {
            n = channel.socket().read(readBuffer);
        } catch (IOException e) {
            throwIOExceptionFromRead(e);
        }
        if (n < 0) // Assuming that this is same as broken connection.
            throwIOExceptionFromRead(new IOException("Failed to read."));
        else if (n < 1)
            return; // No data, no error.
        try {
            readBuffer = connectionThread.handleInput(readBuffer, n);
        } catch (Throwable t) {
            if (t instanceof IOException)
                throw (IOException)t;
            else
                throw new IOException("Throwable from handleInput.", t);
        }
    }

    // Called by session manager.
    void onConnectFailed(ProCoreException exception) {
        if (null != exception)
            this.exception = exception;
        else
            this.exception = new NotConnectedException("Failed to create connection.");
        connectionCreated.countDown();
    }

    // Called by session manager.
    /** @return this connection, or null if the connection could not be finalized. */
    Connection onConnectSucceeded(SelectionKey key) {
        Connection ret = null;
        try {
            channel.connect(key);
            ret = this;
        } catch (Throwable t) {
            exception = new NotConnectedException("Failed to finalize connection.", t);
            // Connection refused.
        }
        connectionCreated.countDown();
        return ret;
    }

    // Called by session manager.
    void onDisconnect() {
        disconnect();
    }

    boolean isConnected() {
        return channel.isConnected();
    }

//    boolean isStopping() {
//    }
    /** @return true if a connect failure has been recorded for this connection. */
    boolean isExcepted() {
        // Previously inverted (null == exception), which made isOk() require a failure.
        return null != exception;
    }

    /** @return true if connected and no connect failure has been recorded. */
    boolean isOk() {
        return isConnected() && !isExcepted();
    }
}

/** Plain holder for the five integer fields of a message header. */
class HeaderData {
    int lastTokenIn;
    int token;
    int messageNumber;
    int inflateSize;
    int deflateSize;
}

/** Received message data; token fields are copied from the given header. */
class ReceiveData {
    public int lastTokenIn;
    public int token;
    public int messageNumber;
//    public DataBuffer inflateBuffer;
    public ByteBuffer buffer;
    public int inflateSize;

    ReceiveData(HeaderData headerData) {
        lastTokenIn = headerData.lastTokenIn;
        token = headerData.token;
        messageNumber = headerData.messageNumber;
    }
}

/** Strategy for turning a received packet into a result (or an exception). */
abstract class MethodHandler {
    abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;

    /** Positions the packet bytes at offset 20 (the header size used by Connection.call) and wraps them. */
    DataBuffer getDataBuffer(Packet packet) {
        packet.bytes.position(20);
        DataBuffer db = new DataBuffer(packet.bytes, 0);
        return db;
    }
}

/** Handler that always throws a ProCoreException carrying the given description. */
class ErrorHandler extends MethodHandler {
    private final ProCoreException exception;

    ErrorHandler(String s) {
        // Compare by content; the former reference comparison (s == "") missed
        // empty strings that are not the interned literal.
        if (null == s || s.isEmpty())
            s = "Ilegal argument. Missing description for remote call exception.";
        this.exception = new ProCoreException(s);
    }

    @Override
    void got(Packet packet, AbstractFunction function) throws ProCoreException {
        throw exception;
    }
}

/** Handler that deserializes an exception response and rethrows its text. */
class ExceptionHandler extends MethodHandler {
    @Override
    void got(Packet packet, AbstractFunction function) throws ProCoreException {
        packet.bytes.position(20);
        ExceptionFunction exception = new ExceptionFunction();
        exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
        throw new ProCoreException(exception.what);
    }
}

/** Handler that deserializes the packet into the function using its response number. */
class DefaultHandler extends MethodHandler {
    @Override
    void got(Packet packet, AbstractFunction function) throws ProCoreException {
        function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
    }
}

/** Handler that hands the still deflated payload and its inflate size to the function. */
class GetClusterHandler extends MethodHandler {
    private final GetClusterNewFunction function;

    public GetClusterHandler(final GetClusterNewFunction function) {
        this.function = function;
    }

    @Override
    void got(Packet packet, AbstractFunction function) throws ProCoreException {
        this.function.inflateSize = packet.header.inflateSize;
        // Select the payload after the 20-byte header and move it to the buffer start.
        packet.bytes.position(20);
        packet.bytes.limit(20 + packet.header.deflateSize);
        this.function.deflated = packet.bytes.compact();
        this.function.deflated.position(0);
        this.function.deflated.limit(packet.header.deflateSize);
    }
}
//class AckHandler extends MethodHandler {
//    @Override
//    void got(Packet packet, AbstractFunction function) throws ProCoreException {
//        packet.first.position(20);
//        NullFunction ack = new NullFunction();
//        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
//        Util.showDebug("ack message number=" + ack.messageNumber);
//    }
//}

/** Request/response message numbers plus the token assigned when the request is sent. */
abstract class AbstractMethod {
//    abstract ByteBuffer serialize(ByteOrder byteOrder);
    protected final int requestNumber;
    protected final int responseNumber;
    protected int receivedNumber = 0;

    boolean hasResponse() {
        return responseNumber != 0;
    }

    private int token = 0;

    void setToken(int token) {
        this.token = token;
    }

    int getToken() {
        return token;
    }

    AbstractMethod(int requestNumber, int responseNumber) {
        this.requestNumber = requestNumber;
        this.responseNumber = responseNumber;
    }

    void prepareForSendingRequest() {
        receivedNumber = 0;
    }
}

/** A remote call in flight: the function to send and the latch released when its reply arrives. */
class Method extends AbstractMethod {
    private final ExceptionHandler exceptionHandler = new ExceptionHandler();
    private final AbstractFunction function;
    private Packet packet = null;
    private CountDownLatch gotPacket = new CountDownLatch(1);
    private MethodHandler handler = new DefaultHandler();
    private final EventHandler eventHandler;

    Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
        super(function.getRequestNumber(), function.getResponseNumber());
        this.eventHandler = eventHandler;
        if (null != handler)
            this.handler = handler;
        switch (function.getRequestNumber()) {
            default:
                break;
            // For these requests the latch is released immediately, so waitForReply does not block.
            case MessageNumber.AcceptCommitRequest:
            case MessageNumber.UpdateClusterRequest:
                gotPacket.countDown();
                break;
        }
        this.function = function;
    }

    // MARK: waitforReply
    /**
     * Waits for the reply, printing a timeout notice to System.err every 30
     * seconds, then lets the current handler process the packet.
     * <p>
     * NOTE(review): the handler is only invoked when a packet is present, so an
     * ErrorHandler installed by gotException(String) is never triggered here —
     * confirm whether that is intended.
     */
    void waitForReply(Connection c) throws ProCoreException, InterruptedException {
        while (gotPacket.getCount() > 0) {
            gotPacket.await(30, TimeUnit.SECONDS);
            if (gotPacket.getCount() > 0) {
                System.err.println("waitForReply timeout:");
                System.err.println("wait token=" + getToken());
                // MARK: c.onWait(getToken());
            }
        }
        if (null != packet)
            handler.got(packet, function);
    }

    /** @return this method's own event handler if it has one, otherwise the given one. */
    EventHandler getEventHandler(EventHandler eventHandler) {
        if (null != this.eventHandler)
            return this.eventHandler;
        else
            return eventHandler;
    }

    ByteBuffer serialize(ByteOrder byteOrder) {
        DataBuffer db = function.serialize(byteOrder);
        return db.getByteBuffer();
    }

    void gotPacket(Packet packet) {
        this.packet = packet;
        gotPacket.countDown();
    }

    void gotException(String s) {
        handler = new ErrorHandler(s);
        gotPacket.countDown();
    }

    void gotException(Packet packet) {
        this.packet = packet;
        handler = exceptionHandler;
        gotPacket.countDown();
    }
//    void gotAck(Packet packet) {
//        this.packet = packet;
//        handler = new AckHandler();
//        gotPacket.countDown();
//    }
}

/** Compile-time debug switches and wait helpers that report to System.err. */
class DebugPolicy {
    static final boolean REPORT_DEBUG_EVENTS = false;
    static final boolean REPORT_SERVER_EVENTS = false;
    static final boolean REPORT_DEBUG_OPERATIONS = false;
    static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
    static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;

    private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
    private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
    static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);

    // Time to wait for an event (in milliseconds).
    // Code logic should assume that this wait never expires.
    // Should be big enough so that if this time is expired it indicates an error situation.
    // Note: 60000000 ms is roughly 16.7 hours (one hour would be 3600000 ms).
    private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000;
    private static final boolean REPORT_WAIT_EVENTS = true;
    private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;

    /**
     * Calls object.wait.
     * @param waitMs time to wait in milliseconds.
     * @param object to wait for (used for calling object.wait)
     * @param string text used in debug prints.
     * @see java.lang.Object#wait(long)
     */
    static final void wait(long waitMs, Object object, String string) {
        waitImpl(waitMs, object, string);
    }

    /**
     * Calls object.wait.
     * @param object to wait for (used for calling object.wait)
     * @param string text used in debug prints.
     * @see java.lang.Object#wait(long)
     */
    static final void wait(Object object, String string) {
        waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
    }

    /** Prints the message with the object's class and identity hash; with a stack trace when verbose. */
    private static final void report(String string, Object object) {
        String msg = "DEBUG: " + string
        + " " + object.getClass().getSimpleName()
        + " hid=" + System.identityHashCode(object);
        if (REPORT_WAIT_EVENTS_VERBOSE)
            new Throwable(msg).printStackTrace();
        else
            System.err.println(msg);
    }

    /**
     * Waits on the object, reporting start, interruption and end of the wait.
     * The caller must hold the object's monitor, as Object.wait requires.
     * An interrupt ends the wait and is reported but not propagated.
     */
    private static final void waitImpl(long waitMs, Object object, String string) {
        if (REPORT_WAIT_EVENTS)
            report("Wait for event started.", object);
        try {
            object.wait(waitMs); // milliseconds
        } catch (InterruptedException e) {
            if (REPORT_WAIT_EVENTS)
                report("Wait for event interrupted.", object);
        } finally {
            if (REPORT_WAIT_EVENTS)
                report("Wait for event stopped.", object);
        }
    }
}