]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/ConnectionManager.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / ConnectionManager.java
1 package org.simantics.db.server.internal;\r
2 \r
3 import java.io.IOException;\r
4 import java.net.InetSocketAddress;\r
5 import java.nio.ByteBuffer;\r
6 import java.nio.ByteOrder;\r
7 import java.nio.channels.CancelledKeyException;\r
8 import java.nio.channels.ClosedChannelException;\r
9 import java.nio.channels.SelectionKey;\r
10 import java.nio.channels.Selector;\r
11 import java.nio.channels.SocketChannel;\r
12 import java.util.Collections;\r
13 import java.util.Iterator;\r
14 import java.util.LinkedList;\r
15 import java.util.List;\r
16 import java.util.concurrent.CountDownLatch;\r
17 import java.util.concurrent.TimeUnit;\r
18 \r
19 import org.simantics.db.common.utils.Logger;\r
20 import org.simantics.db.server.ProCoreException;\r
21 import org.simantics.db.server.protocol.AbstractFunction;\r
22 import org.simantics.db.server.protocol.DataBuffer;\r
23 import org.simantics.db.server.protocol.ExceptionFunction;\r
24 import org.simantics.db.server.protocol.GetClusterNewFunction;\r
25 import org.simantics.db.server.protocol.MessageNumber;\r
26 \r
27 import gnu.trove.iterator.TIntObjectIterator;\r
28 import gnu.trove.map.hash.TIntObjectHashMap;\r
29 \r
30 public class ConnectionManager {\r
31     private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_EVENTS;\r
32     private static final ConnectionManager connectionManager = new ConnectionManager();\r
33     public static ConnectionManager getInstance() {\r
34         return connectionManager.instance();\r
35     }\r
36     private final Manager manager = new Manager();\r
37     private final Thread thread = new Thread(manager, "Connection Manager");\r
38     private boolean stop = false;\r
39     protected List<Connection> connections = Collections.synchronizedList(new LinkedList<Connection>());\r
40     private ConnectionManager() {\r
41         Util.trace("ConnectionManager.new");\r
42     }\r
43     private ConnectionManager instance() {\r
44         if (!thread.isAlive())\r
45             start();\r
46         return this;\r
47     }\r
48     protected class Manager implements Runnable {\r
49         private Selector selector;\r
50         private CountDownLatch selectorOpen = new CountDownLatch(1);\r
51         Manager() {\r
52         }\r
53         public void run() {\r
54             try {\r
55                 selector = Selector.open();\r
56                 selectorOpen.countDown();\r
57                 while (true) {\r
58                     int n = selector.select();\r
59                     try {\r
60                         if (n > 0)\r
61                             processSelected();\r
62                         processConnectors();\r
63                         if (stop) {\r
64                             selector.close();\r
65                             return;\r
66                         }\r
67                     } catch (CancelledKeyException e) {\r
68                         Logger.defaultLogError("Cancelled key in select loop.", e);\r
69                     }\r
70                 }\r
71             } catch (Throwable x) {\r
72                 Logger.defaultLogError("Select loop failed.", x);\r
73             }\r
74         }\r
75         void processConnectors() throws ClosedChannelException {\r
76             while (connections.size() > 0) {\r
77                 Connection c = connections.remove(0);\r
78                 c.register(selector);\r
79             }\r
80         }\r
81         void processSelected() {\r
82             Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
83             while (i.hasNext()) {\r
84                 SelectionKey sk = (SelectionKey)i.next();\r
85                 i.remove();\r
86                 try {\r
87                     if (!sk.isValid())\r
88                         processDisconnect(sk);\r
89                     else if (sk.isConnectable())\r
90                         processConnect(sk);\r
91                     else if (sk.isReadable())\r
92                         processRead(sk);\r
93                 } catch (CancelledKeyException e) {\r
94                     processDisconnect(sk);\r
95                 }\r
96             }\r
97         }\r
98         private void processDisconnect(SelectionKey key) {\r
99             Connection c = (Connection)key.attachment();\r
100             if (null == c)\r
101                 return;\r
102             c.onDisconnect();\r
103             key.attach(null); // Just to make sure that reference to connection is released.\r
104             key.cancel();\r
105         }\r
106         private void processConnect(SelectionKey key) {\r
107                 Connection connection = (Connection)key.attachment();\r
108                 SocketChannel sc = (SocketChannel)key.channel();\r
109                 try {\r
110                     if (sc.finishConnect()) {\r
111                         int ops = key.interestOps() & ~SelectionKey.OP_CONNECT;\r
112                         ops |= SelectionKey.OP_READ;\r
113                         key.interestOps(ops);\r
114                         Connection c = connection.onConnectSucceeded(key);\r
115                         if (c == null) { // Connector refused to accept connection.\r
116                             key.attach(null); // Just to make sure that reference to connector is released.\r
117                             key.cancel();\r
118                             sc.close();\r
119                         } else\r
120                             key.attach(c);\r
121                     }\r
122                 } catch (Throwable x) {\r
123                     try {\r
124                         if (key.isValid())\r
125                             key.attach(null); // Just to make sure that reference to connector is released.\r
126                             key.cancel();\r
127                         try {\r
128                             sc.close();\r
129                         } catch (IOException e) {\r
130                             if (DEBUG) // This is not an error but because logger does not contain debug/info routine with exception this is used.\r
131                                 Logger.defaultLogError("Failed to close connection.", e);\r
132                         }\r
133                     } finally {\r
134                         connection.onConnectFailed(new NotConnectedException("Failed to establish connection.", x));\r
135                     }\r
136                 }\r
137             }\r
138         }\r
139         private void processRead(SelectionKey key) {\r
140             Connection c = (Connection)key.attachment();\r
141             SocketChannel sc = (SocketChannel)key.channel();\r
142             try {\r
143                 int ops = key.interestOps();\r
144                 ops |= SelectionKey.OP_READ;\r
145                 key.interestOps(ops);\r
146                 c.onRead();\r
147             } catch (IOException x) {\r
148                 if (key.isValid()) {\r
149                     key.attach(null); // Just to make sure that reference to connection is released.\r
150                     key.cancel();\r
151                 }\r
152                 if (sc.isOpen())\r
153                     try {\r
154                         sc.close();\r
155                     } catch (IOException e) {\r
156                     }\r
157             }\r
158         }\r
159         void connect(Connection connection) throws ProCoreException {\r
160             try {\r
161                 if (connections.contains(connection))\r
162                     throw new ProCoreException("Connection already registered. connection=" + connection);\r
163                 connection.prepare4Connection(SocketChannel.open());\r
164                 connections.add(connection);\r
165                 manager.selector.wakeup();\r
166                 connection.wait4Connection();\r
167             } catch(IOException e) {\r
168                 throw new ConnectionException("Failed to connect to " + connection + ".", e);\r
169             } catch (InterruptedException e) {\r
170                 if (!connection.isConnected())\r
171                     throw new NotConnectedException("Connection interrupted to "  + connection + ".");\r
172             }\r
173         }\r
174     private void start() {\r
175         thread.start();\r
176         boolean once = true;\r
177         while (manager.selectorOpen.getCount() > 0) {\r
178             try {\r
179                 manager.selectorOpen.await();\r
180             } catch (InterruptedException e) {\r
181                 if (once) {\r
182                     once = false;\r
183                     Util.logError("Wait for selector open interrupted. Continuing wait.", e);\r
184                 }\r
185             }\r
186         }\r
187     }\r
188     public void stop() {\r
189         stop = true;\r
190         manager.selector.wakeup();\r
191     }\r
192 }\r
193 //class MethodQueue {\r
194 //    private LinkedList<Method> methods = new LinkedList<Method>();\r
195 //    MethodQueue() {\r
196 //    }\r
197 //    public synchronized int size() {\r
198 //        return methods.size();\r
199 //    }\r
200 //    public synchronized void addFirst(Method method) {\r
201 //        methods.addFirst(method);\r
202 //    }\r
203 //    public synchronized void addLast(Method method) {\r
204 //        methods.addLast(method);\r
205 //    }\r
206 //    public synchronized Method removeFirst() {\r
207 //        try {\r
208 //            Method method = methods.removeFirst();\r
209 //            return method;\r
210 //        } catch (NoSuchElementException e) {\r
211 //            return null;\r
212 //        }\r
213 //    }\r
214 //    public synchronized void close() {\r
215 //        while (size() > 0) {\r
216 //            Method method = removeFirst();\r
217 //            method.setExceptionText("Closing.");\r
218 //            method.gotResponse();\r
219 //        }\r
220 //    }\r
221 //\r
222 //}\r
223 //class RequestQueue {\r
224 //    private LinkedList<AbstractFunction> functions = new LinkedList<AbstractFunction>();\r
225 //\r
226 //    RequestQueue() {\r
227 //    }\r
228 //    public synchronized int size() {\r
229 //        return functions.size();\r
230 //    }\r
231 //    public synchronized void addFirst(AbstractFunction e) {\r
232 //        functions.addFirst(e);\r
233 //    }\r
234 //    public synchronized void addLast(AbstractFunction e) {\r
235 //        functions.addLast(e);\r
236 //    }\r
237 //    public synchronized AbstractFunction removeFirst() {\r
238 //        try {\r
239 //            AbstractFunction ret = functions.removeFirst();\r
240 //            return ret;\r
241 //        } catch (NoSuchElementException e) {\r
242 //            return null;\r
243 //        }\r
244 //    }\r
245 //}\r
246 class MethodQueue {\r
247     private TIntObjectHashMap<Method> methodMap = new TIntObjectHashMap<Method>();\r
248     MethodQueue() {\r
249     }\r
250     public synchronized int size() {\r
251         return methodMap.size();\r
252     }\r
253     public synchronized boolean isEmpty() {\r
254         return 0 == size();\r
255     }\r
256     public synchronized void add(Method method) throws ProCoreException {\r
257         int token = method.getToken();\r
258         if (0 == token)\r
259             throw new ProCoreException("Illegal token number.");\r
260         Method oldMethod = methodMap.put(token, method);\r
261         if (null != oldMethod)\r
262             throw new ProCoreException("Duplicate token number.");\r
263     }\r
264     public synchronized Method remove(int token) {\r
265         Method method = methodMap.remove(token);\r
266         return method;\r
267     }\r
268     synchronized void close() {\r
269         try {\r
270             TIntObjectIterator<Method> iterator = methodMap.iterator();\r
271             while (iterator.hasNext()) {\r
272               iterator.advance();\r
273               Method method = iterator.value();\r
274               if (null == method)\r
275                   Util.logError("Null function in method queue.");\r
276               method.gotException("Method queue closing.");\r
277           }\r
278         } finally {\r
279             methodMap.clear();\r
280         }\r
281     }\r
282 //    public synchronized Method removeAny() throws ProCoreException {\r
283 //        TIntObjectIterator<Method> iterator = methodMap.iterator();\r
284 //        if (!iterator.hasNext())\r
285 //            return null;\r
286 //        iterator.advance();\r
287 //        Method method = iterator.value();\r
288 //        if (null == method)\r
289 //            throw new ProCoreException("Null function in response queue.");\r
290 //        methodMap.remove(method.getToken());\r
291 //        return method;\r
292 //    }\r
293 }\r
294 class Channel {\r
295     private SelectionKey selectionKey = null;\r
296     private SocketChannel socketChannel = null;\r
297     Channel() {\r
298     }\r
299     void register(Selector selector, Connection connection)\r
300     throws ClosedChannelException {\r
301         if (null == socketChannel)\r
302             throw new ClosedChannelException();\r
303         socketChannel.register(selector, SelectionKey.OP_CONNECT, connection);\r
304     }\r
305     void prepare4Connection(SocketChannel socketChannel, InetSocketAddress address)\r
306     throws ProCoreException, IOException {\r
307         if (null == socketChannel)\r
308             throw new NotConnectedException("Illegal argument. No socket channel to connect to.");\r
309         if (null == address)\r
310             throw new NotConnectedException("Illegal argument. No address to connect to.");\r
311         if (null != this.socketChannel)\r
312             throw new NotConnectedException("Illegal state. Socket channel already created. this=" + this);\r
313         socketChannel.configureBlocking(false);\r
314         boolean connected = socketChannel.connect(address);\r
315         if (connected)\r
316             if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
317                 Util.showDebug("Connected to " + socketChannel + ".");\r
318         this.socketChannel = socketChannel; // Waiting for connect.\r
319     }\r
320 //    private static final int IPTOS_LOWDELAY = 0x10;\r
321     synchronized void connect(SelectionKey key) {\r
322         this.selectionKey = key;\r
323         this.socketChannel = (SocketChannel)key.channel();\r
324         if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
325             Util.showDebug("Connected to " + socketChannel + ".");\r
326 //        try {\r
327 //            this.socketChannel.socket().setKeepAlive(true);\r
328 //            this.socketChannel.socket().setSoLinger(true, 1);\r
329 //        } catch (SocketException e) {\r
330 //            e.printStackTrace();\r
331 //        }\r
332 //        try {\r
333 //            this.socketChannel.socket().setTcpNoDelay(true);\r
334         // Crashes Kalle!\r
335         // this.socketChannel.socket().setTrafficClass(IPTOS_LOWDELAY);\r
336 //        } catch (SocketException e) {\r
337 //            e.printStackTrace();\r
338 //        }\r
339     }\r
340     synchronized void disconnect() {\r
341         if (selectionKey == null)\r
342             return;\r
343         try {\r
344             try {\r
345                 if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
346                     Util.showDebug("Disconnected from " + socketChannel);\r
347                 selectionKey.interestOps(0);\r
348                 selectionKey.cancel();\r
349                 selectionKey.selector().wakeup();\r
350             } catch (CancelledKeyException e) {\r
351                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
352                     Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
353             } catch (IllegalArgumentException e) {\r
354                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
355                     Logger.defaultLogError("Failed to disconnect cleanly.", e);\r
356             }\r
357             try { // Note that this must be after the selectionKey operations above.\r
358                 socketChannel.socket().close();\r
359             } catch (IOException e) {\r
360                 if (DebugPolicy.REPORT_DEBUG_OPERATIONS)\r
361                     Logger.defaultLogError("Failed to close socket cleanly.", e);\r
362             }\r
363         } finally {\r
364             selectionKey = null;\r
365             socketChannel = null;\r
366         }\r
367     }\r
368     synchronized boolean isConnected() {\r
369         boolean ret = selectionKey != null && socketChannel.isConnected();\r
370         return ret;\r
371     }\r
372     synchronized SocketChannel socket() throws IOException {\r
373         if (null == socketChannel)\r
374             throw new IOException("Socket channel is null.");\r
375         return socketChannel;\r
376     }\r
377     synchronized void listenReadEvents()\r
378     throws IOException {\r
379         if (null == selectionKey)\r
380             throw new IOException("Trying to listen connection which is not connected.");\r
381         int ops = selectionKey.interestOps() | SelectionKey.OP_READ;\r
382         selectionKey.interestOps(ops);\r
383         selectionKey.selector().wakeup();\r
384     }\r
385 }\r
386 class Connection {\r
387     private static final boolean DEBUG = DebugPolicy.REPORT_DEBUG_OPERATIONS;\r
388     private static int lastSentTokenNumber = 0;\r
389     private final ConnectionThread connectionThread;\r
390     private InetSocketAddress address;\r
391     private Channel channel;\r
392     private CountDownLatch connectionCreated;\r
393     private ProCoreException exception;\r
394     private ByteBuffer readBuffer;\r
395     Connection(ConnectionThread connectionThread) {\r
396         this.connectionThread = connectionThread;\r
397         init4Connection(InetSocketAddress.createUnresolved("", 0));\r
398     }\r
399     @Override\r
400     public String toString() {\r
401         return "Connection address=" + address + " token=" + lastSentTokenNumber;\r
402     }\r
403     boolean addressNotInitialized() {\r
404         return address.equals(InetSocketAddress.createUnresolved("", 0));\r
405     }\r
406     void register(Selector selector)\r
407     throws ClosedChannelException {\r
408         channel.register(selector, this);\r
409     }\r
410     InetSocketAddress getAddress() {\r
411         return address;\r
412     }\r
413     boolean equalsAddress(InetSocketAddress address) {\r
414         return address.equals(address);\r
415     }\r
416     void init4Connection(InetSocketAddress address) {\r
417         this.address = address;\r
418         if (null != channel && channel.isConnected())\r
419             channel.disconnect();\r
420         channel = new Channel();\r
421         connectionCreated = new CountDownLatch(1);\r
422         exception = null;\r
423         readBuffer = connectionThread.getInputBuffer();\r
424     }\r
425     void prepare4Connection(SocketChannel socketChannel)\r
426     throws ProCoreException, IOException {\r
427         if (isConnected())\r
428             throw new ProCoreException("Illegal state exception. Already connected.");\r
429         if (address.isUnresolved())\r
430             address = new InetSocketAddress(address.getHostName(), address.getPort());\r
431         if (address.getPort() == 0)\r
432             throw new ProCoreException("Port 0 not supported as connection address.");\r
433         channel.prepare4Connection(socketChannel, address);\r
434     }\r
435     void wait4Connection() throws InterruptedException {\r
436         connectionCreated.await();\r
437     }\r
438     synchronized void disconnect() {\r
439         if (!isConnected())\r
440             return;\r
441         channel.disconnect();\r
442     }\r
443     private void throwIOExceptionFromRead(IOException e)\r
444     throws IOException {\r
445         disconnect(); // This ensures that isConnected will be false after this.\r
446         if (null != e)\r
447             throw e;\r
448         throw new IOException("Failed to read.");\r
449     }\r
450     // Called by synchronized handleInput.\r
451     private void sendMessage(ByteBuffer byteBuffer)\r
452     throws IOException, InterruptedException {\r
453         int left = byteBuffer.remaining();\r
454         while (left > 0) {\r
455             ByteBuffer slice = byteBuffer.slice();\r
456             int size = Math.min(left, 1000000);\r
457             slice.limit(size);\r
458             sendBuffer(slice);\r
459             int position = byteBuffer.position() + size;\r
460             byteBuffer.position(position);\r
461             left -= size;\r
462         }\r
463     }\r
464     private void sendBuffer(ByteBuffer byteBuffer)\r
465     throws IOException, InterruptedException {\r
466         int left = byteBuffer.remaining();\r
467         while (left > 0) {\r
468             int n = channel.socket().write(byteBuffer);\r
469             if (n > 0) {\r
470                 left -= n;\r
471                 continue;\r
472             }\r
473             // System.err.println("############### YIELD ##############################");\r
474             Thread.yield();\r
475             if (left > 0)\r
476                 Logger.defaultLogTrace("Could not send the whole byte buffer, left count = " + left + ", buffer remaining = " + byteBuffer.remaining()\r
477                         + ", write return =" + n);\r
478         }\r
479     }\r
480     static public class MessageHeader {\r
481         public int token;\r
482         public int lastTokenIn;\r
483         public MessageHeader(int token, int lastTokenIn) {\r
484             this.token = token;\r
485             this.lastTokenIn = lastTokenIn;\r
486         }\r
487     }\r
488     // Called by synchronized call.\r
489     protected void call(MessageHeader messageHeader, Method method)\r
490     throws IOException, InterruptedException {\r
491         byte[] bytes = new byte[20];\r
492         ByteBuffer header = ByteBuffer.wrap(bytes, 0, 20);\r
493         header.order(ByteOrder.LITTLE_ENDIAN);\r
494         header.putInt(messageHeader.lastTokenIn);\r
495         header.putInt(messageHeader.token);\r
496         int messageNumber = method.requestNumber;\r
497         header.putInt(messageNumber);\r
498         header.putInt(0); // inflate size, zero if no compression\r
499         ByteBuffer data = method.serialize(ByteOrder.LITTLE_ENDIAN); // TODO: remove byte order.\r
500         header.putInt(data.position());\r
501         header.clear();\r
502         synchronized (this) {\r
503             if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
504                 Util.showDebug("Sending message, token=" + messageHeader.token + "," + " message=" + messageNumber);\r
505             // Receiver expects header and data to form continuous block.\r
506             sendMessage(header);\r
507             data.limit(data.position());\r
508             data.rewind();\r
509             sendMessage(data);\r
510             if (DebugPolicy.REPORT_DEBUG_EVENTS)\r
511                 Util.showDebug("Sent message, token=" + messageHeader.token + ", message=" + messageNumber);\r
512         }\r
513     }\r
514     void sendMethod(MethodQueue mq, Method m, int lastTokenIn)\r
515     throws ProCoreException, IOException, InterruptedException {\r
516         m.prepareForSendingRequest();\r
517         if (DEBUG)\r
518             Util.showDebug("Setting last token out=" + lastTokenIn);\r
519         synchronized (this) {\r
520             MessageHeader messageHeader = new MessageHeader(++lastSentTokenNumber, lastTokenIn);\r
521             m.setToken(messageHeader.token);\r
522             mq.add(m);\r
523             call(messageHeader, m);\r
524         }\r
525     }\r
526     // Called by session manager.\r
527     void onRead() throws IOException {\r
528         if (readBuffer.remaining() < 1)\r
529             throwIOExceptionFromRead(new IOException("Internal error. Assertion failed. Read buffer full in Connection.onRead."));\r
530         int n = -1;\r
531         try {\r
532             n = channel.socket().read(readBuffer);\r
533         } catch (IOException e) {\r
534             throwIOExceptionFromRead(e);\r
535         }\r
536         if (n < 0) // Assuming that this is same as broken connection.\r
537             throwIOExceptionFromRead(new IOException("Failed to read."));\r
538         else if (n < 1)\r
539             return; // No data, no error.\r
540         try {\r
541             readBuffer = connectionThread.handleInput(readBuffer, n);\r
542         } catch (Throwable t) {\r
543             if (t instanceof IOException)\r
544                 throw (IOException)t;\r
545             else\r
546                 throw new IOException("Throwable from handleInput.", t);\r
547         }\r
548     }\r
549     // Called by session manager.\r
550     void onConnectFailed(ProCoreException exception) {\r
551         if (null != exception)\r
552             this.exception = exception;\r
553         else\r
554             this.exception = new NotConnectedException("Failed to create connection.");\r
555         connectionCreated.countDown();\r
556     }\r
557     // Called by session manager.\r
558     Connection onConnectSucceeded(SelectionKey key) {\r
559         Connection ret = null;\r
560         try {\r
561             channel.connect(key);\r
562             ret = this;\r
563         } catch (Throwable t) {\r
564             exception = new NotConnectedException("Failed to finalize connection.", t);\r
565             // Connection refused.\r
566         }\r
567         connectionCreated.countDown();\r
568         return ret;\r
569     }\r
570     // Called by session manager.\r
571     void onDisconnect() {\r
572         disconnect();\r
573     }\r
574     boolean isConnected() {\r
575         return channel.isConnected();\r
576     }\r
577 //    boolean isStopping() {\r
578 //    }\r
579     boolean isExcepted() {\r
580         return null == exception;\r
581     }\r
582     boolean isOk() {\r
583         return isConnected() && !isExcepted();\r
584     }\r
585 }\r
586 class HeaderData {\r
587     int lastTokenIn;\r
588     int token;\r
589     int messageNumber;\r
590     int inflateSize;\r
591     int deflateSize;\r
592 }\r
593 class ReceiveData {\r
594     public int lastTokenIn;\r
595     public int token;\r
596     public int messageNumber;\r
597 //    public DataBuffer inflateBuffer;\r
598     public ByteBuffer buffer;\r
599     public int inflateSize;\r
600     ReceiveData(HeaderData headerData) {\r
601         lastTokenIn = headerData.lastTokenIn;\r
602         token = headerData.token;\r
603         messageNumber = headerData.messageNumber;\r
604     }\r
605 }\r
606 abstract class MethodHandler {\r
607     abstract void got(Packet packet, AbstractFunction function) throws ProCoreException;\r
608     DataBuffer getDataBuffer(Packet packet) {\r
609         packet.bytes.position(20);\r
610         DataBuffer db = new DataBuffer(packet.bytes, 0);\r
611         return db;\r
612     }\r
613 }\r
614 class ErrorHandler extends MethodHandler {\r
615     private final ProCoreException exception;\r
616     ErrorHandler(String s) {\r
617         if (null == s || s == "")\r
618             s = "Ilegal argument. Missing description for remote call exception.";\r
619         this.exception = new ProCoreException(s);\r
620     }\r
621     @Override\r
622     void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
623         throw exception;\r
624     }\r
625 }\r
626 class ExceptionHandler extends MethodHandler {\r
627     @Override\r
628     void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
629         packet.bytes.position(20);\r
630         ExceptionFunction exception = new ExceptionFunction();\r
631         exception.deserialize(MessageNumber.ExceptionResponse, new DataBuffer(packet.bytes, 0));\r
632         throw new ProCoreException(exception.what);\r
633     }\r
634 }\r
635 class DefaultHandler extends MethodHandler {\r
636     @Override\r
637     void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
638         function.deserialize(function.getResponseNumber(), getDataBuffer(packet));\r
639     }\r
640 }\r
641 class GetClusterHandler extends MethodHandler {\r
642     private final GetClusterNewFunction function;\r
643     public GetClusterHandler(final GetClusterNewFunction function) {\r
644         this.function = function;\r
645     }\r
646     @Override\r
647     void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
648         this.function.inflateSize = packet.header.inflateSize;\r
649         packet.bytes.position(20);\r
650         packet.bytes.limit(20 + packet.header.deflateSize);\r
651         this.function.deflated = packet.bytes.compact();\r
652         this.function.deflated.position(0);\r
653         this.function.deflated.limit(packet.header.deflateSize);\r
654     }\r
655 }\r
656 //class AckHandler extends MethodHandler {\r
657 //    @Override\r
658 //    void got(Packet packet, AbstractFunction function) throws ProCoreException {\r
659 //        packet.first.position(20);\r
660 //        NullFunction ack = new NullFunction();\r
661 //        ack.deserialize(MessageNumber.NullMessage, new DataBuffer(packet.first, 0));\r
662 //        Util.showDebug("ack message number=" + ack.messageNumber);\r
663 //    }\r
664 //}\r
665 abstract class AbstractMethod {\r
666 //  abstract ByteBuffer serialize(ByteOrder byteOrder);\r
667   protected final int requestNumber;\r
668   protected final int responseNumber;\r
669   protected int receivedNumber = 0;\r
670   boolean hasResponse() {\r
671       return responseNumber != 0;\r
672   }\r
673   private int token = 0;\r
674   void setToken(int token) {\r
675       this.token = token;\r
676   }\r
677   int getToken() {\r
678       return token;\r
679   }\r
680   AbstractMethod(int requestNumber, int responseNumber) {\r
681       this.requestNumber = requestNumber;\r
682       this.responseNumber = responseNumber;\r
683   }\r
684   void prepareForSendingRequest() {\r
685       receivedNumber = 0;\r
686   }\r
687 }\r
688 class Method extends AbstractMethod {\r
689     private final ExceptionHandler exceptionHandler = new ExceptionHandler();\r
690     private final AbstractFunction function;\r
691     private Packet packet = null;\r
692     private CountDownLatch gotPacket = new CountDownLatch(1);\r
693     private MethodHandler handler = new DefaultHandler();\r
694     private final EventHandler eventHandler;\r
695     Method(AbstractFunction function, EventHandler eventHandler, MethodHandler handler) {\r
696         super(function.getRequestNumber(), function.getResponseNumber());\r
697         this.eventHandler = eventHandler;\r
698         if (null != handler)\r
699             this.handler = handler;\r
700         switch (function.getRequestNumber()) {\r
701             default:\r
702                 break;\r
703             case MessageNumber.AcceptCommitRequest:\r
704             case MessageNumber.UpdateClusterRequest:\r
705                 gotPacket.countDown();\r
706             break;\r
707         }\r
708         this.function = function;\r
709     } // MARK: waitforReply\r
710     void waitForReply(Connection c) throws ProCoreException, InterruptedException {\r
711         while (gotPacket.getCount() > 0) {\r
712             gotPacket.await(30, TimeUnit.SECONDS);\r
713             if (gotPacket.getCount() > 0) {\r
714                 System.err.println("waitForReply timeout:");\r
715                 System.err.println("wait token=" + getToken());\r
716                 // MARK: c.onWait(getToken());\r
717             }\r
718         }\r
719         if (null != packet)\r
720             handler.got(packet, function);\r
721     }\r
722     EventHandler getEventHandler(EventHandler eventHandler) {\r
723         if (null != this.eventHandler)\r
724             return this.eventHandler;\r
725         else\r
726             return eventHandler;\r
727     }\r
728     ByteBuffer serialize(ByteOrder byteOrder) {\r
729         DataBuffer db = function.serialize(byteOrder);\r
730         return db.getByteBuffer();\r
731     }\r
732     void gotPacket(Packet packet) {\r
733         this.packet = packet;\r
734         gotPacket.countDown();\r
735     }\r
736     void gotException(String s) {\r
737         handler = new ErrorHandler(s);\r
738         gotPacket.countDown();\r
739     }\r
740     void gotException(Packet packet) {\r
741         this.packet = packet;\r
742         handler = exceptionHandler;\r
743         gotPacket.countDown();\r
744     }\r
745 //    void gotAck(Packet packet) {\r
746 //        this.packet = packet;\r
747 //        handler = new AckHandler();\r
748 //        gotPacket.countDown();\r
749 //    }\r
750 }\r
751 class DebugPolicy {\r
752     static final boolean REPORT_DEBUG_EVENTS = false;\r
753     static final boolean REPORT_SERVER_EVENTS = false;\r
754     static final boolean REPORT_DEBUG_OPERATIONS = false;\r
755     static final boolean REPORT_TIME_CONSUMING_REQUESTS = false;\r
756     static final boolean REPORT_TIME_CONSUMING_FUNCTIONS = false;\r
757 \r
758     private static final TimeUnit LONG_EXECUTION_REPORT_PERIOD_UNIT = TimeUnit.MILLISECONDS;\r
759     private static final long LONG_EXECUTION_REPORT_PERIOD = 10000;\r
760     static final long REPORT_MS = LONG_EXECUTION_REPORT_PERIOD_UNIT.toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD);\r
761 \r
762     // Time to wait for an event (in milliseconds).\r
763     // Code logic should assume that this wait never expires.\r
764     // Should be big enough so that if this time is expired it indicates an error situation.\r
765     private static final long NON_EXPIRED_EVENT_WAIT_TIME_MS = 60000000; // one hour\r
766     private static final boolean REPORT_WAIT_EVENTS = true;\r
767     private static final boolean REPORT_WAIT_EVENTS_VERBOSE = true;\r
768     /**\r
769      * Calls object.wait.\r
770      * @param waitMs time to wait in milliseconds.\r
771      * @param object to wait for (used for calling object.wait)\r
772      * @param string text used in debug prints.\r
773      * @see java.lang.Object.wait\r
774      */\r
775     static final void wait(long waitMs, Object object, String string) {\r
776         waitImpl(waitMs, object, string);\r
777     }\r
778     /**\r
779      * Calls object.wait.\r
780      * @param object to wait for (used for calling object.wait)\r
781      * @param string text used in debug prints.\r
782      * @see java.lang.Object.wait\r
783      */\r
784     static final void wait(Object object, String string) {\r
785         waitImpl(NON_EXPIRED_EVENT_WAIT_TIME_MS, object, string);\r
786     }\r
787     private static final void report(String string, Object object) {\r
788         String msg = "DEBUG: " + string\r
789         + " " + object.getClass().getSimpleName()\r
790         + " hid=" + System.identityHashCode(object);\r
791         if (REPORT_WAIT_EVENTS_VERBOSE)\r
792             new Throwable(msg).printStackTrace();\r
793         else\r
794             System.err.println(msg);\r
795     }\r
796     private static final void waitImpl(long waitMs, Object object, String string) {\r
797         if (REPORT_WAIT_EVENTS)\r
798             report("Wait for event started.", object);\r
799         try {\r
800             object.wait(waitMs); // milliseconds\r
801         } catch (InterruptedException e) {\r
802             if (REPORT_WAIT_EVENTS)\r
803                 report("Wait for event interrupted.", object);\r
804         } finally {\r
805             if (REPORT_WAIT_EVENTS)\r
806                 report("Wait for event stopped.", object);\r
807         }\r
808     }\r
809 }\r