]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java
Goodbye db-client.log
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / ConnectionManager.java
1 package org.simantics.db.server.internal;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.ByteBuffer;
6 import java.nio.ByteOrder;
7 import java.nio.channels.CancelledKeyException;
8 import java.nio.channels.ClosedChannelException;
9 import java.nio.channels.SelectionKey;
10 import java.nio.channels.Selector;
11 import java.nio.channels.SocketChannel;
12 import java.util.Collections;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.List;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.TimeUnit;
18
19 import org.simantics.db.common.utils.Logger;
20 import org.simantics.db.server.ProCoreException;
21 import org.simantics.db.server.protocol.AbstractFunction;
22 import org.simantics.db.server.protocol.DataBuffer;
23 import org.simantics.db.server.protocol.ExceptionFunction;
24 import org.simantics.db.server.protocol.GetClusterNewFunction;
25 import org.simantics.db.server.protocol.MessageNumber;
26
27 import gnu.trove.iterator.TIntObjectIterator;
28 import gnu.trove.map.hash.TIntObjectHashMap;
29
30 public class ConnectionManager {
31     private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;
32     private static final ConnectionManager connectionManager = new ConnectionManager();
33     public static ConnectionManager getInstance() {
34         return connectionManager.instance();
35     }
36     private final Manager manager = new Manager();
37     private final Thread thread = new Thread(manager, "Connection Manager");
38     private boolean stop = false;
39     protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());
40     private ConnectionManager() {
41         Util.trace("ConnectionManager.new");
42     }
43     private ConnectionManager instance() {
44         if (!thread.isAlive())
45             start();
46         return this;
47     }
48     protected class Manager implements Runnable {
49         private Selector selector;
50         private CountDownLatch selectorOpen = new CountDownLatch(1);
51         Manager() {
52         }
53         public void run() {
54             try {
55                 selector = Selector.open();
56                 selectorOpen.countDown();
57                 while (true) {
58                     int n = selector.select();
59                     try {
60                         if (n > 0)
61                             processSelected();
62                         processConnectors();
63                         if (stop) {
64                             selector.close();
65                             return;
66                         }
67                     } catch (CancelledKeyException e) {
68                         Logger.defaultLogError("Cancelled key in select loop.", e);
69                     }
70                 }
71             } catch (Throwable x) {
72                 Logger.defaultLogError("Select loop failed.", x);
73             }
74         }
75         void processConnectors() throws ClosedChannelException {
76             while (connections.size() > 0) {
77                 Connection c = connections.remove(0);
78                 c.register(selector);
79             }
80         }
81         void processSelected() {
82             Iterator<SelectionKey> i = selector.selectedKeys().iterator();
83             while (i.hasNext()) {
84                 SelectionKey sk = (SelectionKey)i.next();
85                 i.remove();
86                 try {
87                     if (!sk.isValid())
88                         processDisconnect(sk);
89                     else if (sk.isConnectable())
90                         processConnect(sk);
91                     else if (sk.isReadable())
92                         processRead(sk);
93                 } catch (CancelledKeyException e) {
94                     processDisconnect(sk);
95                 }
96             }
97         }
98         private void processDisconnect(SelectionKey key) {
99             Connection c = (Connection)key.attachment();
100             if (null == c)
101                 return;
102             c.onDisconnect();
103             key.attach(null); // Just to make sure that reference to connection is released.
104             key.cancel();
105         }
106         private void processConnect(SelectionKey key) {
107                 Connection connection = (Connection)key.attachment();
108                 SocketChannel sc = (SocketChannel)key.channel();
109                 try {
110                     if (sc.finishConnect()) {
111                         int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;
112                         ops |= SelectionKey.OP_READ;
113                         key.interestOps(ops);
114                         Connection c = connection.onConnectSucceeded(key);
115                         if (c == null) { // Connector refused to accept connection.
116                             key.attach(null); // Just to make sure that reference to connector is released.
117                             key.cancel();
118                             sc.close();
119                         } else
120                             key.attach(c);
121                     }
122                 } catch (Throwable x) {
123                     try {
124                         if (key.isValid())
125                             key.attach(null); // Just to make sure that reference to connector is released.
126                             key.cancel();
127                         try {
128                             sc.close();
129                         } catch (IOException e) {
130                             if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.
131                                 Logger.defaultLogError("Failed to close connection.", e);
132                         }
133                     } finally {
134                         connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));
135                     }
136                 }
137             }
138         }
139         private void processRead(SelectionKey key) {
140             Connection c = (Connection)key.attachment();
141             SocketChannel sc = (SocketChannel)key.channel();
142             try {
143                 int ops = key.interestOps();
144                 ops |= SelectionKey.OP_READ;
145                 key.interestOps(ops);
146                 c.onRead();
147             } catch (IOException x) {
148                 if (key.isValid()) {
149                     key.attach(null); // Just to make sure that reference to connection is released.
150                     key.cancel();
151                 }
152                 if (sc.isOpen())
153                     try {
154                         sc.close();
155                     } catch (IOException e) {
156                     }
157             }
158         }
159         void connect(Connection connection) throws ProCoreException {
160             try {
161                 if (connections.contains(connection))
162                     throw new ProCoreException("Connection already registered. connection=" + connection);
163                 connection.prepare4Connection(SocketChannel.open());
164                 connections.add(connection);
165                 manager.selector.wakeup();
166                 connection.wait4Connection();
167             } catch(IOException e) {
168                 throw new ConnectionException("Failed to connect to " + connection + ".", e);
169             } catch (InterruptedException e) {
170                 if (!connection.isConnected())
171                     throw new NotConnectedException("Connection interrupted to "  + connection + ".");
172             }
173         }
174     private void start() {
175         thread.start();
176         boolean once = true;
177         while (manager.selectorOpen.getCount() > 0) {
178             try {
179                 manager.selectorOpen.await();
180             } catch (InterruptedException e) {
181                 if (once) {
182                     once = false;
183                     Util.logError("Wait for selector open interrupted. Continuing wait.", e);
184                 }
185             }
186         }
187     }
188     public void stop() {
189         stop = true;
190         manager.selector.wakeup();
191     }
192 }
193 //class MethodQueue {
194 //    private LinkedList<Method> methods = new LinkedList<Method>();
195 //    MethodQueue() {
196 //    }
197 //    public synchronized int size() {
198 //        return methods.size();
199 //    }
200 //    public synchronized void addFirst(Method method) {
201 //        methods.addFirst(method);
202 //    }
203 //    public synchronized void addLast(Method method) {
204 //        methods.addLast(method);
205 //    }
206 //    public synchronized Method removeFirst() {
207 //        try {
208 //            Method method = methods.removeFirst();
209 //            return method;
210 //        } catch (NoSuchElementException e) {
211 //            return null;
212 //        }
213 //    }
214 //    public synchronized void close() {
215 //        while (size() > 0) {
216 //            Method method = removeFirst();
217 //            method.setExceptionText("Closing.");
218 //            method.gotResponse();
219 //        }
220 //    }
221 //
222 //}
223 //class RequestQueue {
224 //    private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();
225 //
226 //    RequestQueue() {
227 //    }
228 //    public synchronized int size() {
229 //        return functions.size();
230 //    }
231 //    public synchronized void addFirst(AbstractFunction e) {
232 //        functions.addFirst(e);
233 //    }
234 //    public synchronized void addLast(AbstractFunction e) {
235 //        functions.addLast(e);
236 //    }
237 //    public synchronized AbstractFunction removeFirst() {
238 //        try {
239 //            AbstractFunction ret = functions.removeFirst();
240 //            return ret;
241 //        } catch (NoSuchElementException e) {
242 //            return null;
243 //        }
244 //    }
245 //}
246 class MethodQueue {
247     private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();
248     MethodQueue() {
249     }
250     public synchronized int size() {
251         return methodMap.size();
252     }
253     public synchronized boolean isEmpty() {
254         return 0 == size();
255     }
256     public synchronized void add(Method method) throws ProCoreException {
257         int token = method.getToken();
258         if (0 == token)
259             throw new ProCoreException("Illegal token number.");
260         Method oldMethod = methodMap.put(token, method);
261         if (null != oldMethod)
262             throw new ProCoreException("Duplicate token number.");
263     }
264     public synchronized Method remove(int token) {
265         Method method = methodMap.remove(token);
266         return method;
267     }
268     synchronized void close() {
269         try {
270             TIntObjectIterator<Method> iterator = methodMap.iterator();
271             while (iterator.hasNext()) {
272               iterator.advance();
273               Method method = iterator.value();
274               if (null == method)
275                   Util.logError("Null function in method queue.");
276               method.gotException("Method queue closing.");
277           }
278         } finally {
279             methodMap.clear();
280         }
281     }
282 //    public synchronized Method removeAny() throws ProCoreException {
283 //        TIntObjectIterator<Method> iterator = methodMap.iterator();
284 //        if (!iterator.hasNext())
285 //            return null;
286 //        iterator.advance();
287 //        Method method = iterator.value();
288 //        if (null == method)
289 //            throw new ProCoreException("Null function in response queue.");
290 //        methodMap.remove(method.getToken());
291 //        return method;
292 //    }
293 }
294 class Channel {
295     private SelectionKey selectionKey = null;
296     private SocketChannel socketChannel = null;
297     Channel() {
298     }
299     void register(Selector selector, Connection connection)
300     throws ClosedChannelException {
301         if (null == socketChannel)
302             throw new ClosedChannelException();
303         socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);
304     }
305     void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)
306     throws ProCoreException, IOException {
307         if (null == socketChannel)
308             throw new NotConnectedException("Illegal argument. No socket channel to connect to.");
309         if (null == address)
310             throw new NotConnectedException("Illegal argument. No address to connect to.");
311         if (null != this.socketChannel)
312             throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);
313         socketChannel.configureBlocking(false);
314         boolean connected = socketChannel.connect(address);
315         if (connected)
316             if (DebugPolicy.REPORT_DEBUG_EVENTS)
317                 Util.showDebug("Connected to " + socketChannel + ".");
318         this.socketChannel = socketChannel; // Waiting for connect.
319     }
320 //    private static final int IPTOS_LOWDELAY = 0x10;
321     synchronized void connect(SelectionKey key) {
322         this.selectionKey = key;
323         this.socketChannel = (SocketChannel)key.channel();
324         if (DebugPolicy.REPORT_DEBUG_EVENTS)
325             Util.showDebug("Connected to " + socketChannel + ".");
326 //        try {
327 //            this.socketChannel.socket().setKeepAlive(true);
328 //            this.socketChannel.socket().setSoLinger(true, 1);
329 //        } catch (SocketException e) {
330 //            e.printStackTrace();
331 //        }
332 //        try {
333 //            this.socketChannel.socket().setTcpNoDelay(true);
334         // Crashes Kalle!
335         // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);
336 //        } catch (SocketException e) {
337 //            e.printStackTrace();
338 //        }
339     }
340     synchronized void disconnect() {
341         if (selectionKey == null)
342             return;
343         try {
344             try {
345                 if (DebugPolicy.REPORT_DEBUG_EVENTS)
346                     Util.showDebug("Disconnected from " + socketChannel);
347                 selectionKey.interestOps(0);
348                 selectionKey.cancel();
349                 selectionKey.selector().wakeup();
350             } catch (CancelledKeyException e) {
351                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
352                     Logger.defaultLogError("Failed to disconnect cleanly.", e);
353             } catch (IllegalArgumentException e) {
354                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
355                     Logger.defaultLogError("Failed to disconnect cleanly.", e);
356             }
357             try { // Note that this must be after the selectionKey operations above.
358                 socketChannel.socket().close();
359             } catch (IOException e) {
360                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)
361                     Logger.defaultLogError("Failed to close socket cleanly.", e);
362             }
363         } finally {
364             selectionKey = null;
365             socketChannel = null;
366         }
367     }
368     synchronized boolean isConnected() {
369         boolean ret = selectionKey != null && socketChannel.isConnected();
370         return ret;
371     }
372     synchronized SocketChannel socket() throws IOException {
373         if (null == socketChannel)
374             throw new IOException("Socket channel is null.");
375         return socketChannel;
376     }
377     synchronized void listenReadEvents()
378     throws IOException {
379         if (null == selectionKey)
380             throw new IOException("Trying to listen connection which is not connected.");
381         int ops = selectionKey.interestOps() | SelectionKey.OP_READ;
382         selectionKey.interestOps(ops);
383         selectionKey.selector().wakeup();
384     }
385 }
386 class Connection {
387     private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;
388     private static int lastSentTokenNumber = 0;
389     private final ConnectionThread connectionThread;
390     private InetSocketAddress address;
391     private Channel channel;
392     private CountDownLatch connectionCreated;
393     private ProCoreException exception;
394     private ByteBuffer readBuffer;
395     Connection(ConnectionThread connectionThread) {
396         this.connectionThread = connectionThread;
397         init4Connection(InetSocketAddress.createUnresolved("", 0));
398     }
399     @Override
400     public String toString() {
401         return "Connection address=" + address + " token=" + lastSentTokenNumber;
402     }
403     boolean addressNotInitialized() {
404         return address.equals(InetSocketAddress.createUnresolved("", 0));
405     }
406     void register(Selector selector)
407     throws ClosedChannelException {
408         channel.register(selector, this);
409     }
410     InetSocketAddress getAddress() {
411         return address;
412     }
413     boolean equalsAddress(InetSocketAddress address) {
414         return address.equals(address);
415     }
416     void init4Connection(InetSocketAddress address) {
417         this.address = address;
418         if (null != channel && channel.isConnected())
419             channel.disconnect();
420         channel = new Channel();
421         connectionCreated = new CountDownLatch(1);
422         exception = null;
423         readBuffer = connectionThread.getInputBuffer();
424     }
425     void prepare4Connection(SocketChannel socketChannel)
426     throws ProCoreException, IOException {
427         if (isConnected())
428             throw new ProCoreException("Illegal state exception. Already connected.");
429         if (address.isUnresolved())
430             address = new InetSocketAddress(address.getHostName(), address.getPort());
431         if (address.getPort() == 0)
432             throw new ProCoreException("Port 0 not supported as connection address.");
433         channel.prepare4Connection(socketChannel, address);
434     }
435     void wait4Connection() throws InterruptedException {
436         connectionCreated.await();
437     }
438     synchronized void disconnect() {
439         if (!isConnected())
440             return;
441         channel.disconnect();
442     }
443     private void throwIOExceptionFromRead(IOException e)
444     throws IOException {
445         disconnect(); // This ensures that isConnected will be false after this.
446         if (null != e)
447             throw e;
448         throw new IOException("Failed to read.");
449     }
450     // Called by synchronized handleInput.
451     private void sendMessage(ByteBuffer byteBuffer)
452     throws IOException, InterruptedException {
453         int left = byteBuffer.remaining();
454         while (left > 0) {
455             ByteBuffer slice = byteBuffer.slice();
456             int size = Math.min(left, 1000000);
457             slice.limit(size);
458             sendBuffer(slice);
459             int position = byteBuffer.position() + size;
460             byteBuffer.position(position);
461             left -= size;
462         }
463     }
464     private void sendBuffer(ByteBuffer byteBuffer)
465     throws IOException, InterruptedException {
466         int left = byteBuffer.remaining();
467         while (left > 0) {
468             int n = channel.socket().write(byteBuffer);
469             if (n > 0) {
470                 left -= n;
471                 continue;
472             }
473             // System.err.println("############### YIELD ##############################");
474             Thread.yield();
475             if (left > 0)
476                 Logger.defaultLogInfo("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()
477                         + ", write return =" + n);
478         }
479     }
480     static public class MessageHeader {
481         public int token;
482         public int lastTokenIn;
483         public MessageHeader(int token, int lastTokenIn) {
484             this.token = token;
485             this.lastTokenIn = lastTokenIn;
486         }
487     }
488     // Called by synchronized call.
489     protected void call(MessageHeader messageHeader, Method method)
490     throws IOException, InterruptedException {
491         byte[] bytes = new byte[20];
492         ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);
493         header.order(ByteOrder.LITTLE_ENDIAN);
494         header.putInt(messageHeader.lastTokenIn);
495         header.putInt(messageHeader.token);
496         int messageNumber = method.requestNumber;
497         header.putInt(messageNumber);
498         header.putInt(0); // inflate size, zero if no compression
499         ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.
500         header.putInt(data.position());
501         header.clear();
502         synchronized (this) {
503             if (DebugPolicy.REPORT_DEBUG_EVENTS)
504                 Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);
505             // Receiver expects header and data to form continuous block.
506             sendMessage(header);
507             data.limit(data.position());
508             data.rewind();
509             sendMessage(data);
510             if (DebugPolicy.REPORT_DEBUG_EVENTS)
511                 Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);
512         }
513     }
514     void sendMethod(MethodQueue mq, Method m, int lastTokenIn)
515     throws ProCoreException, IOException, InterruptedException {
516         m.prepareForSendingRequest();
517         if (DEBUG)
518             Util.showDebug("Setting last token out=" + lastTokenIn);
519         synchronized (this) {
520             MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);
521             m.setToken(messageHeader.token);
522             mq.add(m);
523             call(messageHeader, m);
524         }
525     }
526     // Called by session manager.
527     void onRead() throws IOException {
528         if (readBuffer.remaining() < 1)
529             throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));
530         int n = -1;
531         try {
532             n = channel.socket().read(readBuffer);
533         } catch (IOException e) {
534             throwIOExceptionFromRead(e);
535         }
536         if (n < 0) // Assuming that this is same as broken connection.
537             throwIOExceptionFromRead(new IOException("Failed to read."));
538         else if (n < 1)
539             return; // No data, no error.
540         try {
541             readBuffer = connectionThread.handleInput(readBuffer, n);
542         } catch (Throwable t) {
543             if (t instanceof IOException)
544                 throw (IOException)t;
545             else
546                 throw new IOException("Throwable from handleInput.", t);
547         }
548     }
549     // Called by session manager.
550     void onConnectFailed(ProCoreException exception) {
551         if (null != exception)
552             this.exception = exception;
553         else
554             this.exception = new NotConnectedException("Failed to create connection.");
555         connectionCreated.countDown();
556     }
557     // Called by session manager.
558     Connection onConnectSucceeded(SelectionKey key) {
559         Connection ret = null;
560         try {
561             channel.connect(key);
562             ret = this;
563         } catch (Throwable t) {
564             exception = new NotConnectedException("Failed to finalize connection.", t);
565             // Connection refused.
566         }
567         connectionCreated.countDown();
568         return ret;
569     }
570     // Called by session manager.
571     void onDisconnect() {
572         disconnect();
573     }
574     boolean isConnected() {
575         return channel.isConnected();
576     }
577 //    boolean isStopping() {
578 //    }
579     boolean isExcepted() {
580         return null == exception;
581     }
582     boolean isOk() {
583         return isConnected() && !isExcepted();
584     }
585 }
586 class HeaderData {
587     int lastTokenIn;
588     int token;
589     int messageNumber;
590     int inflateSize;
591     int deflateSize;
592 }
593 class ReceiveData {
594     public int lastTokenIn;
595     public int token;
596     public int messageNumber;
597 //    public DataBuffer inflateBuffer;
598     public ByteBuffer buffer;
599     public int inflateSize;
600     ReceiveData(HeaderData headerData) {
601         lastTokenIn = headerData.lastTokenIn;
602         token = headerData.token;
603         messageNumber = headerData.messageNumber;
604     }
605 }
606 abstract class MethodHandler {
607     abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;
608     DataBuffer getDataBuffer(Packet packet) {
609         packet.bytes.position(20);
610         DataBuffer db = new DataBuffer(packet.bytes, 0);
611         return db;
612     }
613 }
614 class ErrorHandler extends MethodHandler {
615     private final ProCoreException exception;
616     ErrorHandler(String s) {
617         if (null == s || s == "")
618             s = "Ilegal argument. Missing description for remote call exception.";
619         this.exception = new ProCoreException(s);
620     }
621     @Override
622     void got(Packet packet, AbstractFunction function) throws ProCoreException {
623         throw exception;
624     }
625 }
626 class ExceptionHandler extends MethodHandler {
627     @Override
628     void got(Packet packet, AbstractFunction function) throws ProCoreException {
629         packet.bytes.position(20);
630         ExceptionFunction exception = new ExceptionFunction();
631         exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));
632         throw new ProCoreException(exception.what);
633     }
634 }
635 class DefaultHandler extends MethodHandler {
636     @Override
637     void got(Packet packet, AbstractFunction function) throws ProCoreException {
638         function.deserialize(function.getResponseNumber(), getDataBuffer(packet));
639     }
640 }
641 class GetClusterHandler extends MethodHandler {
642     private final GetClusterNewFunction function;
643     public GetClusterHandler(final GetClusterNewFunction function) {
644         this.function = function;
645     }
646     @Override
647     void got(Packet packet, AbstractFunction function) throws ProCoreException {
648         this.function.inflateSize = packet.header.inflateSize;
649         packet.bytes.position(20);
650         packet.bytes.limit(20 + packet.header.deflateSize);
651         this.function.deflated = packet.bytes.compact();
652         this.function.deflated.position(0);
653         this.function.deflated.limit(packet.header.deflateSize);
654     }
655 }
656 //class AckHandler extends MethodHandler {
657 //    @Override
658 //    void got(Packet packet, AbstractFunction function) throws ProCoreException {
659 //        packet.first.position(20);
660 //        NullFunction ack = new NullFunction();
661 //        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));
662 //        Util.showDebug("ack message number=" + ack.messageNumber);
663 //    }
664 //}
665 abstract class AbstractMethod {
666 //  abstract ByteBuffer serialize(ByteOrder byteOrder);
667   protected final int requestNumber;
668   protected final int responseNumber;
669   protected int receivedNumber = 0;
670   boolean hasResponse() {
671       return responseNumber != 0;
672   }
673   private int token = 0;
674   void setToken(int token) {
675       this.token = token;
676   }
677   int getToken() {
678       return token;
679   }
680   AbstractMethod(int requestNumber, int responseNumber) {
681       this.requestNumber = requestNumber;
682       this.responseNumber = responseNumber;
683   }
684   void prepareForSendingRequest() {
685       receivedNumber = 0;
686   }
687 }
688 class Method extends AbstractMethod {
689     private final ExceptionHandler exceptionHandler = new ExceptionHandler();
690     private final AbstractFunction function;
691     private Packet packet = null;
692     private CountDownLatch gotPacket = new CountDownLatch(1);
693     private MethodHandler handler = new DefaultHandler();
694     private final EventHandler eventHandler;
695     Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {
696         super(function.getRequestNumber(), function.getResponseNumber());
697         this.eventHandler = eventHandler;
698         if (null != handler)
699             this.handler = handler;
700         switch (function.getRequestNumber()) {
701             default:
702                 break;
703             case MessageNumber.AcceptCommitRequest:
704             case MessageNumber.UpdateClusterRequest:
705                 gotPacket.countDown();
706             break;
707         }
708         this.function = function;
709     } // MARK: waitforReply
710     void waitForReply(Connection c) throws ProCoreException, InterruptedException {
711         while (gotPacket.getCount() > 0) {
712             gotPacket.await(30, TimeUnit.SECONDS);
713             if (gotPacket.getCount() > 0) {
714                 System.err.println("waitForReply timeout:");
715                 System.err.println("wait token=" + getToken());
716                 // MARK: c.onWait(getToken());
717             }
718         }
719         if (null != packet)
720             handler.got(packet, function);
721     }
722     EventHandler getEventHandler(EventHandler eventHandler) {
723         if (null != this.eventHandler)
724             return this.eventHandler;
725         else
726             return eventHandler;
727     }
728     ByteBuffer serialize(ByteOrder byteOrder) {
729         DataBuffer db = function.serialize(byteOrder);
730         return db.getByteBuffer();
731     }
732     void gotPacket(Packet packet) {
733         this.packet = packet;
734         gotPacket.countDown();
735     }
736     void gotException(String s) {
737         handler = new ErrorHandler(s);
738         gotPacket.countDown();
739     }
740     void gotException(Packet packet) {
741         this.packet = packet;
742         handler = exceptionHandler;
743         gotPacket.countDown();
744     }
745 //    void gotAck(Packet packet) {
746 //        this.packet = packet;
747 //        handler = new AckHandler();
748 //        gotPacket.countDown();
749 //    }
750 }
751 class DebugPolicy {
752     static final boolean REPORT_DEBUG_EVENTS = false;
753     static final boolean REPORT_SERVER_EVENTS = false;
754     static final boolean REPORT_DEBUG_OPERATIONS = false;
755     static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;
756     static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;
757
758     private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;
759     private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;
760     static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);
761
762     // Time to wait for an event (in milliseconds).
763     // Code logic should assume that this wait never expires.
764     // Should be big enough so that if this time is expired it indicates an error situation.
765     private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour
766     private static final boolean REPORT_WAIT_EVENTS = true;
767     private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;
768     /**
769      * Calls object.wait.
770      * @param waitMs time to wait in milliseconds.
771      * @param object to wait for (used for calling object.wait)
772      * @param string text used in debug prints.
773      * @see java.lang.Object.wait
774      */
775     static final void wait(long waitMs, Object object, String string) {
776         waitImpl(waitMs, object, string);
777     }
778     /**
779      * Calls object.wait.
780      * @param object to wait for (used for calling object.wait)
781      * @param string text used in debug prints.
782      * @see java.lang.Object.wait
783      */
784     static final void wait(Object object, String string) {
785         waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);
786     }
787     private static final void report(String string, Object object) {
788         String msg = "DEBUG: " + string
789         + " " + object.getClass().getSimpleName()
790         + " hid=" + System.identityHashCode(object);
791         if (REPORT_WAIT_EVENTS_VERBOSE)
792             new Throwable(msg).printStackTrace();
793         else
794             System.err.println(msg);
795     }
796     private static final void waitImpl(long waitMs, Object object, String string) {
797         if (REPORT_WAIT_EVENTS)
798             report("Wait for event started.", object);
799         try {
800             object.wait(waitMs); // milliseconds
801         } catch (InterruptedException e) {
802             if (REPORT_WAIT_EVENTS)
803                 report("Wait for event interrupted.", object);
804         } finally {
805             if (REPORT_WAIT_EVENTS)
806                 report("Wait for event stopped.", object);
807         }
808     }
809 }