X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.server%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fserver%2Finternal%2FConnectionManager.java;h=8eb46f5df844c0c2950b364e936337f8d753f944;hb=refs%2Fchanges%2F76%2F2276%2F1;hp=fb01a06d02ad56cceec49b32df91c9c098b600ea;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java index fb01a06d0..8eb46f5df 100644 --- a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java +++ b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java @@ -1,809 +1,809 @@ -package org.simantics.db.server.internal; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.simantics.db.common.utils.Logger; -import org.simantics.db.server.ProCoreException; -import org.simantics.db.server.protocol.AbstractFunction; -import org.simantics.db.server.protocol.DataBuffer; -import org.simantics.db.server.protocol.ExceptionFunction; -import org.simantics.db.server.protocol.GetClusterNewFunction; -import org.simantics.db.server.protocol.MessageNumber; - -import gnu.trove.iterator.TIntObjectIterator; -import gnu.trove.map.hash.TIntObjectHashMap; - -public class ConnectionManager { - private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS; - private static final ConnectionManager connectionManager = new ConnectionManager(); - public static ConnectionManager getInstance() { - return connectionManager.instance(); - } - private final Manager manager = new Manager(); - private final Thread thread = new Thread(manager, "Connection Manager"); - private boolean stop = false; - protected List connections = Collections.synchronizedList(new LinkedList()); - private ConnectionManager() { - Util.trace("ConnectionManager.new"); - } - private ConnectionManager instance() { - if (!thread.isAlive()) - start(); - return this; - } - protected class Manager implements Runnable { - private Selector selector; - private CountDownLatch selectorOpen = new CountDownLatch(1); - Manager() { - } - public void run() { - try { - selector = Selector.open(); - selectorOpen.countDown(); - while (true) { - int n = selector.select(); - try { - if (n > 0) - processSelected(); - processConnectors(); - if (stop) { - selector.close(); - return; - } - } catch (CancelledKeyException e) { - Logger.defaultLogError("Cancelled key in select loop.", e); - } - } - } catch (Throwable x) { - Logger.defaultLogError("Select loop failed.", x); - } - } - void processConnectors() throws ClosedChannelException { - while (connections.size() > 0) { - Connection c = connections.remove(0); - c.register(selector); - } - } - void processSelected() { - Iterator i = selector.selectedKeys().iterator(); - while (i.hasNext()) { - SelectionKey sk = (SelectionKey)i.next(); - i.remove(); - try { - if (!sk.isValid()) - processDisconnect(sk); - else if (sk.isConnectable()) - processConnect(sk); - else if (sk.isReadable()) - processRead(sk); - } catch (CancelledKeyException e) { - processDisconnect(sk); - } - } - } - private void processDisconnect(SelectionKey key) { - Connection c = (Connection)key.attachment(); - if (null == c) - return; - c.onDisconnect(); - key.attach(null); // Just to make sure that reference to connection is released. - key.cancel(); - } - private void processConnect(SelectionKey key) { - Connection connection = (Connection)key.attachment(); - SocketChannel sc = (SocketChannel)key.channel(); - try { - if (sc.finishConnect()) { - int ops = key.interestOps() & ~SelectionKey.OP_CONNECT; - ops |= SelectionKey.OP_READ; - key.interestOps(ops); - Connection c = connection.onConnectSucceeded(key); - if (c == null) { // Connector refused to accept connection. - key.attach(null); // Just to make sure that reference to connector is released. - key.cancel(); - sc.close(); - } else - key.attach(c); - } - } catch (Throwable x) { - try { - if (key.isValid()) - key.attach(null); // Just to make sure that reference to connector is released. - key.cancel(); - try { - sc.close(); - } catch (IOException e) { - if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used. - Logger.defaultLogError("Failed to close connection.", e); - } - } finally { - connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x)); - } - } - } - } - private void processRead(SelectionKey key) { - Connection c = (Connection)key.attachment(); - SocketChannel sc = (SocketChannel)key.channel(); - try { - int ops = key.interestOps(); - ops |= SelectionKey.OP_READ; - key.interestOps(ops); - c.onRead(); - } catch (IOException x) { - if (key.isValid()) { - key.attach(null); // Just to make sure that reference to connection is released. - key.cancel(); - } - if (sc.isOpen()) - try { - sc.close(); - } catch (IOException e) { - } - } - } - void connect(Connection connection) throws ProCoreException { - try { - if (connections.contains(connection)) - throw new ProCoreException("Connection already registered. connection=" + connection); - connection.prepare4Connection(SocketChannel.open()); - connections.add(connection); - manager.selector.wakeup(); - connection.wait4Connection(); - } catch(IOException e) { - throw new ConnectionException("Failed to connect to " + connection + ".", e); - } catch (InterruptedException e) { - if (!connection.isConnected()) - throw new NotConnectedException("Connection interrupted to " + connection + "."); - } - } - private void start() { - thread.start(); - boolean once = true; - while (manager.selectorOpen.getCount() > 0) { - try { - manager.selectorOpen.await(); - } catch (InterruptedException e) { - if (once) { - once = false; - Util.logError("Wait for selector open interrupted. Continuing wait.", e); - } - } - } - } - public void stop() { - stop = true; - manager.selector.wakeup(); - } -} -//class MethodQueue { -// private LinkedList methods = new LinkedList(); -// MethodQueue() { -// } -// public synchronized int size() { -// return methods.size(); -// } -// public synchronized void addFirst(Method method) { -// methods.addFirst(method); -// } -// public synchronized void addLast(Method method) { -// methods.addLast(method); -// } -// public synchronized Method removeFirst() { -// try { -// Method method = methods.removeFirst(); -// return method; -// } catch (NoSuchElementException e) { -// return null; -// } -// } -// public synchronized void close() { -// while (size() > 0) { -// Method method = removeFirst(); -// method.setExceptionText("Closing."); -// method.gotResponse(); -// } -// } -// -//} -//class RequestQueue { -// private LinkedList functions = new LinkedList(); -// -// RequestQueue() { -// } -// public synchronized int size() { -// return functions.size(); -// } -// public synchronized void addFirst(AbstractFunction e) { -// functions.addFirst(e); -// } -// public synchronized void addLast(AbstractFunction e) { -// functions.addLast(e); -// } -// public synchronized AbstractFunction removeFirst() { -// try { -// AbstractFunction ret = functions.removeFirst(); -// return ret; -// } catch (NoSuchElementException e) { -// return null; -// } -// } -//} -class MethodQueue { - private TIntObjectHashMap methodMap = new TIntObjectHashMap(); - MethodQueue() { - } - public synchronized int size() { - return methodMap.size(); - } - public synchronized boolean isEmpty() { - return 0 == size(); - } - public synchronized void add(Method method) throws ProCoreException { - int token = method.getToken(); - if (0 == token) - throw new ProCoreException("Illegal token number."); - Method oldMethod = methodMap.put(token, method); - if (null != oldMethod) - throw new ProCoreException("Duplicate token number."); - } - public synchronized Method remove(int token) { - Method method = methodMap.remove(token); - return method; - } - synchronized void close() { - try { - TIntObjectIterator iterator = methodMap.iterator(); - while (iterator.hasNext()) { - iterator.advance(); - Method method = iterator.value(); - if (null == method) - Util.logError("Null function in method queue."); - method.gotException("Method queue closing."); - } - } finally { - methodMap.clear(); - } - } -// public synchronized Method removeAny() throws ProCoreException { -// TIntObjectIterator iterator = methodMap.iterator(); -// if (!iterator.hasNext()) -// return null; -// iterator.advance(); -// Method method = iterator.value(); -// if (null == method) -// throw new ProCoreException("Null function in response queue."); -// methodMap.remove(method.getToken()); -// return method; -// } -} -class Channel { - private SelectionKey selectionKey = null; - private SocketChannel socketChannel = null; - Channel() { - } - void register(Selector selector, Connection connection) - throws ClosedChannelException { - if (null == socketChannel) - throw new ClosedChannelException(); - socketChannel.register(selector, SelectionKey.OP_CONNECT, connection); - } - void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address) - throws ProCoreException, IOException { - if (null == socketChannel) - throw new NotConnectedException("Illegal argument. No socket channel to connect to."); - if (null == address) - throw new NotConnectedException("Illegal argument. No address to connect to."); - if (null != this.socketChannel) - throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this); - socketChannel.configureBlocking(false); - boolean connected = socketChannel.connect(address); - if (connected) - if (DebugPolicy.REPORT_DEBUG_EVENTS) - Util.showDebug("Connected to " + socketChannel + "."); - this.socketChannel = socketChannel; // Waiting for connect. - } -// private static final int IPTOS_LOWDELAY = 0x10; - synchronized void connect(SelectionKey key) { - this.selectionKey = key; - this.socketChannel = (SocketChannel)key.channel(); - if (DebugPolicy.REPORT_DEBUG_EVENTS) - Util.showDebug("Connected to " + socketChannel + "."); -// try { -// this.socketChannel.socket().setKeepAlive(true); -// this.socketChannel.socket().setSoLinger(true, 1); -// } catch (SocketException e) { -// e.printStackTrace(); -// } -// try { -// this.socketChannel.socket().setTcpNoDelay(true); - // Crashes Kalle! - // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY); -// } catch (SocketException e) { -// e.printStackTrace(); -// } - } - synchronized void disconnect() { - if (selectionKey == null) - return; - try { - try { - if (DebugPolicy.REPORT_DEBUG_EVENTS) - Util.showDebug("Disconnected from " + socketChannel); - selectionKey.interestOps(0); - selectionKey.cancel(); - selectionKey.selector().wakeup(); - } catch (CancelledKeyException e) { - if (DebugPolicy.REPORT_DEBUG_OPERATIONS) - Logger.defaultLogError("Failed to disconnect cleanly.", e); - } catch (IllegalArgumentException e) { - if (DebugPolicy.REPORT_DEBUG_OPERATIONS) - Logger.defaultLogError("Failed to disconnect cleanly.", e); - } - try { // Note that this must be after the selectionKey operations above. - socketChannel.socket().close(); - } catch (IOException e) { - if (DebugPolicy.REPORT_DEBUG_OPERATIONS) - Logger.defaultLogError("Failed to close socket cleanly.", e); - } - } finally { - selectionKey = null; - socketChannel = null; - } - } - synchronized boolean isConnected() { - boolean ret = selectionKey != null && socketChannel.isConnected(); - return ret; - } - synchronized SocketChannel socket() throws IOException { - if (null == socketChannel) - throw new IOException("Socket channel is null."); - return socketChannel; - } - synchronized void listenReadEvents() - throws IOException { - if (null == selectionKey) - throw new IOException("Trying to listen connection which is not connected."); - int ops = selectionKey.interestOps() | SelectionKey.OP_READ; - selectionKey.interestOps(ops); - selectionKey.selector().wakeup(); - } -} -class Connection { - private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS; - private static int lastSentTokenNumber = 0; - private final ConnectionThread connectionThread; - private InetSocketAddress address; - private Channel channel; - private CountDownLatch connectionCreated; - private ProCoreException exception; - private ByteBuffer readBuffer; - Connection(ConnectionThread connectionThread) { - this.connectionThread = connectionThread; - init4Connection(InetSocketAddress.createUnresolved("", 0)); - } - @Override - public String toString() { - return "Connection address=" + address + " token=" + lastSentTokenNumber; - } - boolean addressNotInitialized() { - return address.equals(InetSocketAddress.createUnresolved("", 0)); - } - void register(Selector selector) - throws ClosedChannelException { - channel.register(selector, this); - } - InetSocketAddress getAddress() { - return address; - } - boolean equalsAddress(InetSocketAddress address) { - return address.equals(address); - } - void init4Connection(InetSocketAddress address) { - this.address = address; - if (null != channel && channel.isConnected()) - channel.disconnect(); - channel = new Channel(); - connectionCreated = new CountDownLatch(1); - exception = null; - readBuffer = connectionThread.getInputBuffer(); - } - void prepare4Connection(SocketChannel socketChannel) - throws ProCoreException, IOException { - if (isConnected()) - throw new ProCoreException("Illegal state exception. Already connected."); - if (address.isUnresolved()) - address = new InetSocketAddress(address.getHostName(), address.getPort()); - if (address.getPort() == 0) - throw new ProCoreException("Port 0 not supported as connection address."); - channel.prepare4Connection(socketChannel, address); - } - void wait4Connection() throws InterruptedException { - connectionCreated.await(); - } - synchronized void disconnect() { - if (!isConnected()) - return; - channel.disconnect(); - } - private void throwIOExceptionFromRead(IOException e) - throws IOException { - disconnect(); // This ensures that isConnected will be false after this. - if (null != e) - throw e; - throw new IOException("Failed to read."); - } - // Called by synchronized handleInput. - private void sendMessage(ByteBuffer byteBuffer) - throws IOException, InterruptedException { - int left = byteBuffer.remaining(); - while (left > 0) { - ByteBuffer slice = byteBuffer.slice(); - int size = Math.min(left, 1000000); - slice.limit(size); - sendBuffer(slice); - int position = byteBuffer.position() + size; - byteBuffer.position(position); - left -= size; - } - } - private void sendBuffer(ByteBuffer byteBuffer) - throws IOException, InterruptedException { - int left = byteBuffer.remaining(); - while (left > 0) { - int n = channel.socket().write(byteBuffer); - if (n > 0) { - left -= n; - continue; - } - // System.err.println("############### YIELD ##############################"); - Thread.yield(); - if (left > 0) - Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining() - + ", write return =" + n); - } - } - static public class MessageHeader { - public int token; - public int lastTokenIn; - public MessageHeader(int token, int lastTokenIn) { - this.token = token; - this.lastTokenIn = lastTokenIn; - } - } - // Called by synchronized call. - protected void call(MessageHeader messageHeader, Method method) - throws IOException, InterruptedException { - byte[] bytes = new byte[20]; - ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20); - header.order(ByteOrder.LITTLE_ENDIAN); - header.putInt(messageHeader.lastTokenIn); - header.putInt(messageHeader.token); - int messageNumber = method.requestNumber; - header.putInt(messageNumber); - header.putInt(0); // inflate size, zero if no compression - ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order. - header.putInt(data.position()); - header.clear(); - synchronized (this) { - if (DebugPolicy.REPORT_DEBUG_EVENTS) - Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber); - // Receiver expects header and data to form continuous block. - sendMessage(header); - data.limit(data.position()); - data.rewind(); - sendMessage(data); - if (DebugPolicy.REPORT_DEBUG_EVENTS) - Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber); - } - } - void sendMethod(MethodQueue mq, Method m, int lastTokenIn) - throws ProCoreException, IOException, InterruptedException { - m.prepareForSendingRequest(); - if (DEBUG) - Util.showDebug("Setting last token out=" + lastTokenIn); - synchronized (this) { - MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn); - m.setToken(messageHeader.token); - mq.add(m); - call(messageHeader, m); - } - } - // Called by session manager. - void onRead() throws IOException { - if (readBuffer.remaining() < 1) - throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead.")); - int n = -1; - try { - n = channel.socket().read(readBuffer); - } catch (IOException e) { - throwIOExceptionFromRead(e); - } - if (n < 0) // Assuming that this is same as broken connection. - throwIOExceptionFromRead(new IOException("Failed to read.")); - else if (n < 1) - return; // No data, no error. - try { - readBuffer = connectionThread.handleInput(readBuffer, n); - } catch (Throwable t) { - if (t instanceof IOException) - throw (IOException)t; - else - throw new IOException("Throwable from handleInput.", t); - } - } - // Called by session manager. - void onConnectFailed(ProCoreException exception) { - if (null != exception) - this.exception = exception; - else - this.exception = new NotConnectedException("Failed to create connection."); - connectionCreated.countDown(); - } - // Called by session manager. - Connection onConnectSucceeded(SelectionKey key) { - Connection ret = null; - try { - channel.connect(key); - ret = this; - } catch (Throwable t) { - exception = new NotConnectedException("Failed to finalize connection.", t); - // Connection refused. - } - connectionCreated.countDown(); - return ret; - } - // Called by session manager. - void onDisconnect() { - disconnect(); - } - boolean isConnected() { - return channel.isConnected(); - } -// boolean isStopping() { -// } - boolean isExcepted() { - return null == exception; - } - boolean isOk() { - return isConnected() && !isExcepted(); - } -} -class HeaderData { - int lastTokenIn; - int token; - int messageNumber; - int inflateSize; - int deflateSize; -} -class ReceiveData { - public int lastTokenIn; - public int token; - public int messageNumber; -// public DataBuffer inflateBuffer; - public ByteBuffer buffer; - public int inflateSize; - ReceiveData(HeaderData headerData) { - lastTokenIn = headerData.lastTokenIn; - token = headerData.token; - messageNumber = headerData.messageNumber; - } -} -abstract class MethodHandler { - abstract void got(Packet packet, AbstractFunction function) throws ProCoreException; - DataBuffer getDataBuffer(Packet packet) { - packet.bytes.position(20); - DataBuffer db = new DataBuffer(packet.bytes, 0); - return db; - } -} -class ErrorHandler extends MethodHandler { - private final ProCoreException exception; - ErrorHandler(String s) { - if (null == s || s == "") - s = "Ilegal argument. Missing description for remote call exception."; - this.exception = new ProCoreException(s); - } - @Override - void got(Packet packet, AbstractFunction function) throws ProCoreException { - throw exception; - } -} -class ExceptionHandler extends MethodHandler { - @Override - void got(Packet packet, AbstractFunction function) throws ProCoreException { - packet.bytes.position(20); - ExceptionFunction exception = new ExceptionFunction(); - exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0)); - throw new ProCoreException(exception.what); - } -} -class DefaultHandler extends MethodHandler { - @Override - void got(Packet packet, AbstractFunction function) throws ProCoreException { - function.deserialize(function.getResponseNumber(), getDataBuffer(packet)); - } -} -class GetClusterHandler extends MethodHandler { - private final GetClusterNewFunction function; - public GetClusterHandler(final GetClusterNewFunction function) { - this.function = function; - } - @Override - void got(Packet packet, AbstractFunction function) throws ProCoreException { - this.function.inflateSize = packet.header.inflateSize; - packet.bytes.position(20); - packet.bytes.limit(20 + packet.header.deflateSize); - this.function.deflated = packet.bytes.compact(); - this.function.deflated.position(0); - this.function.deflated.limit(packet.header.deflateSize); - } -} -//class AckHandler extends MethodHandler { -// @Override -// void got(Packet packet, AbstractFunction function) throws ProCoreException { -// packet.first.position(20); -// NullFunction ack = new NullFunction(); -// ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0)); -// Util.showDebug("ack message number=" + ack.messageNumber); -// } -//} -abstract class AbstractMethod { -// abstract ByteBuffer serialize(ByteOrder byteOrder); - protected final int requestNumber; - protected final int responseNumber; - protected int receivedNumber = 0; - boolean hasResponse() { - return responseNumber != 0; - } - private int token = 0; - void setToken(int token) { - this.token = token; - } - int getToken() { - return token; - } - AbstractMethod(int requestNumber, int responseNumber) { - this.requestNumber = requestNumber; - this.responseNumber = responseNumber; - } - void prepareForSendingRequest() { - receivedNumber = 0; - } -} -class Method extends AbstractMethod { - private final ExceptionHandler exceptionHandler = new ExceptionHandler(); - private final AbstractFunction function; - private Packet packet = null; - private CountDownLatch gotPacket = new CountDownLatch(1); - private MethodHandler handler = new DefaultHandler(); - private final EventHandler eventHandler; - Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) { - super(function.getRequestNumber(), function.getResponseNumber()); - this.eventHandler = eventHandler; - if (null != handler) - this.handler = handler; - switch (function.getRequestNumber()) { - default: - break; - case MessageNumber.AcceptCommitRequest: - case MessageNumber.UpdateClusterRequest: - gotPacket.countDown(); - break; - } - this.function = function; - } // MARK: waitforReply - void waitForReply(Connection c) throws ProCoreException, InterruptedException { - while (gotPacket.getCount() > 0) { - gotPacket.await(30, TimeUnit.SECONDS); - if (gotPacket.getCount() > 0) { - System.err.println("waitForReply timeout:"); - System.err.println("wait token=" + getToken()); - // MARK: c.onWait(getToken()); - } - } - if (null != packet) - handler.got(packet, function); - } - EventHandler getEventHandler(EventHandler eventHandler) { - if (null != this.eventHandler) - return this.eventHandler; - else - return eventHandler; - } - ByteBuffer serialize(ByteOrder byteOrder) { - DataBuffer db = function.serialize(byteOrder); - return db.getByteBuffer(); - } - void gotPacket(Packet packet) { - this.packet = packet; - gotPacket.countDown(); - } - void gotException(String s) { - handler = new ErrorHandler(s); - gotPacket.countDown(); - } - void gotException(Packet packet) { - this.packet = packet; - handler = exceptionHandler; - gotPacket.countDown(); - } -// void gotAck(Packet packet) { -// this.packet = packet; -// handler = new AckHandler(); -// gotPacket.countDown(); -// } -} -class DebugPolicy { - static final boolean REPORT_DEBUG_EVENTS = false; - static final boolean REPORT_SERVER_EVENTS = false; - static final boolean REPORT_DEBUG_OPERATIONS = false; - static final boolean REPORT_TIME_CONSUMING_REQUESTS = false; - static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false; - - private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS; - private static final long LONG_EXECUTION_REPORT_PERIOD = 10000; - static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD); - - // Time to wait for an event (in milliseconds). - // Code logic should assume that this wait never expires. - // Should be big enough so that if this time is expired it indicates an error situation. - private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour - private static final boolean REPORT_WAIT_EVENTS = true; - private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true; - /** - * Calls object.wait. - * @param waitMs time to wait in milliseconds. - * @param object to wait for (used for calling object.wait) - * @param string text used in debug prints. - * @see java.lang.Object.wait - */ - static final void wait(long waitMs, Object object, String string) { - waitImpl(waitMs, object, string); - } - /** - * Calls object.wait. - * @param object to wait for (used for calling object.wait) - * @param string text used in debug prints. - * @see java.lang.Object.wait - */ - static final void wait(Object object, String string) { - waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string); - } - private static final void report(String string, Object object) { - String msg = "DEBUG: " + string - + " " + object.getClass().getSimpleName() - + " hid=" + System.identityHashCode(object); - if (REPORT_WAIT_EVENTS_VERBOSE) - new Throwable(msg).printStackTrace(); - else - System.err.println(msg); - } - private static final void waitImpl(long waitMs, Object object, String string) { - if (REPORT_WAIT_EVENTS) - report("Wait for event started.", object); - try { - object.wait(waitMs); // milliseconds - } catch (InterruptedException e) { - if (REPORT_WAIT_EVENTS) - report("Wait for event interrupted.", object); - } finally { - if (REPORT_WAIT_EVENTS) - report("Wait for event stopped.", object); - } - } -} +package org.simantics.db.server.internal; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.simantics.db.common.utils.Logger; +import org.simantics.db.server.ProCoreException; +import org.simantics.db.server.protocol.AbstractFunction; +import org.simantics.db.server.protocol.DataBuffer; +import org.simantics.db.server.protocol.ExceptionFunction; +import org.simantics.db.server.protocol.GetClusterNewFunction; +import org.simantics.db.server.protocol.MessageNumber; + +import gnu.trove.iterator.TIntObjectIterator; +import gnu.trove.map.hash.TIntObjectHashMap; + +public class ConnectionManager { + private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS; + private static final ConnectionManager connectionManager = new ConnectionManager(); + public static ConnectionManager getInstance() { + return connectionManager.instance(); + } + private final Manager manager = new Manager(); + private final Thread thread = new Thread(manager, "Connection Manager"); + private boolean stop = false; + protected List connections = Collections.synchronizedList(new LinkedList()); + private ConnectionManager() { + Util.trace("ConnectionManager.new"); + } + private ConnectionManager instance() { + if (!thread.isAlive()) + start(); + return this; + } + protected class Manager implements Runnable { + private Selector selector; + private CountDownLatch selectorOpen = new CountDownLatch(1); + Manager() { + } + public void run() { + try { + selector = Selector.open(); + selectorOpen.countDown(); + while (true) { + int n = selector.select(); + try { + if (n > 0) + processSelected(); + processConnectors(); + if (stop) { + selector.close(); + return; + } + } catch (CancelledKeyException e) { + Logger.defaultLogError("Cancelled key in select loop.", e); + } + } + } catch (Throwable x) { + Logger.defaultLogError("Select loop failed.", x); + } + } + void processConnectors() throws ClosedChannelException { + while (connections.size() > 0) { + Connection c = connections.remove(0); + c.register(selector); + } + } + void processSelected() { + Iterator i = selector.selectedKeys().iterator(); + while (i.hasNext()) { + SelectionKey sk = (SelectionKey)i.next(); + i.remove(); + try { + if (!sk.isValid()) + processDisconnect(sk); + else if (sk.isConnectable()) + processConnect(sk); + else if (sk.isReadable()) + processRead(sk); + } catch (CancelledKeyException e) { + processDisconnect(sk); + } + } + } + private void processDisconnect(SelectionKey key) { + Connection c = (Connection)key.attachment(); + if (null == c) + return; + c.onDisconnect(); + key.attach(null); // Just to make sure that reference to connection is released. + key.cancel(); + } + private void processConnect(SelectionKey key) { + Connection connection = (Connection)key.attachment(); + SocketChannel sc = (SocketChannel)key.channel(); + try { + if (sc.finishConnect()) { + int ops = key.interestOps() & ~SelectionKey.OP_CONNECT; + ops |= SelectionKey.OP_READ; + key.interestOps(ops); + Connection c = connection.onConnectSucceeded(key); + if (c == null) { // Connector refused to accept connection. + key.attach(null); // Just to make sure that reference to connector is released. + key.cancel(); + sc.close(); + } else + key.attach(c); + } + } catch (Throwable x) { + try { + if (key.isValid()) + key.attach(null); // Just to make sure that reference to connector is released. + key.cancel(); + try { + sc.close(); + } catch (IOException e) { + if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used. + Logger.defaultLogError("Failed to close connection.", e); + } + } finally { + connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x)); + } + } + } + } + private void processRead(SelectionKey key) { + Connection c = (Connection)key.attachment(); + SocketChannel sc = (SocketChannel)key.channel(); + try { + int ops = key.interestOps(); + ops |= SelectionKey.OP_READ; + key.interestOps(ops); + c.onRead(); + } catch (IOException x) { + if (key.isValid()) { + key.attach(null); // Just to make sure that reference to connection is released. + key.cancel(); + } + if (sc.isOpen()) + try { + sc.close(); + } catch (IOException e) { + } + } + } + void connect(Connection connection) throws ProCoreException { + try { + if (connections.contains(connection)) + throw new ProCoreException("Connection already registered. connection=" + connection); + connection.prepare4Connection(SocketChannel.open()); + connections.add(connection); + manager.selector.wakeup(); + connection.wait4Connection(); + } catch(IOException e) { + throw new ConnectionException("Failed to connect to " + connection + ".", e); + } catch (InterruptedException e) { + if (!connection.isConnected()) + throw new NotConnectedException("Connection interrupted to " + connection + "."); + } + } + private void start() { + thread.start(); + boolean once = true; + while (manager.selectorOpen.getCount() > 0) { + try { + manager.selectorOpen.await(); + } catch (InterruptedException e) { + if (once) { + once = false; + Util.logError("Wait for selector open interrupted. Continuing wait.", e); + } + } + } + } + public void stop() { + stop = true; + manager.selector.wakeup(); + } +} +//class MethodQueue { +// private LinkedList methods = new LinkedList(); +// MethodQueue() { +// } +// public synchronized int size() { +// return methods.size(); +// } +// public synchronized void addFirst(Method method) { +// methods.addFirst(method); +// } +// public synchronized void addLast(Method method) { +// methods.addLast(method); +// } +// public synchronized Method removeFirst() { +// try { +// Method method = methods.removeFirst(); +// return method; +// } catch (NoSuchElementException e) { +// return null; +// } +// } +// public synchronized void close() { +// while (size() > 0) { +// Method method = removeFirst(); +// method.setExceptionText("Closing."); +// method.gotResponse(); +// } +// } +// +//} +//class RequestQueue { +// private LinkedList functions = new LinkedList(); +// +// RequestQueue() { +// } +// public synchronized int size() { +// return functions.size(); +// } +// public synchronized void addFirst(AbstractFunction e) { +// functions.addFirst(e); +// } +// public synchronized void addLast(AbstractFunction e) { +// functions.addLast(e); +// } +// public synchronized AbstractFunction removeFirst() { +// try { +// AbstractFunction ret = functions.removeFirst(); +// return ret; +// } catch (NoSuchElementException e) { +// return null; +// } +// } +//} +class MethodQueue { + private TIntObjectHashMap methodMap = new TIntObjectHashMap(); + MethodQueue() { + } + public synchronized int size() { + return methodMap.size(); + } + public synchronized boolean isEmpty() { + return 0 == size(); + } + public synchronized void add(Method method) throws ProCoreException { + int token = method.getToken(); + if (0 == token) + throw new ProCoreException("Illegal token number."); + Method oldMethod = methodMap.put(token, method); + if (null != oldMethod) + throw new ProCoreException("Duplicate token number."); + } + public synchronized Method remove(int token) { + Method method = methodMap.remove(token); + return method; + } + synchronized void close() { + try { + TIntObjectIterator iterator = methodMap.iterator(); + while (iterator.hasNext()) { + iterator.advance(); + Method method = iterator.value(); + if (null == method) + Util.logError("Null function in method queue."); + method.gotException("Method queue closing."); + } + } finally { + methodMap.clear(); + } + } +// public synchronized Method removeAny() throws ProCoreException { +// TIntObjectIterator iterator = methodMap.iterator(); +// if (!iterator.hasNext()) +// return null; +// iterator.advance(); +// Method method = iterator.value(); +// if (null == method) +// throw new ProCoreException("Null function in response queue."); +// methodMap.remove(method.getToken()); +// return method; +// } +} +class Channel { + private SelectionKey selectionKey = null; + private SocketChannel socketChannel = null; + Channel() { + } + void register(Selector selector, Connection connection) + throws ClosedChannelException { + if (null == socketChannel) + throw new ClosedChannelException(); + socketChannel.register(selector, SelectionKey.OP_CONNECT, connection); + } + void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address) + throws ProCoreException, IOException { + if (null == socketChannel) + throw new NotConnectedException("Illegal argument. No socket channel to connect to."); + if (null == address) + throw new NotConnectedException("Illegal argument. No address to connect to."); + if (null != this.socketChannel) + throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this); + socketChannel.configureBlocking(false); + boolean connected = socketChannel.connect(address); + if (connected) + if (DebugPolicy.REPORT_DEBUG_EVENTS) + Util.showDebug("Connected to " + socketChannel + "."); + this.socketChannel = socketChannel; // Waiting for connect. + } +// private static final int IPTOS_LOWDELAY = 0x10; + synchronized void connect(SelectionKey key) { + this.selectionKey = key; + this.socketChannel = (SocketChannel)key.channel(); + if (DebugPolicy.REPORT_DEBUG_EVENTS) + Util.showDebug("Connected to " + socketChannel + "."); +// try { +// this.socketChannel.socket().setKeepAlive(true); +// this.socketChannel.socket().setSoLinger(true, 1); +// } catch (SocketException e) { +// e.printStackTrace(); +// } +// try { +// this.socketChannel.socket().setTcpNoDelay(true); + // Crashes Kalle! + // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY); +// } catch (SocketException e) { +// e.printStackTrace(); +// } + } + synchronized void disconnect() { + if (selectionKey == null) + return; + try { + try { + if (DebugPolicy.REPORT_DEBUG_EVENTS) + Util.showDebug("Disconnected from " + socketChannel); + selectionKey.interestOps(0); + selectionKey.cancel(); + selectionKey.selector().wakeup(); + } catch (CancelledKeyException e) { + if (DebugPolicy.REPORT_DEBUG_OPERATIONS) + Logger.defaultLogError("Failed to disconnect cleanly.", e); + } catch (IllegalArgumentException e) { + if (DebugPolicy.REPORT_DEBUG_OPERATIONS) + Logger.defaultLogError("Failed to disconnect cleanly.", e); + } + try { // Note that this must be after the selectionKey operations above. + socketChannel.socket().close(); + } catch (IOException e) { + if (DebugPolicy.REPORT_DEBUG_OPERATIONS) + Logger.defaultLogError("Failed to close socket cleanly.", e); + } + } finally { + selectionKey = null; + socketChannel = null; + } + } + synchronized boolean isConnected() { + boolean ret = selectionKey != null && socketChannel.isConnected(); + return ret; + } + synchronized SocketChannel socket() throws IOException { + if (null == socketChannel) + throw new IOException("Socket channel is null."); + return socketChannel; + } + synchronized void listenReadEvents() + throws IOException { + if (null == selectionKey) + throw new IOException("Trying to listen connection which is not connected."); + int ops = selectionKey.interestOps() | SelectionKey.OP_READ; + selectionKey.interestOps(ops); + selectionKey.selector().wakeup(); + } +} +class Connection { + private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS; + private static int lastSentTokenNumber = 0; + private final ConnectionThread connectionThread; + private InetSocketAddress address; + private Channel channel; + private CountDownLatch connectionCreated; + private ProCoreException exception; + private ByteBuffer readBuffer; + Connection(ConnectionThread connectionThread) { + this.connectionThread = connectionThread; + init4Connection(InetSocketAddress.createUnresolved("", 0)); + } + @Override + public String toString() { + return "Connection address=" + address + " token=" + lastSentTokenNumber; + } + boolean addressNotInitialized() { + return address.equals(InetSocketAddress.createUnresolved("", 0)); + } + void register(Selector selector) + throws ClosedChannelException { + channel.register(selector, this); + } + InetSocketAddress getAddress() { + return address; + } + boolean equalsAddress(InetSocketAddress address) { + return address.equals(address); + } + void init4Connection(InetSocketAddress address) { + this.address = address; + if (null != channel && channel.isConnected()) + channel.disconnect(); + channel = new Channel(); + connectionCreated = new CountDownLatch(1); + exception = null; + readBuffer = connectionThread.getInputBuffer(); + } + void prepare4Connection(SocketChannel socketChannel) + throws ProCoreException, IOException { + if (isConnected()) + throw new ProCoreException("Illegal state exception. Already connected."); + if (address.isUnresolved()) + address = new InetSocketAddress(address.getHostName(), address.getPort()); + if (address.getPort() == 0) + throw new ProCoreException("Port 0 not supported as connection address."); + channel.prepare4Connection(socketChannel, address); + } + void wait4Connection() throws InterruptedException { + connectionCreated.await(); + } + synchronized void disconnect() { + if (!isConnected()) + return; + channel.disconnect(); + } + private void throwIOExceptionFromRead(IOException e) + throws IOException { + disconnect(); // This ensures that isConnected will be false after this. + if (null != e) + throw e; + throw new IOException("Failed to read."); + } + // Called by synchronized handleInput. + private void sendMessage(ByteBuffer byteBuffer) + throws IOException, InterruptedException { + int left = byteBuffer.remaining(); + while (left > 0) { + ByteBuffer slice = byteBuffer.slice(); + int size = Math.min(left, 1000000); + slice.limit(size); + sendBuffer(slice); + int position = byteBuffer.position() + size; + byteBuffer.position(position); + left -= size; + } + } + private void sendBuffer(ByteBuffer byteBuffer) + throws IOException, InterruptedException { + int left = byteBuffer.remaining(); + while (left > 0) { + int n = channel.socket().write(byteBuffer); + if (n > 0) { + left -= n; + continue; + } + // System.err.println("############### YIELD ##############################"); + Thread.yield(); + if (left > 0) + Logger.defaultLogInfo("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining() + + ", write return =" + n); + } + } + static public class MessageHeader { + public int token; + public int lastTokenIn; + public MessageHeader(int token, int lastTokenIn) { + this.token = token; + this.lastTokenIn = lastTokenIn; + } + } + // Called by synchronized call. + protected void call(MessageHeader messageHeader, Method method) + throws IOException, InterruptedException { + byte[] bytes = new byte[20]; + ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20); + header.order(ByteOrder.LITTLE_ENDIAN); + header.putInt(messageHeader.lastTokenIn); + header.putInt(messageHeader.token); + int messageNumber = method.requestNumber; + header.putInt(messageNumber); + header.putInt(0); // inflate size, zero if no compression + ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order. + header.putInt(data.position()); + header.clear(); + synchronized (this) { + if (DebugPolicy.REPORT_DEBUG_EVENTS) + Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber); + // Receiver expects header and data to form continuous block. + sendMessage(header); + data.limit(data.position()); + data.rewind(); + sendMessage(data); + if (DebugPolicy.REPORT_DEBUG_EVENTS) + Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber); + } + } + void sendMethod(MethodQueue mq, Method m, int lastTokenIn) + throws ProCoreException, IOException, InterruptedException { + m.prepareForSendingRequest(); + if (DEBUG) + Util.showDebug("Setting last token out=" + lastTokenIn); + synchronized (this) { + MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn); + m.setToken(messageHeader.token); + mq.add(m); + call(messageHeader, m); + } + } + // Called by session manager. + void onRead() throws IOException { + if (readBuffer.remaining() < 1) + throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead.")); + int n = -1; + try { + n = channel.socket().read(readBuffer); + } catch (IOException e) { + throwIOExceptionFromRead(e); + } + if (n < 0) // Assuming that this is same as broken connection. + throwIOExceptionFromRead(new IOException("Failed to read.")); + else if (n < 1) + return; // No data, no error. + try { + readBuffer = connectionThread.handleInput(readBuffer, n); + } catch (Throwable t) { + if (t instanceof IOException) + throw (IOException)t; + else + throw new IOException("Throwable from handleInput.", t); + } + } + // Called by session manager. + void onConnectFailed(ProCoreException exception) { + if (null != exception) + this.exception = exception; + else + this.exception = new NotConnectedException("Failed to create connection."); + connectionCreated.countDown(); + } + // Called by session manager. + Connection onConnectSucceeded(SelectionKey key) { + Connection ret = null; + try { + channel.connect(key); + ret = this; + } catch (Throwable t) { + exception = new NotConnectedException("Failed to finalize connection.", t); + // Connection refused. + } + connectionCreated.countDown(); + return ret; + } + // Called by session manager. + void onDisconnect() { + disconnect(); + } + boolean isConnected() { + return channel.isConnected(); + } +// boolean isStopping() { +// } + boolean isExcepted() { + return null == exception; + } + boolean isOk() { + return isConnected() && !isExcepted(); + } +} +class HeaderData { + int lastTokenIn; + int token; + int messageNumber; + int inflateSize; + int deflateSize; +} +class ReceiveData { + public int lastTokenIn; + public int token; + public int messageNumber; +// public DataBuffer inflateBuffer; + public ByteBuffer buffer; + public int inflateSize; + ReceiveData(HeaderData headerData) { + lastTokenIn = headerData.lastTokenIn; + token = headerData.token; + messageNumber = headerData.messageNumber; + } +} +abstract class MethodHandler { + abstract void got(Packet packet, AbstractFunction function) throws ProCoreException; + DataBuffer getDataBuffer(Packet packet) { + packet.bytes.position(20); + DataBuffer db = new DataBuffer(packet.bytes, 0); + return db; + } +} +class ErrorHandler extends MethodHandler { + private final ProCoreException exception; + ErrorHandler(String s) { + if (null == s || s == "") + s = "Ilegal argument. Missing description for remote call exception."; + this.exception = new ProCoreException(s); + } + @Override + void got(Packet packet, AbstractFunction function) throws ProCoreException { + throw exception; + } +} +class ExceptionHandler extends MethodHandler { + @Override + void got(Packet packet, AbstractFunction function) throws ProCoreException { + packet.bytes.position(20); + ExceptionFunction exception = new ExceptionFunction(); + exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0)); + throw new ProCoreException(exception.what); + } +} +class DefaultHandler extends MethodHandler { + @Override + void got(Packet packet, AbstractFunction function) throws ProCoreException { + function.deserialize(function.getResponseNumber(), getDataBuffer(packet)); + } +} +class GetClusterHandler extends MethodHandler { + private final GetClusterNewFunction function; + public GetClusterHandler(final GetClusterNewFunction function) { + this.function = function; + } + @Override + void got(Packet packet, AbstractFunction function) throws ProCoreException { + this.function.inflateSize = packet.header.inflateSize; + packet.bytes.position(20); + packet.bytes.limit(20 + packet.header.deflateSize); + this.function.deflated = packet.bytes.compact(); + this.function.deflated.position(0); + this.function.deflated.limit(packet.header.deflateSize); + } +} +//class AckHandler extends MethodHandler { +// @Override +// void got(Packet packet, AbstractFunction function) throws ProCoreException { +// packet.first.position(20); +// NullFunction ack = new NullFunction(); +// ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0)); +// Util.showDebug("ack message number=" + ack.messageNumber); +// } +//} +abstract class AbstractMethod { +// abstract ByteBuffer serialize(ByteOrder byteOrder); + protected final int requestNumber; + protected final int responseNumber; + protected int receivedNumber = 0; + boolean hasResponse() { + return responseNumber != 0; + } + private int token = 0; + void setToken(int token) { + this.token = token; + } + int getToken() { + return token; + } + AbstractMethod(int requestNumber, int responseNumber) { + this.requestNumber = requestNumber; + this.responseNumber = responseNumber; + } + void prepareForSendingRequest() { + receivedNumber = 0; + } +} +class Method extends AbstractMethod { + private final ExceptionHandler exceptionHandler = new ExceptionHandler(); + private final AbstractFunction function; + private Packet packet = null; + private CountDownLatch gotPacket = new CountDownLatch(1); + private MethodHandler handler = new DefaultHandler(); + private final EventHandler eventHandler; + Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) { + super(function.getRequestNumber(), function.getResponseNumber()); + this.eventHandler = eventHandler; + if (null != handler) + this.handler = handler; + switch (function.getRequestNumber()) { + default: + break; + case MessageNumber.AcceptCommitRequest: + case MessageNumber.UpdateClusterRequest: + gotPacket.countDown(); + break; + } + this.function = function; + } // MARK: waitforReply + void waitForReply(Connection c) throws ProCoreException, InterruptedException { + while (gotPacket.getCount() > 0) { + gotPacket.await(30, TimeUnit.SECONDS); + if (gotPacket.getCount() > 0) { + System.err.println("waitForReply timeout:"); + System.err.println("wait token=" + getToken()); + // MARK: c.onWait(getToken()); + } + } + if (null != packet) + handler.got(packet, function); + } + EventHandler getEventHandler(EventHandler eventHandler) { + if (null != this.eventHandler) + return this.eventHandler; + else + return eventHandler; + } + ByteBuffer serialize(ByteOrder byteOrder) { + DataBuffer db = function.serialize(byteOrder); + return db.getByteBuffer(); + } + void gotPacket(Packet packet) { + this.packet = packet; + gotPacket.countDown(); + } + void gotException(String s) { + handler = new ErrorHandler(s); + gotPacket.countDown(); + } + void gotException(Packet packet) { + this.packet = packet; + handler = exceptionHandler; + gotPacket.countDown(); + } +// void gotAck(Packet packet) { +// this.packet = packet; +// handler = new AckHandler(); +// gotPacket.countDown(); +// } +} +class DebugPolicy { + static final boolean REPORT_DEBUG_EVENTS = false; + static final boolean REPORT_SERVER_EVENTS = false; + static final boolean REPORT_DEBUG_OPERATIONS = false; + static final boolean REPORT_TIME_CONSUMING_REQUESTS = false; + static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false; + + private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS; + private static final long LONG_EXECUTION_REPORT_PERIOD = 10000; + static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD); + + // Time to wait for an event (in milliseconds). + // Code logic should assume that this wait never expires. + // Should be big enough so that if this time is expired it indicates an error situation. + private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour + private static final boolean REPORT_WAIT_EVENTS = true; + private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true; + /** + * Calls object.wait. + * @param waitMs time to wait in milliseconds. + * @param object to wait for (used for calling object.wait) + * @param string text used in debug prints. + * @see java.lang.Object.wait + */ + static final void wait(long waitMs, Object object, String string) { + waitImpl(waitMs, object, string); + } + /** + * Calls object.wait. + * @param object to wait for (used for calling object.wait) + * @param string text used in debug prints. + * @see java.lang.Object.wait + */ + static final void wait(Object object, String string) { + waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string); + } + private static final void report(String string, Object object) { + String msg = "DEBUG: " + string + + " " + object.getClass().getSimpleName() + + " hid=" + System.identityHashCode(object); + if (REPORT_WAIT_EVENTS_VERBOSE) + new Throwable(msg).printStackTrace(); + else + System.err.println(msg); + } + private static final void waitImpl(long waitMs, Object object, String string) { + if (REPORT_WAIT_EVENTS) + report("Wait for event started.", object); + try { + object.wait(waitMs); // milliseconds + } catch (InterruptedException e) { + if (REPORT_WAIT_EVENTS) + report("Wait for event interrupted.", object); + } finally { + if (REPORT_WAIT_EVENTS) + report("Wait for event stopped.", object); + } + } +}