1 package org.simantics.db.server.internal;
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.ByteBuffer;
6 import java.nio.ByteOrder;
7 import java.nio.channels.CancelledKeyException;
8 import java.nio.channels.ClosedChannelException;
9 import java.nio.channels.SelectionKey;
10 import java.nio.channels.Selector;
11 import java.nio.channels.SocketChannel;
12 import java.util.Collections;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.List;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.TimeUnit;
19 import org.simantics.db.common.utils.Logger;
20 import org.simantics.db.server.ProCoreException;
21 import org.simantics.db.server.protocol.AbstractFunction;
22 import org.simantics.db.server.protocol.DataBuffer;
23 import org.simantics.db.server.protocol.ExceptionFunction;
24 import org.simantics.db.server.protocol.GetClusterNewFunction;
25 import org.simantics.db.server.protocol.MessageNumber;
27 import gnu.trove.iterator.TIntObjectIterator;
28 import gnu.trove.map.hash.TIntObjectHashMap;
30 public class ConnectionManager {
31 private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
32 private static final ConnectionManager connectionManager = new ConnectionManager();
// Returns the shared ConnectionManager by calling instance() on the static singleton field.
33 public static ConnectionManager getInstance() {
34 return connectionManager.instance();
36 private final Manager manager = new Manager();
37 private final Thread thread = new Thread(manager, "Connection Manager");
38 private boolean stop = false;
39 protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());
40 private ConnectionManager() {
41 Util.trace("ConnectionManager.new");
43 private ConnectionManager instance() {
44 if (!thread.isAlive())
48 protected class Manager implements Runnable {
49 private Selector selector;
50 private CountDownLatch selectorOpen = new CountDownLatch(1);
55 selector = Selector.open();
56 selectorOpen.countDown();
58 int n = selector.select();
67 } catch (CancelledKeyException e) {
68 Logger.defaultLogError("Cancelled key in select loop.", e);
71 } catch (Throwable x) {
72 Logger.defaultLogError("Select loop failed.", x);
75 void processConnectors() throws ClosedChannelException {
76 while (connections.size() > 0) {
77 Connection c = connections.remove(0);
81 void processSelected() {
82 Iterator<SelectionKey> i = selector.selectedKeys().iterator();
84 SelectionKey sk = (SelectionKey)i.next();
88 processDisconnect(sk);
89 else if (sk.isConnectable())
91 else if (sk.isReadable())
93 } catch (CancelledKeyException e) {
94 processDisconnect(sk);
98 private void processDisconnect(SelectionKey key) {
99 Connection c = (Connection)key.attachment();
103 key.attach(null); // Just to make sure that reference to connection is released.
106 private void processConnect(SelectionKey key) {
107 Connection connection = (Connection)key.attachment();
108 SocketChannel sc = (SocketChannel)key.channel();
110 if (sc.finishConnect()) {
111 int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
112 ops |= SelectionKey.OP_READ;
113 key.interestOps(ops);
114 Connection c = connection.onConnectSucceeded(key);
115 if (c == null) { // Connector refused to accept connection.
116 key.attach(null); // Just to make sure that reference to connector is released.
122 } catch (Throwable x) {
125 key.attach(null); // Just to make sure that reference to connector is released.
129 } catch (IOException e) {
130 if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
131 Logger.defaultLogError("Failed to close connection.", e);
134 connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
139 private void processRead(SelectionKey key) {
140 Connection c = (Connection)key.attachment();
141 SocketChannel sc = (SocketChannel)key.channel();
143 int ops = key.interestOps();
144 ops |= SelectionKey.OP_READ;
145 key.interestOps(ops);
147 } catch (IOException x) {
149 key.attach(null); // Just to make sure that reference to connection is released.
155 } catch (IOException e) {
159 void connect(Connection connection) throws ProCoreException {
161 if (connections.contains(connection))
162 throw new ProCoreException("Connection already registered. connection=" + connection);
163 connection.prepare4Connection(SocketChannel.open());
164 connections.add(connection);
165 manager.selector.wakeup();
166 connection.wait4Connection();
167 } catch(IOException e) {
168 throw new ConnectionException("Failed to connect to " + connection + ".", e);
169 } catch (InterruptedException e) {
170 if (!connection.isConnected())
171 throw new NotConnectedException("Connection interrupted to " + connection + ".");
174 private void start() {
177 while (manager.selectorOpen.getCount() > 0) {
179 manager.selectorOpen.await();
180 } catch (InterruptedException e) {
183 Util.logError("Wait for selector open interrupted. Continuing wait.", e);
190 manager.selector.wakeup();
193 //class MethodQueue {
194 // private LinkedList<Method> methods = new LinkedList<Method>();
197 // public synchronized int size() {
198 // return methods.size();
200 // public synchronized void addFirst(Method method) {
201 // methods.addFirst(method);
203 // public synchronized void addLast(Method method) {
204 // methods.addLast(method);
206 // public synchronized Method removeFirst() {
208 // Method method = methods.removeFirst();
210 // } catch (NoSuchElementException e) {
214 // public synchronized void close() {
215 // while (size() > 0) {
216 // Method method = removeFirst();
217 // method.setExceptionText("Closing.");
218 // method.gotResponse();
223 //class RequestQueue {
224 // private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();
228 // public synchronized int size() {
229 // return functions.size();
231 // public synchronized void addFirst(AbstractFunction e) {
232 // functions.addFirst(e);
234 // public synchronized void addLast(AbstractFunction e) {
235 // functions.addLast(e);
237 // public synchronized AbstractFunction removeFirst() {
239 // AbstractFunction ret = functions.removeFirst();
241 // } catch (NoSuchElementException e) {
247 private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();
// Returns the number of methods currently held in methodMap.
250 public synchronized int size() {
251 return methodMap.size();
253 public synchronized boolean isEmpty() {
// Stores the method in methodMap under its token.
// Throws ProCoreException for an illegal token or when the token was already present.
256 public synchronized void add(Method method) throws ProCoreException {
257 int token = method.getToken();
// NOTE(review): the condition guarding this throw is not visible here; confirm which token values are rejected.
259 throw new ProCoreException("Illegal token number.");
// put() returns the value previously stored under the token, if any.
// NOTE(review): by the time the duplicate is detected the old entry has already been replaced by the new method.
260 Method oldMethod = methodMap.put(token, method);
261 if (null != oldMethod)
262 throw new ProCoreException("Duplicate token number.");
264 public synchronized Method remove(int token) {
265 Method method = methodMap.remove(token);
268 synchronized void close() {
270 TIntObjectIterator<Method> iterator = methodMap.iterator();
271 while (iterator.hasNext()) {
273 Method method = iterator.value();
275 Util.logError("Null function in method queue.");
276 method.gotException("Method queue closing.");
282 // public synchronized Method removeAny() throws ProCoreException {
283 // TIntObjectIterator<Method> iterator = methodMap.iterator();
284 // if (!iterator.hasNext())
286 // iterator.advance();
287 // Method method = iterator.value();
288 // if (null == method)
289 // throw new ProCoreException("Null function in response queue.");
290 // methodMap.remove(method.getToken());
295 private SelectionKey selectionKey = null;
296 private SocketChannel socketChannel = null;
// Registers this channel's socket with the given selector for OP_CONNECT events.
// The connection is passed as the key attachment.
// Throws ClosedChannelException when no socket channel has been set.
299 void register(Selector selector, Connection connection)
300 throws ClosedChannelException {
301 if (null == socketChannel)
302 throw new ClosedChannelException();
303 socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
305 void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
306 throws ProCoreException, IOException {
307 if (null == socketChannel)
308 throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
310 throw new NotConnectedException("Illegal argument. No address to connect to.");
311 if (null != this.socketChannel)
312 throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
313 socketChannel.configureBlocking(false);
314 boolean connected = socketChannel.connect(address);
316 if (DebugPolicy.REPORT_DEBUG_EVENTS)
317 Util.showDebug("Connected to " + socketChannel + ".");
318 this.socketChannel = socketChannel; // Waiting for connect.
320 // private static final int IPTOS_LOWDELAY = 0x10;
321 synchronized void connect(SelectionKey key) {
322 this.selectionKey = key;
323 this.socketChannel = (SocketChannel)key.channel();
324 if (DebugPolicy.REPORT_DEBUG_EVENTS)
325 Util.showDebug("Connected to " + socketChannel + ".");
327 // this.socketChannel.socket().setKeepAlive(true);
328 // this.socketChannel.socket().setSoLinger(true, 1);
329 // } catch (SocketException e) {
330 // e.printStackTrace();
333 // this.socketChannel.socket().setTcpNoDelay(true);
335 // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
336 // } catch (SocketException e) {
337 // e.printStackTrace();
340 synchronized void disconnect() {
341 if (selectionKey == null)
345 if (DebugPolicy.REPORT_DEBUG_EVENTS)
346 Util.showDebug("Disconnected from " + socketChannel);
347 selectionKey.interestOps(0);
348 selectionKey.cancel();
349 selectionKey.selector().wakeup();
350 } catch (CancelledKeyException e) {
351 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
352 Logger.defaultLogError("Failed to disconnect cleanly.", e);
353 } catch (IllegalArgumentException e) {
354 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
355 Logger.defaultLogError("Failed to disconnect cleanly.", e);
357 try { // Note that this must be after the selectionKey operations above.
358 socketChannel.socket().close();
359 } catch (IOException e) {
360 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
361 Logger.defaultLogError("Failed to close socket cleanly.", e);
365 socketChannel = null;
368 synchronized boolean isConnected() {
369 boolean ret = selectionKey != null && socketChannel.isConnected();
// Returns the current socket channel.
// Throws IOException when the socket channel field is null.
372 synchronized SocketChannel socket() throws IOException {
373 if (null == socketChannel)
374 throw new IOException("Socket channel is null.");
375 return socketChannel;
377 synchronized void listenReadEvents()
379 if (null == selectionKey)
380 throw new IOException("Trying to listen connection which is not connected.");
381 int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
382 selectionKey.interestOps(ops);
383 selectionKey.selector().wakeup();
387 private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
388 private static int lastSentTokenNumber = 0;
389 private final ConnectionThread connectionThread;
390 private InetSocketAddress address;
391 private Channel channel;
392 private CountDownLatch connectionCreated;
393 private ProCoreException exception;
394 private ByteBuffer readBuffer;
395 Connection(ConnectionThread connectionThread) {
396 this.connectionThread = connectionThread;
397 init4Connection(InetSocketAddress.createUnresolved("", 0));
// Describes the connection by its address and the last sent token number.
// lastSentTokenNumber is a static field, so the token shown is shared by all Connection instances.
400 public String toString() {
401 return "Connection address=" + address + " token=" + lastSentTokenNumber;
// Returns true while the address still equals the placeholder address
// (unresolved, empty host name, port 0) that the constructor passes to init4Connection.
403 boolean addressNotInitialized() {
404 return address.equals(InetSocketAddress.createUnresolved("", 0));
// Registers this connection with the selector by delegating to the channel,
// passing this connection as the second argument.
406 void register(Selector selector)
407 throws ClosedChannelException {
408 channel.register(selector, this);
410 InetSocketAddress getAddress() {
// Returns true when the given address equals the address this connection is configured with.
// The parameter shadows the field, so the field must be qualified with "this";
// comparing the parameter with itself would always return true.
413 boolean equalsAddress(InetSocketAddress address) {
414 return this.address.equals(address);
416 void init4Connection(InetSocketAddress address) {
417 this.address = address;
418 if (null != channel && channel.isConnected())
419 channel.disconnect();
420 channel = new Channel();
421 connectionCreated = new CountDownLatch(1);
423 readBuffer = connectionThread.getInputBuffer();
425 void prepare4Connection(SocketChannel socketChannel)
426 throws ProCoreException, IOException {
428 throw new ProCoreException("Illegal state exception. Already connected.");
429 if (address.isUnresolved())
430 address = new InetSocketAddress(address.getHostName(), address.getPort());
431 if (address.getPort() == 0)
432 throw new ProCoreException("Port 0 not supported as connection address.");
433 channel.prepare4Connection(socketChannel, address);
// Blocks the calling thread until the connectionCreated latch has been counted down.
// Both onConnectFailed and onConnectSucceeded count the latch down.
// Throws InterruptedException when the waiting thread is interrupted.
435 void wait4Connection() throws InterruptedException {
436 connectionCreated.await();
438 synchronized void disconnect() {
441 channel.disconnect();
443 private void throwIOExceptionFromRead(IOException e)
445 disconnect(); // This ensures that isConnected will be false after this.
448 throw new IOException("Failed to read.");
450 // Called by synchronized handleInput.
451 private void sendMessage(ByteBuffer byteBuffer)
452 throws IOException, InterruptedException {
453 int left = byteBuffer.remaining();
455 ByteBuffer slice = byteBuffer.slice();
456 int size = Math.min(left, 1000000);
459 int position = byteBuffer.position() + size;
460 byteBuffer.position(position);
464 private void sendBuffer(ByteBuffer byteBuffer)
465 throws IOException, InterruptedException {
466 int left = byteBuffer.remaining();
468 int n = channel.socket().write(byteBuffer);
473 // System.err.println("############### YIELD ##############################");
476 Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()
477 + ", write return =" + n);
480 static public class MessageHeader {
482 public int lastTokenIn;
483 public MessageHeader(int token, int lastTokenIn) {
485 this.lastTokenIn = lastTokenIn;
488 // Called by synchronized call.
489 protected void call(MessageHeader messageHeader, Method method)
490 throws IOException, InterruptedException {
491 byte[] bytes = new byte[20];
492 ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
493 header.order(ByteOrder.LITTLE_ENDIAN);
494 header.putInt(messageHeader.lastTokenIn);
495 header.putInt(messageHeader.token);
496 int messageNumber = method.requestNumber;
497 header.putInt(messageNumber);
498 header.putInt(0); // inflate size, zero if no compression
499 ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
500 header.putInt(data.position());
502 synchronized (this) {
503 if (DebugPolicy.REPORT_DEBUG_EVENTS)
504 Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
505 // Receiver expects header and data to form continuous block.
507 data.limit(data.position());
510 if (DebugPolicy.REPORT_DEBUG_EVENTS)
511 Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
514 void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
515 throws ProCoreException, IOException, InterruptedException {
516 m.prepareForSendingRequest();
518 Util.showDebug("Setting last token out=" + lastTokenIn);
519 synchronized (this) {
520 MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
521 m.setToken(messageHeader.token);
523 call(messageHeader, m);
526 // Called by session manager.
527 void onRead() throws IOException {
528 if (readBuffer.remaining() < 1)
529 throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
532 n = channel.socket().read(readBuffer);
533 } catch (IOException e) {
534 throwIOExceptionFromRead(e);
536 if (n < 0) // Assuming that this is same as broken connection.
537 throwIOExceptionFromRead(new IOException("Failed to read."));
539 return; // No data, no error.
541 readBuffer = connectionThread.handleInput(readBuffer, n);
542 } catch (Throwable t) {
543 if (t instanceof IOException)
544 throw (IOException)t;
546 throw new IOException("Throwable from handleInput.", t);
549 // Called by session manager.
550 void onConnectFailed(ProCoreException exception) {
551 if (null != exception)
552 this.exception = exception;
554 this.exception = new NotConnectedException("Failed to create connection.");
555 connectionCreated.countDown();
557 // Called by session manager.
558 Connection onConnectSucceeded(SelectionKey key) {
559 Connection ret = null;
561 channel.connect(key);
563 } catch (Throwable t) {
564 exception = new NotConnectedException("Failed to finalize connection.", t);
565 // Connection refused.
567 connectionCreated.countDown();
570 // Called by session manager.
571 void onDisconnect() {
574 boolean isConnected() {
575 return channel.isConnected();
577 // boolean isStopping() {
// Returns true when an exception has been recorded for this connection.
// The exception field is assigned when connecting fails, so "excepted" means the field is non-null.
579 boolean isExcepted() {
580 return null != exception;
583 return isConnected() && !isExcepted();
594 public int lastTokenIn;
596 public int messageNumber;
597 // public DataBuffer inflateBuffer;
598 public ByteBuffer buffer;
599 public int inflateSize;
600 ReceiveData(HeaderData headerData) {
601 lastTokenIn = headerData.lastTokenIn;
602 token = headerData.token;
603 messageNumber = headerData.messageNumber;
606 abstract class MethodHandler {
607 abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;
608 DataBuffer getDataBuffer(Packet packet) {
609 packet.bytes.position(20);
610 DataBuffer db = new DataBuffer(packet.bytes, 0);
614 class ErrorHandler extends MethodHandler {
615 private final ProCoreException exception;
// Creates a handler carrying a ProCoreException built from the given description.
// A null or empty description is replaced by a default text.
// Emptiness is tested with isEmpty() because == on strings compares references, not contents.
616 ErrorHandler(String s) {
617 if (null == s || s.isEmpty())
618 s = "Ilegal argument. Missing description for remote call exception.";
619 this.exception = new ProCoreException(s);
622 void got(Packet packet, AbstractFunction function) throws ProCoreException {
// Handler that turns a received packet into a thrown ProCoreException.
626 class ExceptionHandler extends MethodHandler {
// Deserializes the packet as an ExceptionFunction (message number ExceptionResponse)
// and throws a ProCoreException carrying its "what" text. The function argument is not used.
628 void got(Packet packet, AbstractFunction function) throws ProCoreException {
// NOTE(review): offset 20 is presumably the end of the message header - confirm.
629 packet.bytes.position(20);
630 ExceptionFunction exception = new ExceptionFunction();
631 exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
632 throw new ProCoreException(exception.what);
// Handler that deserializes the packet directly into the function passed to got().
635 class DefaultHandler extends MethodHandler {
// Deserializes the packet's data buffer into the function, using the function's own response number.
637 void got(Packet packet, AbstractFunction function) throws ProCoreException {
638 function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
// Handler that copies size information and the payload bytes of a packet into a
// GetClusterNewFunction instead of calling deserialize on it.
641 class GetClusterHandler extends MethodHandler {
// Target function that receives inflateSize and the deflated buffer.
642 private final GetClusterNewFunction function;
643 public GetClusterHandler(final GetClusterNewFunction function) {
644 this.function = function;
// Fills the function given to the constructor; the function parameter of this method is not used.
647 void got(Packet packet, AbstractFunction function) throws ProCoreException {
648 this.function.inflateSize = packet.header.inflateSize;
// Select the window [20, 20 + deflateSize) of the packet bytes.
// NOTE(review): offset 20 is presumably the end of the message header - confirm.
649 packet.bytes.position(20);
650 packet.bytes.limit(20 + packet.header.deflateSize);
// NOTE(review): assuming bytes is a java.nio.ByteBuffer, compact() moves the selected window to the start of the buffer.
651 this.function.deflated = packet.bytes.compact();
// Make the readable range exactly [0, deflateSize).
652 this.function.deflated.position(0);
653 this.function.deflated.limit(packet.header.deflateSize);
656 //class AckHandler extends MethodHandler {
658 // void got(Packet packet, AbstractFunction function) throws ProCoreException {
659 // packet.first.position(20);
660 // NullFunction ack = new NullFunction();
661 // ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
662 // Util.showDebug("ack message number=" + ack.messageNumber);
665 abstract class AbstractMethod {
666 // abstract ByteBuffer serialize(ByteOrder byteOrder);
667 protected final int requestNumber;
668 protected final int responseNumber;
669 protected int receivedNumber = 0;
// Returns true when responseNumber is non-zero.
670 boolean hasResponse() {
671 return responseNumber != 0;
673 private int token = 0;
674 void setToken(int token) {
680 AbstractMethod(int requestNumber, int responseNumber) {
681 this.requestNumber = requestNumber;
682 this.responseNumber = responseNumber;
684 void prepareForSendingRequest() {
688 class Method extends AbstractMethod {
689 private final ExceptionHandler exceptionHandler = new ExceptionHandler();
690 private final AbstractFunction function;
691 private Packet packet = null;
692 private CountDownLatch gotPacket = new CountDownLatch(1);
693 private MethodHandler handler = new DefaultHandler();
694 private final EventHandler eventHandler;
695 Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
696 super(function.getRequestNumber(), function.getResponseNumber());
697 this.eventHandler = eventHandler;
699 this.handler = handler;
700 switch (function.getRequestNumber()) {
703 case MessageNumber.AcceptCommitRequest:
704 case MessageNumber.UpdateClusterRequest:
705 gotPacket.countDown();
708 this.function = function;
709 } // MARK: waitforReply
710 void waitForReply(Connection c) throws ProCoreException, InterruptedException {
711 while (gotPacket.getCount() > 0) {
712 gotPacket.await(30, TimeUnit.SECONDS);
713 if (gotPacket.getCount() > 0) {
714 System.err.println("waitForReply timeout:");
715 System.err.println("wait token=" + getToken());
716 // MARK: c.onWait(getToken());
720 handler.got(packet, function);
722 EventHandler getEventHandler(EventHandler eventHandler) {
723 if (null != this.eventHandler)
724 return this.eventHandler;
// Serializes the wrapped function with the given byte order and returns
// the ByteBuffer obtained from the resulting DataBuffer.
728 ByteBuffer serialize(ByteOrder byteOrder) {
729 DataBuffer db = function.serialize(byteOrder);
730 return db.getByteBuffer();
// Stores the received packet and counts down the gotPacket latch that waitForReply waits on.
732 void gotPacket(Packet packet) {
733 this.packet = packet;
734 gotPacket.countDown();
// Replaces the handler with an ErrorHandler built from the given text
// and counts down the gotPacket latch. No packet is stored.
736 void gotException(String s) {
737 handler = new ErrorHandler(s);
738 gotPacket.countDown();
// Stores the received packet, switches the handler to this method's exceptionHandler
// and counts down the gotPacket latch.
740 void gotException(Packet packet) {
741 this.packet = packet;
742 handler = exceptionHandler;
743 gotPacket.countDown();
745 // void gotAck(Packet packet) {
746 // this.packet = packet;
747 // handler = new AckHandler();
748 // gotPacket.countDown();
752 static final boolean REPORT_DEBUG_EVENTS = false;
753 static final boolean REPORT_SERVER_EVENTS = false;
754 static final boolean REPORT_DEBUG_OPERATIONS = false;
755 static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
756 static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;
758 private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
759 private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
760 static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);
762 // Time to wait for an event (in milliseconds).
763 // Code logic should assume that this wait never expires.
764 // Should be big enough so that if this time is expired it indicates an error situation.
765 private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // 60,000 seconds (about 16.7 hours), not one hour
766 private static final boolean REPORT_WAIT_EVENTS = true;
767 private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;
770 * @param waitMs time to wait in milliseconds.
771 * @param object to wait for (used for calling object.wait)
772 * @param string text used in debug prints.
773 * @see java.lang.Object.wait
775 static final void wait(long waitMs, Object object, String string) {
776 waitImpl(waitMs, object, string);
780 * @param object to wait for (used for calling object.wait)
781 * @param string text used in debug prints.
782 * @see java.lang.Object.wait
784 static final void wait(Object object, String string) {
785 waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
787 private static final void report(String string, Object object) {
788 String msg = "DEBUG: " + string
789 + " " + object.getClass().getSimpleName()
790 + " hid=" + System.identityHashCode(object);
791 if (REPORT_WAIT_EVENTS_VERBOSE)
792 new Throwable(msg).printStackTrace();
794 System.err.println(msg);
796 private static final void waitImpl(long waitMs, Object object, String string) {
797 if (REPORT_WAIT_EVENTS)
798 report("Wait for event started.", object);
800 object.wait(waitMs); // milliseconds
801 } catch (InterruptedException e) {
802 if (REPORT_WAIT_EVENTS)
803 report("Wait for event interrupted.", object);
805 if (REPORT_WAIT_EVENTS)
806 report("Wait for event stopped.", object);