1 package org.simantics.db.server.internal;
\r
3 import java.io.IOException;
\r
4 import java.net.InetSocketAddress;
\r
5 import java.nio.ByteBuffer;
\r
6 import java.nio.ByteOrder;
\r
7 import java.nio.channels.CancelledKeyException;
\r
8 import java.nio.channels.ClosedChannelException;
\r
9 import java.nio.channels.SelectionKey;
\r
10 import java.nio.channels.Selector;
\r
11 import java.nio.channels.SocketChannel;
\r
12 import java.util.Collections;
\r
13 import java.util.Iterator;
\r
14 import java.util.LinkedList;
\r
15 import java.util.List;
\r
16 import java.util.concurrent.CountDownLatch;
\r
17 import java.util.concurrent.TimeUnit;
\r
19 import org.simantics.db.common.utils.Logger;
\r
20 import org.simantics.db.server.ProCoreException;
\r
21 import org.simantics.db.server.protocol.AbstractFunction;
\r
22 import org.simantics.db.server.protocol.DataBuffer;
\r
23 import org.simantics.db.server.protocol.ExceptionFunction;
\r
24 import org.simantics.db.server.protocol.GetClusterNewFunction;
\r
25 import org.simantics.db.server.protocol.MessageNumber;
\r
27 import gnu.trove.iterator.TIntObjectIterator;
\r
28 import gnu.trove.map.hash.TIntObjectHashMap;
\r
30 public class ConnectionManager {
\r
31 private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
\r
32 private static final ConnectionManager connectionManager = new ConnectionManager();
\r
33 public static ConnectionManager getInstance() {
\r
34 return connectionManager.instance();
\r
36 private final Manager manager = new Manager();
\r
37 private final Thread thread = new Thread(manager, "Connection Manager");
\r
38 private boolean stop = false;
\r
39 protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());
\r
40 private ConnectionManager() {
\r
41 Util.trace("ConnectionManager.new");
\r
43 private ConnectionManager instance() {
\r
44 if (!thread.isAlive())
\r
48 protected class Manager implements Runnable {
\r
49 private Selector selector;
\r
50 private CountDownLatch selectorOpen = new CountDownLatch(1);
\r
55 selector = Selector.open();
\r
56 selectorOpen.countDown();
\r
58 int n = selector.select();
\r
62 processConnectors();
\r
67 } catch (CancelledKeyException e) {
\r
68 Logger.defaultLogError("Cancelled key in select loop.", e);
\r
71 } catch (Throwable x) {
\r
72 Logger.defaultLogError("Select loop failed.", x);
\r
75 void processConnectors() throws ClosedChannelException {
\r
76 while (connections.size() > 0) {
\r
77 Connection c = connections.remove(0);
\r
78 c.register(selector);
\r
81 void processSelected() {
\r
82 Iterator<SelectionKey> i = selector.selectedKeys().iterator();
\r
83 while (i.hasNext()) {
\r
84 SelectionKey sk = (SelectionKey)i.next();
\r
88 processDisconnect(sk);
\r
89 else if (sk.isConnectable())
\r
91 else if (sk.isReadable())
\r
93 } catch (CancelledKeyException e) {
\r
94 processDisconnect(sk);
\r
98 private void processDisconnect(SelectionKey key) {
\r
99 Connection c = (Connection)key.attachment();
\r
103 key.attach(null); // Just to make sure that reference to connection is released.
\r
106 private void processConnect(SelectionKey key) {
\r
107 Connection connection = (Connection)key.attachment();
\r
108 SocketChannel sc = (SocketChannel)key.channel();
\r
110 if (sc.finishConnect()) {
\r
111 int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
\r
112 ops |= SelectionKey.OP_READ;
\r
113 key.interestOps(ops);
\r
114 Connection c = connection.onConnectSucceeded(key);
\r
115 if (c == null) { // Connector refused to accept connection.
\r
116 key.attach(null); // Just to make sure that reference to connector is released.
\r
122 } catch (Throwable x) {
\r
125 key.attach(null); // Just to make sure that reference to connector is released.
\r
129 } catch (IOException e) {
\r
130 if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
\r
131 Logger.defaultLogError("Failed to close connection.", e);
\r
134 connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
\r
139 private void processRead(SelectionKey key) {
\r
140 Connection c = (Connection)key.attachment();
\r
141 SocketChannel sc = (SocketChannel)key.channel();
\r
143 int ops = key.interestOps();
\r
144 ops |= SelectionKey.OP_READ;
\r
145 key.interestOps(ops);
\r
147 } catch (IOException x) {
\r
148 if (key.isValid()) {
\r
149 key.attach(null); // Just to make sure that reference to connection is released.
\r
155 } catch (IOException e) {
\r
159 void connect(Connection connection) throws ProCoreException {
\r
161 if (connections.contains(connection))
\r
162 throw new ProCoreException("Connection already registered. connection=" + connection);
\r
163 connection.prepare4Connection(SocketChannel.open());
\r
164 connections.add(connection);
\r
165 manager.selector.wakeup();
\r
166 connection.wait4Connection();
\r
167 } catch(IOException e) {
\r
168 throw new ConnectionException("Failed to connect to " + connection + ".", e);
\r
169 } catch (InterruptedException e) {
\r
170 if (!connection.isConnected())
\r
171 throw new NotConnectedException("Connection interrupted to " + connection + ".");
\r
174 private void start() {
\r
176 boolean once = true;
\r
177 while (manager.selectorOpen.getCount() > 0) {
\r
179 manager.selectorOpen.await();
\r
180 } catch (InterruptedException e) {
\r
183 Util.logError("Wait for selector open interrupted. Continuing wait.", e);
\r
188 public void stop() {
\r
190 manager.selector.wakeup();
\r
193 //class MethodQueue {
\r
194 // private LinkedList<Method> methods = new LinkedList<Method>();
\r
197 // public synchronized int size() {
\r
198 // return methods.size();
\r
200 // public synchronized void addFirst(Method method) {
\r
201 // methods.addFirst(method);
\r
203 // public synchronized void addLast(Method method) {
\r
204 // methods.addLast(method);
\r
206 // public synchronized Method removeFirst() {
\r
208 // Method method = methods.removeFirst();
\r
210 // } catch (NoSuchElementException e) {
\r
214 // public synchronized void close() {
\r
215 // while (size() > 0) {
\r
216 // Method method = removeFirst();
\r
217 // method.setExceptionText("Closing.");
\r
218 // method.gotResponse();
\r
223 //class RequestQueue {
\r
224 // private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();
\r
226 // RequestQueue() {
\r
228 // public synchronized int size() {
\r
229 // return functions.size();
\r
231 // public synchronized void addFirst(AbstractFunction e) {
\r
232 // functions.addFirst(e);
\r
234 // public synchronized void addLast(AbstractFunction e) {
\r
235 // functions.addLast(e);
\r
237 // public synchronized AbstractFunction removeFirst() {
\r
239 // AbstractFunction ret = functions.removeFirst();
\r
241 // } catch (NoSuchElementException e) {
\r
246 class MethodQueue {
\r
247 private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();
\r
250 public synchronized int size() {
\r
251 return methodMap.size();
\r
253 public synchronized boolean isEmpty() {
\r
254 return 0 == size();
\r
256 public synchronized void add(Method method) throws ProCoreException {
\r
257 int token = method.getToken();
\r
259 throw new ProCoreException("Illegal token number.");
\r
260 Method oldMethod = methodMap.put(token, method);
\r
261 if (null != oldMethod)
\r
262 throw new ProCoreException("Duplicate token number.");
\r
264 public synchronized Method remove(int token) {
\r
265 Method method = methodMap.remove(token);
\r
268 synchronized void close() {
\r
270 TIntObjectIterator<Method> iterator = methodMap.iterator();
\r
271 while (iterator.hasNext()) {
\r
272 iterator.advance();
\r
273 Method method = iterator.value();
\r
274 if (null == method)
\r
275 Util.logError("Null function in method queue.");
\r
276 method.gotException("Method queue closing.");
\r
282 // public synchronized Method removeAny() throws ProCoreException {
\r
283 // TIntObjectIterator<Method> iterator = methodMap.iterator();
\r
284 // if (!iterator.hasNext())
\r
286 // iterator.advance();
\r
287 // Method method = iterator.value();
\r
288 // if (null == method)
\r
289 // throw new ProCoreException("Null function in response queue.");
\r
290 // methodMap.remove(method.getToken());
\r
295 private SelectionKey selectionKey = null;
\r
296 private SocketChannel socketChannel = null;
\r
299 void register(Selector selector, Connection connection)
\r
300 throws ClosedChannelException {
\r
301 if (null == socketChannel)
\r
302 throw new ClosedChannelException();
\r
303 socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
\r
305 void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
\r
306 throws ProCoreException, IOException {
\r
307 if (null == socketChannel)
\r
308 throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
\r
309 if (null == address)
\r
310 throw new NotConnectedException("Illegal argument. No address to connect to.");
\r
311 if (null != this.socketChannel)
\r
312 throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
\r
313 socketChannel.configureBlocking(false);
\r
314 boolean connected = socketChannel.connect(address);
\r
316 if (DebugPolicy.REPORT_DEBUG_EVENTS)
\r
317 Util.showDebug("Connected to " + socketChannel + ".");
\r
318 this.socketChannel = socketChannel; // Waiting for connect.
\r
320 // private static final int IPTOS_LOWDELAY = 0x10;
\r
321 synchronized void connect(SelectionKey key) {
\r
322 this.selectionKey = key;
\r
323 this.socketChannel = (SocketChannel)key.channel();
\r
324 if (DebugPolicy.REPORT_DEBUG_EVENTS)
\r
325 Util.showDebug("Connected to " + socketChannel + ".");
\r
327 // this.socketChannel.socket().setKeepAlive(true);
\r
328 // this.socketChannel.socket().setSoLinger(true, 1);
\r
329 // } catch (SocketException e) {
\r
330 // e.printStackTrace();
\r
333 // this.socketChannel.socket().setTcpNoDelay(true);
\r
335 // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
\r
336 // } catch (SocketException e) {
\r
337 // e.printStackTrace();
\r
340 synchronized void disconnect() {
\r
341 if (selectionKey == null)
\r
345 if (DebugPolicy.REPORT_DEBUG_EVENTS)
\r
346 Util.showDebug("Disconnected from " + socketChannel);
\r
347 selectionKey.interestOps(0);
\r
348 selectionKey.cancel();
\r
349 selectionKey.selector().wakeup();
\r
350 } catch (CancelledKeyException e) {
\r
351 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
\r
352 Logger.defaultLogError("Failed to disconnect cleanly.", e);
\r
353 } catch (IllegalArgumentException e) {
\r
354 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
\r
355 Logger.defaultLogError("Failed to disconnect cleanly.", e);
\r
357 try { // Note that this must be after the selectionKey operations above.
\r
358 socketChannel.socket().close();
\r
359 } catch (IOException e) {
\r
360 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
\r
361 Logger.defaultLogError("Failed to close socket cleanly.", e);
\r
364 selectionKey = null;
\r
365 socketChannel = null;
\r
368 synchronized boolean isConnected() {
\r
369 boolean ret = selectionKey != null && socketChannel.isConnected();
\r
372 synchronized SocketChannel socket() throws IOException {
\r
373 if (null == socketChannel)
\r
374 throw new IOException("Socket channel is null.");
\r
375 return socketChannel;
\r
377 synchronized void listenReadEvents()
\r
378 throws IOException {
\r
379 if (null == selectionKey)
\r
380 throw new IOException("Trying to listen connection which is not connected.");
\r
381 int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
\r
382 selectionKey.interestOps(ops);
\r
383 selectionKey.selector().wakeup();
\r
387 private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
\r
388 private static int lastSentTokenNumber = 0;
\r
389 private final ConnectionThread connectionThread;
\r
390 private InetSocketAddress address;
\r
391 private Channel channel;
\r
392 private CountDownLatch connectionCreated;
\r
393 private ProCoreException exception;
\r
394 private ByteBuffer readBuffer;
\r
395 Connection(ConnectionThread connectionThread) {
\r
396 this.connectionThread = connectionThread;
\r
397 init4Connection(InetSocketAddress.createUnresolved("", 0));
\r
400 public String toString() {
\r
401 return "Connection address=" + address + " token=" + lastSentTokenNumber;
\r
403 boolean addressNotInitialized() {
\r
404 return address.equals(InetSocketAddress.createUnresolved("", 0));
\r
406 void register(Selector selector)
\r
407 throws ClosedChannelException {
\r
408 channel.register(selector, this);
\r
410 InetSocketAddress getAddress() {
\r
413 boolean equalsAddress(InetSocketAddress address) {
\r
414 return address.equals(address);
\r
416 void init4Connection(InetSocketAddress address) {
\r
417 this.address = address;
\r
418 if (null != channel && channel.isConnected())
\r
419 channel.disconnect();
\r
420 channel = new Channel();
\r
421 connectionCreated = new CountDownLatch(1);
\r
423 readBuffer = connectionThread.getInputBuffer();
\r
425 void prepare4Connection(SocketChannel socketChannel)
\r
426 throws ProCoreException, IOException {
\r
428 throw new ProCoreException("Illegal state exception. Already connected.");
\r
429 if (address.isUnresolved())
\r
430 address = new InetSocketAddress(address.getHostName(), address.getPort());
\r
431 if (address.getPort() == 0)
\r
432 throw new ProCoreException("Port 0 not supported as connection address.");
\r
433 channel.prepare4Connection(socketChannel, address);
\r
435 void wait4Connection() throws InterruptedException {
\r
436 connectionCreated.await();
\r
438 synchronized void disconnect() {
\r
439 if (!isConnected())
\r
441 channel.disconnect();
\r
443 private void throwIOExceptionFromRead(IOException e)
\r
444 throws IOException {
\r
445 disconnect(); // This ensures that isConnected will be false after this.
\r
448 throw new IOException("Failed to read.");
\r
450 // Called by synchronized handleInput.
\r
451 private void sendMessage(ByteBuffer byteBuffer)
\r
452 throws IOException, InterruptedException {
\r
453 int left = byteBuffer.remaining();
\r
455 ByteBuffer slice = byteBuffer.slice();
\r
456 int size = Math.min(left, 1000000);
\r
459 int position = byteBuffer.position() + size;
\r
460 byteBuffer.position(position);
\r
464 private void sendBuffer(ByteBuffer byteBuffer)
\r
465 throws IOException, InterruptedException {
\r
466 int left = byteBuffer.remaining();
\r
468 int n = channel.socket().write(byteBuffer);
\r
473 // System.err.println("############### YIELD ##############################");
\r
476 Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()
\r
477 + ", write return =" + n);
\r
480 static public class MessageHeader {
\r
482 public int lastTokenIn;
\r
483 public MessageHeader(int token, int lastTokenIn) {
\r
484 this.token = token;
\r
485 this.lastTokenIn = lastTokenIn;
\r
488 // Called by synchronized call.
\r
489 protected void call(MessageHeader messageHeader, Method method)
\r
490 throws IOException, InterruptedException {
\r
491 byte[] bytes = new byte[20];
\r
492 ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
\r
493 header.order(ByteOrder.LITTLE_ENDIAN);
\r
494 header.putInt(messageHeader.lastTokenIn);
\r
495 header.putInt(messageHeader.token);
\r
496 int messageNumber = method.requestNumber;
\r
497 header.putInt(messageNumber);
\r
498 header.putInt(0); // inflate size, zero if no compression
\r
499 ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
\r
500 header.putInt(data.position());
\r
502 synchronized (this) {
\r
503 if (DebugPolicy.REPORT_DEBUG_EVENTS)
\r
504 Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
\r
505 // Receiver expects header and data to form continuous block.
\r
506 sendMessage(header);
\r
507 data.limit(data.position());
\r
510 if (DebugPolicy.REPORT_DEBUG_EVENTS)
\r
511 Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
\r
514 void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
\r
515 throws ProCoreException, IOException, InterruptedException {
\r
516 m.prepareForSendingRequest();
\r
518 Util.showDebug("Setting last token out=" + lastTokenIn);
\r
519 synchronized (this) {
\r
520 MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
\r
521 m.setToken(messageHeader.token);
\r
523 call(messageHeader, m);
\r
526 // Called by session manager.
\r
527 void onRead() throws IOException {
\r
528 if (readBuffer.remaining() < 1)
\r
529 throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
\r
532 n = channel.socket().read(readBuffer);
\r
533 } catch (IOException e) {
\r
534 throwIOExceptionFromRead(e);
\r
536 if (n < 0) // Assuming that this is same as broken connection.
\r
537 throwIOExceptionFromRead(new IOException("Failed to read."));
\r
539 return; // No data, no error.
\r
541 readBuffer = connectionThread.handleInput(readBuffer, n);
\r
542 } catch (Throwable t) {
\r
543 if (t instanceof IOException)
\r
544 throw (IOException)t;
\r
546 throw new IOException("Throwable from handleInput.", t);
\r
549 // Called by session manager.
\r
550 void onConnectFailed(ProCoreException exception) {
\r
551 if (null != exception)
\r
552 this.exception = exception;
\r
554 this.exception = new NotConnectedException("Failed to create connection.");
\r
555 connectionCreated.countDown();
\r
557 // Called by session manager.
\r
558 Connection onConnectSucceeded(SelectionKey key) {
\r
559 Connection ret = null;
\r
561 channel.connect(key);
\r
563 } catch (Throwable t) {
\r
564 exception = new NotConnectedException("Failed to finalize connection.", t);
\r
565 // Connection refused.
\r
567 connectionCreated.countDown();
\r
570 // Called by session manager.
\r
571 void onDisconnect() {
\r
574 boolean isConnected() {
\r
575 return channel.isConnected();
\r
577 // boolean isStopping() {
\r
579 boolean isExcepted() {
\r
580 return null == exception;
\r
583 return isConnected() && !isExcepted();
\r
593 class ReceiveData {
\r
594 public int lastTokenIn;
\r
596 public int messageNumber;
\r
597 // public DataBuffer inflateBuffer;
\r
598 public ByteBuffer buffer;
\r
599 public int inflateSize;
\r
600 ReceiveData(HeaderData headerData) {
\r
601 lastTokenIn = headerData.lastTokenIn;
\r
602 token = headerData.token;
\r
603 messageNumber = headerData.messageNumber;
\r
606 abstract class MethodHandler {
\r
607 abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;
\r
608 DataBuffer getDataBuffer(Packet packet) {
\r
609 packet.bytes.position(20);
\r
610 DataBuffer db = new DataBuffer(packet.bytes, 0);
\r
614 class ErrorHandler extends MethodHandler {
\r
615 private final ProCoreException exception;
\r
616 ErrorHandler(String s) {
\r
617 if (null == s || s == "")
\r
618 s = "Ilegal argument. Missing description for remote call exception.";
\r
619 this.exception = new ProCoreException(s);
\r
622 void got(Packet packet, AbstractFunction function) throws ProCoreException {
\r
626 class ExceptionHandler extends MethodHandler {
\r
628 void got(Packet packet, AbstractFunction function) throws ProCoreException {
\r
629 packet.bytes.position(20);
\r
630 ExceptionFunction exception = new ExceptionFunction();
\r
631 exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
\r
632 throw new ProCoreException(exception.what);
\r
635 class DefaultHandler extends MethodHandler {
\r
637 void got(Packet packet, AbstractFunction function) throws ProCoreException {
\r
638 function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
\r
641 class GetClusterHandler extends MethodHandler {
\r
642 private final GetClusterNewFunction function;
\r
643 public GetClusterHandler(final GetClusterNewFunction function) {
\r
644 this.function = function;
\r
647 void got(Packet packet, AbstractFunction function) throws ProCoreException {
\r
648 this.function.inflateSize = packet.header.inflateSize;
\r
649 packet.bytes.position(20);
\r
650 packet.bytes.limit(20 + packet.header.deflateSize);
\r
651 this.function.deflated = packet.bytes.compact();
\r
652 this.function.deflated.position(0);
\r
653 this.function.deflated.limit(packet.header.deflateSize);
\r
656 //class AckHandler extends MethodHandler {
\r
658 // void got(Packet packet, AbstractFunction function) throws ProCoreException {
\r
659 // packet.first.position(20);
\r
660 // NullFunction ack = new NullFunction();
\r
661 // ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
\r
662 // Util.showDebug("ack message number=" + ack.messageNumber);
\r
665 abstract class AbstractMethod {
\r
666 // abstract ByteBuffer serialize(ByteOrder byteOrder);
\r
667 protected final int requestNumber;
\r
668 protected final int responseNumber;
\r
669 protected int receivedNumber = 0;
\r
670 boolean hasResponse() {
\r
671 return responseNumber != 0;
\r
673 private int token = 0;
\r
674 void setToken(int token) {
\r
675 this.token = token;
\r
680 AbstractMethod(int requestNumber, int responseNumber) {
\r
681 this.requestNumber = requestNumber;
\r
682 this.responseNumber = responseNumber;
\r
684 void prepareForSendingRequest() {
\r
685 receivedNumber = 0;
\r
688 class Method extends AbstractMethod {
\r
689 private final ExceptionHandler exceptionHandler = new ExceptionHandler();
\r
690 private final AbstractFunction function;
\r
691 private Packet packet = null;
\r
692 private CountDownLatch gotPacket = new CountDownLatch(1);
\r
693 private MethodHandler handler = new DefaultHandler();
\r
694 private final EventHandler eventHandler;
\r
695 Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
\r
696 super(function.getRequestNumber(), function.getResponseNumber());
\r
697 this.eventHandler = eventHandler;
\r
698 if (null != handler)
\r
699 this.handler = handler;
\r
700 switch (function.getRequestNumber()) {
\r
703 case MessageNumber.AcceptCommitRequest:
\r
704 case MessageNumber.UpdateClusterRequest:
\r
705 gotPacket.countDown();
\r
708 this.function = function;
\r
709 } // MARK: waitforReply
\r
710 void waitForReply(Connection c) throws ProCoreException, InterruptedException {
\r
711 while (gotPacket.getCount() > 0) {
\r
712 gotPacket.await(30, TimeUnit.SECONDS);
\r
713 if (gotPacket.getCount() > 0) {
\r
714 System.err.println("waitForReply timeout:");
\r
715 System.err.println("wait token=" + getToken());
\r
716 // MARK: c.onWait(getToken());
\r
719 if (null != packet)
\r
720 handler.got(packet, function);
\r
722 EventHandler getEventHandler(EventHandler eventHandler) {
\r
723 if (null != this.eventHandler)
\r
724 return this.eventHandler;
\r
726 return eventHandler;
\r
728 ByteBuffer serialize(ByteOrder byteOrder) {
\r
729 DataBuffer db = function.serialize(byteOrder);
\r
730 return db.getByteBuffer();
\r
732 void gotPacket(Packet packet) {
\r
733 this.packet = packet;
\r
734 gotPacket.countDown();
\r
736 void gotException(String s) {
\r
737 handler = new ErrorHandler(s);
\r
738 gotPacket.countDown();
\r
740 void gotException(Packet packet) {
\r
741 this.packet = packet;
\r
742 handler = exceptionHandler;
\r
743 gotPacket.countDown();
\r
745 // void gotAck(Packet packet) {
\r
746 // this.packet = packet;
\r
747 // handler = new AckHandler();
\r
748 // gotPacket.countDown();
\r
751 class DebugPolicy {
\r
752 static final boolean REPORT_DEBUG_EVENTS = false;
\r
753 static final boolean REPORT_SERVER_EVENTS = false;
\r
754 static final boolean REPORT_DEBUG_OPERATIONS = false;
\r
755 static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
\r
756 static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;
\r
758 private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
\r
759 private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
\r
760 static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);
\r
762 // Time to wait for an event (in milliseconds).
\r
763 // Code logic should assume that this wait never expires.
\r
764 // Should be big enough so that if this time is expired it indicates an error situation.
\r
765 private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour
\r
766 private static final boolean REPORT_WAIT_EVENTS = true;
\r
767 private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;
\r
769 * Calls object.wait.
\r
770 * @param waitMs time to wait in milliseconds.
\r
771 * @param object to wait for (used for calling object.wait)
\r
772 * @param string text used in debug prints.
\r
773 * @see java.lang.Object.wait
\r
775 static final void wait(long waitMs, Object object, String string) {
\r
776 waitImpl(waitMs, object, string);
\r
779 * Calls object.wait.
\r
780 * @param object to wait for (used for calling object.wait)
\r
781 * @param string text used in debug prints.
\r
782 * @see java.lang.Object.wait
\r
784 static final void wait(Object object, String string) {
\r
785 waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
\r
787 private static final void report(String string, Object object) {
\r
788 String msg = "DEBUG: " + string
\r
789 + " " + object.getClass().getSimpleName()
\r
790 + " hid=" + System.identityHashCode(object);
\r
791 if (REPORT_WAIT_EVENTS_VERBOSE)
\r
792 new Throwable(msg).printStackTrace();
\r
794 System.err.println(msg);
\r
796 private static final void waitImpl(long waitMs, Object object, String string) {
\r
797 if (REPORT_WAIT_EVENTS)
\r
798 report("Wait for event started.", object);
\r
800 object.wait(waitMs); // milliseconds
\r
801 } catch (InterruptedException e) {
\r
802 if (REPORT_WAIT_EVENTS)
\r
803 report("Wait for event interrupted.", object);
\r
805 if (REPORT_WAIT_EVENTS)
\r
806 report("Wait for event stopped.", object);
\r