]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.databoard / src / org / simantics / databoard / method / TcpConnection.java
1 /*******************************************************************************\r
2  *  Copyright (c) 2010 Association for Decentralized Information Management in\r
3  *  Industry THTH ry.\r
4  *  All rights reserved. This program and the accompanying materials\r
5  *  are made available under the terms of the Eclipse Public License v1.0\r
6  *  which accompanies this distribution, and is available at\r
7  *  http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  *  Contributors:\r
10  *      VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.databoard.method;
13
14 import gnu.trove.map.hash.TObjectIntHashMap;\r
15 \r
16 import java.io.EOFException;\r
17 import java.io.IOException;\r
18 import java.net.Socket;\r
19 import java.net.SocketException;\r
20 import java.nio.charset.Charset;\r
21 import java.util.ArrayList;\r
22 import java.util.HashMap;\r
23 import java.util.List;\r
24 import java.util.Map;\r
25 import java.util.concurrent.ConcurrentHashMap;\r
26 import java.util.concurrent.CopyOnWriteArrayList;\r
27 import java.util.concurrent.ExecutorService;\r
28 import java.util.concurrent.Semaphore;\r
29 import java.util.concurrent.SynchronousQueue;\r
30 import java.util.concurrent.ThreadPoolExecutor;\r
31 import java.util.concurrent.TimeUnit;\r
32 import java.util.concurrent.atomic.AtomicInteger;\r
33 \r
34 import org.simantics.databoard.Bindings;\r
35 import org.simantics.databoard.annotations.Union;\r
36 import org.simantics.databoard.binding.Binding;\r
37 import org.simantics.databoard.binding.RecordBinding;\r
38 import org.simantics.databoard.binding.UnionBinding;\r
39 import org.simantics.databoard.serialization.Serializer;\r
40 import org.simantics.databoard.serialization.SerializerConstructionException;\r
41 import org.simantics.databoard.util.binary.BinaryReadable;\r
42 import org.simantics.databoard.util.binary.BinaryWriteable;\r
43 import org.simantics.databoard.util.binary.InputStreamReadable;\r
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;\r
45
46 /**
47  * Connection is a class that handles request-response communication over a
48  * socket. 
49  * <p>
50  * Requests have asynchronous result. The result can be acquired using one of 
51  * the three methods:
52  *  1) Blocking read AsyncResult.waitForResponse()
53  *  2) Poll  AsyncResult.getResponse()
54  *  3) Listen AsyncResult.setListener()
55  * <p>
56  * The socket must be established before Connection is instantiated.
57  * Closing connection does not close its Socket.
58  * If the socket is closed before the connection, an error is raised.
59  * The error can be observed by registering a ConnectionListener.
60  * The proper order to close a connection is to close Connection first
61  * and then Socket. 
62  *  
63  * @author Toni Kalajainen <toni.kalajainen@vtt.fi>
64  */
65 public class TcpConnection implements MethodInterface {
66 \r
67         public static final ExecutorService SHARED_EXECUTOR_SERVICE = \r
68                 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());\r
69         
70         static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) );
71         static Charset UTF8 = Charset.forName("UTF8");
72         
73         Handshake local, remote;
74         
75         Interface remoteType;
76         MethodTypeDefinition[] localMethods, remoteMethods;
77         HashMap<MethodTypeDefinition, Integer> localMethodsMap, remoteMethodsMap;
78         
79         Socket socket;
80                 
81         // if false, there is an error in the socket or the connection has been shutdown 
82         boolean active = true;
83
84         // Objects used for handling local services 
85         MethodInterface methodInterface;
86                 
87         // Objects used for reading data
88         ConcurrentHashMap<Integer, PendingRequest> requests = new ConcurrentHashMap<Integer, PendingRequest>();  
89         List<Object> inIdentities = new ArrayList<Object>();
90         BinaryReadable in;
91         int maxRecvSize;
92         
93         // Object used for writing data
94         public ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
95         TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
96         BinaryWriteable out;
97         AtomicInteger requestCounter = new AtomicInteger(0);
98         int maxSendSize;
99         
100         // Cached method descriptions
101         Map<String, MethodType> methodTypes = new ConcurrentHashMap<String, MethodType>();
102         
103         /**
104          * Handshake a socket
105          * 
106          * @param socket 
107          * @param localData local data
108          * @return the remote data
109          * @throws IOException
110          * @throws RuntimeException unexpected error (BindingException or EncodingException) 
111          */
112         public static Handshake handshake(final Socket socket, final Handshake localData)
113         throws IOException
114         {
115                 final BinaryReadable bin = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
116                 final BinaryWriteable bout = new OutputStreamWriteable( socket.getOutputStream() );
117                 ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
118                 
119                 // do hand-shake                
120                 final Exception[] writeError = new Exception[1];
121                 final Semaphore s = new Semaphore(0);           
122                 writeExecutor.execute(new Runnable() {
123                         @Override
124                         public void run() {
125                                 try {
126                                         TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();                                      
127                                         Handshake.SERIALIZER.serialize(bout, outIdentities, localData);
128                                         bout.flush();
129                                         outIdentities.clear();
130                                 } catch (IOException e) {
131                                         writeError[0] = e;
132                                 } finally {
133                                         s.release(1);
134                                 }
135                         }});
136                 
137                 // Read remote peer's handshake 
138                 List<Object> inIdentities = new ArrayList<Object>();
139                 Handshake result = (Handshake) Handshake.SERIALIZER.deserialize(bin, inIdentities);
140                 inIdentities.clear();
141                 
142                 // Check that write was ok
143                 try {
144                         s.acquire(1);
145                         Exception e = writeError[0];
146                         if (e!=null && e instanceof IOException) 
147                                 throw (IOException) e;
148                         if (e!=null)
149                                 throw new RuntimeException(e);                  
150                 } catch (InterruptedException e) {
151                         throw new RuntimeException(e);
152                 } finally {
153 //                      writeExecutor.shutdown();
154                 }
155                 
156                 return result;
157         }
158         
159         /**
160          * Create a connection to a hand-shaken socket 
161          * 
162          * @param socket
163          * @param methodInterface local method handler 
164          * @param localData
165          * @param remoteData
166          * @throws IOException
167          */
168         public TcpConnection(Socket socket, MethodInterface methodInterface, Handshake localData, Handshake remoteData) 
169         throws IOException {
170                 if (socket==null || localData==null || remoteData==null) 
171                         throw new IllegalArgumentException("null arg");
172                 
173                 this.methodInterface = methodInterface;
174                 this.socket = socket;
175                 this.local = localData;
176                 this.remote = remoteData;
177                 this.maxSendSize = Math.min(localData.sendMsgLimit, remoteData.recvMsgLimit);
178                 this.maxRecvSize = Math.min(localData.recvMsgLimit, remoteData.sendMsgLimit);
179                 
180                 this.localMethods = local.methods;
181                 this.remoteMethods = remote.methods;
182                 this.remoteType = new Interface(this.remoteMethods);
183                 this.localMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
184                 this.remoteMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
185                 for (int i=0; i<localMethods.length; i++)
186                         localMethodsMap.put(localMethods[i], i);
187                 for (int i=0; i<remoteMethods.length; i++)
188                         remoteMethodsMap.put(remoteMethods[i], i);
189 //              remoteMethodsMap.trimToSize();
190 //              localMethodsMap.trimToSize();
191                 
192                 in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
193                 out = new OutputStreamWriteable( socket.getOutputStream() );\r
194                 \r
195                 String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();\r
196                 \r
197                 thread.setName( threadName );
198                 thread.start();
199         }
200
201         @Override
202         public Interface getInterface() {
203                 return remoteType;
204         }
205         
206         @Override
207         public Method getMethod(MethodTypeBinding binding)
208         throws MethodNotSupportedException {
209                 // consumer suggests object bindings
210                 MethodTypeDefinition description = binding.getMethodDefinition();
211         \r
212                 if (!remoteMethodsMap.containsKey(description)) {\r
213         /*\r
214                         System.out.println("Method not found: "+description);\r
215                         System.out.println("Existing methods:" );\r
216                         for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {\r
217                                 System.out.print(k);\r
218                                 if (k.getType().requestType.getComponentCount()>0) {\r
219                                         System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );\r
220                                 }\r
221                                 System.out.println();                                   \r
222                         }\r
223 */
224                         throw new MethodNotSupportedException(description.getName());
225                 }
226                 
227                 int id = remoteMethodsMap.get(description);
228                 
229                 try {
230                         return new MethodImpl(id, binding);
231                 } catch (SerializerConstructionException e) {
232                         throw new MethodNotSupportedException(e);
233                 }
234         }
235         
236         @Override
237         public Method getMethod(MethodTypeDefinition description)
238         throws MethodNotSupportedException {
239                 // producer suggests object bindings
240                 if (!remoteMethodsMap.containsKey(description)) {
241                         throw new MethodNotSupportedException(description.getName());
242                 }               
243                 int id = remoteMethodsMap.get(description);
244                 
245                 RecordBinding reqBinding = (RecordBinding) Bindings.getMutableBinding(description.getType().getRequestType());
246                 Binding resBinding = Bindings.getMutableBinding(description.getType().getResponseType());
247                 UnionBinding errBinding = (UnionBinding) Bindings.getMutableBinding(description.getType().getErrorType());
248                 MethodTypeBinding binding = new MethodTypeBinding(description, reqBinding, resBinding, errBinding);
249                 
250                 try {
251                         return new MethodImpl(id, binding);
252                 } catch (SerializerConstructionException e) {
253                         // Generic binding should work
254                         throw new MethodNotSupportedException(e);
255                 }
256         }
257         
258         public Socket getSocket()
259         {
260                 return socket;
261         }
262         
/**
 * Observer of the connection's life cycle. Register with
 * {@link #addConnectionListener(ConnectionListener)}.
 */
public interface ConnectionListener {

    /**
     * An error occurred; the connection closes itself after
     * notifying its listeners.
     *
     * @param error the cause
     */
    void onError(Exception error);

    /**
     * The connection was closed.
     */
    void onClosed();

}
276         \r
277         CopyOnWriteArrayList<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
278         
279         public synchronized void addConnectionListener(ConnectionListener listener) {           
280                 listeners.add( listener );
281         }\r
282         \r
283         public void removeConnectionListener(ConnectionListener listener) {\r
284                 listeners.remove( listener );\r
285         }
286         
287         class MethodImpl implements Method {
288                 int methodId;
289                 MethodTypeBinding methodBinding;
290                 Serializer responseSerializer;
291                 Serializer requestSerializer;
292                 Serializer errorSerializer;
293                 
294                 MethodImpl(int methodId, MethodTypeBinding methodBinding) throws SerializerConstructionException
295                 {
296                         this.methodId = methodId;
297                         this.methodBinding = methodBinding;
298                         this.requestSerializer = Bindings.getSerializer( methodBinding.getRequestBinding() );
299                         this.responseSerializer = Bindings.getSerializer( methodBinding.getResponseBinding() );
300                         this.errorSerializer = Bindings.getSerializer( methodBinding.getErrorBinding() );
301                 }               
302                 
303                 @Override
304                 public AsyncResult invoke(final Object request) {
305                         // Write, async
306                         final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement());
307                         requests.put(result.requestId, result);\r
308                                                 
309                         if (!active) {
310                                 result.setInvokeException(new InvokeException(new ConnectionClosedException()));
311                         } else {                        
312                                 writeExecutor.execute(new Runnable() {
313                                 @Override
314                                 public void run() {                     \r
315                                   synchronized(TcpConnection.this) {
316                                         try {
317                                                 int size= requestSerializer.getSize(request, outIdentities);
318                                                 if (size>maxSendSize) {
319                                                         result.setInvokeException(new InvokeException(new MessageOverflowException()));
320                                                         return;
321                                                 }
322                                                 outIdentities.clear();
323
324                                                 RequestHeader reqHeader = new RequestHeader();
325                                                 reqHeader.methodId = methodId; 
326                                                 reqHeader.requestId = result.requestId;
327                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, reqHeader);
328                                                 outIdentities.clear();
329                                                 out.writeInt(size);
330                                                 requestSerializer.serialize(out, outIdentities, request);
331                                                 outIdentities.clear();
332                                                 out.flush();
333                                         } catch (IOException e) {
334                                                 result.setInvokeException(new InvokeException(e));
335                                         } catch (RuntimeException e) {
336                                                 result.setInvokeException(new InvokeException(e));
337                                         }\r
338                                   }
339                                 }});
340                         }
341                         return result;
342                 }
343
344                 @Override
345                 public MethodTypeBinding getMethodBinding() {
346                         return methodBinding;
347                 }
348         }
349         \r
350         void setClosed() \r
351         {\r
352                 for (ConnectionListener listener : listeners)\r
353                         listener.onClosed();\r
354         }       
355         void setError(Exception e) 
356         {\r
357                 for (ConnectionListener listener : listeners)
358                         listener.onError(e);
359                 close();                
360         }
361         
362         /**
363          * Get method interface that handles services locally (service requests by peer)
364          * 
365          * @return local method interface
366          */
367         public MethodInterface getLocalMethodInterface()
368         {
369                 return methodInterface;
370         }
371         
372         /**
373          * Get method interface that handles services locally (service requests by peer)
374          * 
375          * @return local method interface
376          */
377         public MethodTypeDefinition[] getLocalMethodDescriptions()
378         {
379                 return localMethods;
380         }       
381         \r
382         public MethodInterface getRemoteMethodInterface() {\r
383                 return this;\r
384         }\r
385         
386         /**
387          * Close the connection. All pending service request are canceled.
388          * The socket is not closed.
389          */
390         public void close() {
391                 active = false;
392                 // cancel all pending requests
393                 ArrayList<PendingRequest> reqs = new ArrayList<PendingRequest>(requests.values());
394                 for (PendingRequest pr : reqs) {
395                         pr.setInvokeException(new InvokeException(new ConnectionClosedException()));
396                 }
397                 requests.values().removeAll(reqs);
398                 // shutdown inthread
399                 thread.interrupt();     
400 //              for (ConnectionListener listener : listeners)\r
401 //                      listener.onClosed();\r
402         }\r
403         \r
404         /**\r
405          * Get the active connection of current thread\r
406          * \r
407          * @return Connection or <code>null</code> if current thread does not run connection\r
408          */\r
409         public static TcpConnection getCurrentConnection() {\r
410                 Thread t = Thread.currentThread();\r
411                 if (t instanceof ConnectionThread == false) return null;\r
412                 ConnectionThread ct = (ConnectionThread) t;\r
413                 return ct.getConnection();\r
414         }\r
415         \r
416         /**\r
417          * Connection Thread deserializes incoming messages from the TCP Stream.\r
418          *\r
419          */\r
420         class ConnectionThread extends Thread {\r
421                 public ConnectionThread() {\r
422                         setDaemon(true);\r
423                 }\r
424                 \r
425                 public TcpConnection getConnection() {\r
426                         return TcpConnection.this;\r
427                 }\r
428                 \r
429                 public void run() {\r
430                         while (!Thread.interrupted()) {\r
431                                 try {\r
432                                         Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);\r
433                                         if (header instanceof RequestHeader) {\r
434                                                 final RequestHeader reqHeader = (RequestHeader) header;\r
435 \r
436                                                 int size = in.readInt();\r
437                                                 if (size>maxRecvSize) {\r
438                                                         setError(new MessageOverflowException());\r
439                                                         return;\r
440                                                 }\r
441                                                 \r
442                                                 int methodId = reqHeader.methodId;\r
443                                                 if (methodId<0||methodId>=localMethods.length) {\r
444                                                         setError(new Exception("ProtocolError"));\r
445                                                         return;\r
446                                                 }\r
447                                                 MethodTypeDefinition methodDescription = localMethods[methodId];\r
448                                                 // Let back-end determine bindings\r
449                                                 try {\r
450                                                         final Method method = methodInterface.getMethod(methodDescription);\r
451                                                         final MethodTypeBinding methodBinding = method.getMethodBinding();\r
452                                                         // Deserialize payload                                          \r
453                                                         final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);\r
454                                                         inIdentities.clear();\r
455                                                                 \r
456                                                         // Invoke method\r
457                                                         method.invoke(request).setListener(new InvokeListener() {\r
458                                                                 @Override\r
459                                                                 public void onCompleted(final Object response) {\r
460                                                                                 // Write RESP\r
461                                                                                 writeExecutor.execute(new Runnable() {\r
462                                                                                         @Override\r
463                                                                                         public void run() {\r
464                                                                                           synchronized(TcpConnection.this) {\r
465                                                                                                 try {\r
466                                                                                                         Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());\r
467                                                                                                         int size = serializer.getSize(response, outIdentities);\r
468                                                                                                         outIdentities.clear();\r
469                                                                                                         if (size > maxSendSize) {\r
470                                                                                                                 ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
471                                                                                                                 tooLarge.requestId = reqHeader.requestId;\r
472                                                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
473                                                                                                                 outIdentities.clear();                                                                                                  \r
474                                                                                                                 return;\r
475                                                                                                         }\r
476 \r
477                                                                                                         ResponseHeader respHeader = new ResponseHeader();\r
478                                                                                                         respHeader.requestId = reqHeader.requestId;\r
479                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);\r
480                                                                                                         outIdentities.clear();\r
481                                                                                                         out.writeInt(size);\r
482                                                                                                         \r
483                                                                                                         serializer.serialize(out, outIdentities, response);\r
484                                                                                                         outIdentities.clear();\r
485                                                                                                         out.flush();\r
486                                                                                                 } catch (IOException e) {\r
487                                                                                                         setError(e);\r
488                                                                                                 } catch (RuntimeException e) {\r
489                                                                                                         setError(e);\r
490                                                                                                 }\r
491                                                                                           }\r
492                                                                                         }});\r
493                                                                 }\r
494                                                                 @Override\r
495                                                                 public void onException(final Exception cause) {\r
496                                                                                 // Write ERRO\r
497                                                                                 writeExecutor.execute(new Runnable() {\r
498                                                                                         @Override\r
499                                                                                         public void run() {\r
500                                                                                           synchronized(TcpConnection.this) {\r
501                                                                                                 try {\r
502                                                                                                         Exception_ msg = new Exception_();\r
503                                                                                                         msg.message = cause.getClass().getName()+": "+cause.getMessage(); \r
504                                                                                                         \r
505                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);\r
506                                                                                                         outIdentities.clear();\r
507                                                                                                         out.flush();\r
508                                                                                                 } catch (IOException e) {\r
509                                                                                                         setError(e);\r
510                                                                                                 } catch (RuntimeException e) {\r
511                                                                                                         setError(e);\r
512                                                                                                 }\r
513                                                                                           }\r
514                                                                                         }});                                                                    \r
515                                                                         }\r
516                                                                 @Override\r
517                                                                 public void onExecutionError(final Object error) {\r
518                                                                                 // Write ERRO\r
519                                                                                 writeExecutor.execute(new Runnable() {\r
520                                                                                         @Override\r
521                                                                                         public void run() {\r
522                                                                                           synchronized(TcpConnection.this) {\r
523                                                                                                 try {\r
524                                                                                                         Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());\r
525                                                                                                         int size = serializer.getSize(error, outIdentities);\r
526                                                                                                         outIdentities.clear();\r
527                                                                                                         \r
528                                                                                                         if (size > maxSendSize) {\r
529                                                                                                                 ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
530                                                                                                                 tooLarge.requestId = reqHeader.requestId;\r
531                                                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
532                                                                                                                 outIdentities.clear();                                                                                                  \r
533                                                                                                                 return;\r
534                                                                                                         }\r
535                                                                                                         \r
536                                                                                                         ExecutionError_ errorHeader = new ExecutionError_();\r
537                                                                                                         errorHeader.requestId = reqHeader.requestId;\r
538                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);\r
539                                                                                                         outIdentities.clear();\r
540                                                                                                         out.writeInt(size);\r
541                                                                                                         serializer.serialize(out, outIdentities, error);\r
542                                                                                                         outIdentities.clear();\r
543                                                                                                         out.flush();\r
544                                                                                                 } catch (IOException e) {\r
545                                                                                                         setError(e);\r
546                                                                                                 } catch (RuntimeException e) {\r
547                                                                                                         setError(e);\r
548                                                                                                 }\r
549                                                                                           }\r
550                                                                 }});                    \r
551                                                         }});\r
552 \r
553                                                 } catch (MethodNotSupportedException e) {\r
554                                                         in.skipBytes(size);\r
555                                                         // return with an error\r
556                                                         final InvalidMethodError error = new InvalidMethodError();\r
557                                                         error.requestId = reqHeader.requestId;\r
558                                                         writeExecutor.execute(new Runnable() {\r
559                                                                 @Override\r
560                                                                 public void run() {\r
561                                                                   synchronized(TcpConnection.this) {\r
562                                                                         try {\r
563                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, error);\r
564                                                                                 outIdentities.clear();\r
565                                                                                 out.flush();\r
566                                                                         } catch (IOException e) {\r
567                                                                                 setError(e);\r
568                                                                         } catch (RuntimeException e) {\r
569                                                                                 setError(e);\r
570                                                                         }\r
571                                                                   }\r
572                                                                 }});                                                    \r
573                                                         continue;                                                               \r
574                                                 } \r
575                                                                 \r
576         \r
577                                         } else if (header instanceof ResponseHeader) {\r
578                                                 int requestId = ((ResponseHeader)header).requestId;\r
579                                                 PendingRequest req = requests.remove(requestId);\r
580                                                 if (req==null) {\r
581                                                         setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
582                                                         return;                                         \r
583                                                 }                                               \r
584                                                 int size = in.readInt();\r
585                                                 if (size>maxRecvSize) {\r
586                                                         // TODO SOMETHING\r
587                                                 }\r
588                                                 Object response = req.method.responseSerializer.deserialize(in, inIdentities);\r
589                                                 inIdentities.clear();\r
590                                                 req.setResponse(response);\r
591                                         } else if (header instanceof ExecutionError_) {                                         \r
592                                                 int requestId = ((ExecutionError_)header).requestId;\r
593                                                 PendingRequest req = requests.remove(requestId);\r
594                                                 if (req==null) {\r
595                                                         setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
596                                                         return;\r
597                                                 }\r
598                                                 int size = in.readInt();\r
599                                                 if (size>maxRecvSize) {\r
600                                                         // TODO SOMETHING\r
601                                                 }\r
602                                                 Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);\r
603                                                 inIdentities.clear();\r
604                                                 req.setExecutionError(executionError);\r
605                                         } else if (header instanceof Exception_) {\r
606                                                 int requestId = ((Exception_)header).requestId;\r
607                                                 PendingRequest req = requests.remove(requestId);\r
608                                                 req.setExecutionError(new Exception(((Exception_)header).message));\r
609                                         } else if (header instanceof InvalidMethodError) {\r
610                                                 int requestId = ((InvalidMethodError)header).requestId;\r
611                                                 PendingRequest req = requests.remove(requestId);\r
612                                                 req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));\r
613                                         } else if (header instanceof ResponseTooLargeError) {\r
614                                                 int requestId = ((ResponseTooLargeError)header).requestId;\r
615                                                 PendingRequest req = requests.remove(requestId);\r
616                                                 req.setInvokeException(new InvokeException(new MessageOverflowException()));\r
617                                         }\r
618                                         \r
619                                 } catch (EOFException e) {\r
620                                         setClosed();\r
621                                         break;\r
622                                 } catch (SocketException e) {\r
623                                         if (e.getMessage().equals("Socket Closed"))\r
624                                                 setClosed();\r
625                                         else\r
626                                                 setError(e);\r
627                                         break;\r
628                                 } catch (IOException e) {\r
629                                         setError(e);\r
630                                         break;\r
631                                 }\r
632                         }\r
633                         try {\r
634                                 socket.close();\r
635                         } catch (IOException e) {\r
636                         }\r
637                         // Close pending requests\r
638                         close();\r
639                 };\r
640         }
641
642         // Thread that reads and dispatches incoming messages; only constructed here (field initializer), not started.
643         ConnectionThread thread = new ConnectionThread();
644         
645         class PendingRequest extends AsyncResultImpl {
646
647                 MethodImpl method;
648                 
649                 // request id
650                 int requestId;
651                 
652                 public PendingRequest(MethodImpl method, int requestId) {
653                         this.method = method; 
654                         this.requestId = requestId;
655                 }               
656         }
657         
658
659         @Union({RequestHeader.class, ResponseHeader.class, ExecutionError_.class, Exception_.class, InvalidMethodError.class, ResponseTooLargeError.class})
660         public static class Message {}
661
662         public static class RequestHeader extends Message {
663                 public int requestId;
664                 public int methodId;            
665                 // Request Object
666                 public RequestHeader() {}
667         }
668
669         public static class ResponseHeader extends Message {
670                 public int requestId;
671                 // Response object
672                 public ResponseHeader() {}
673         }
674
675         // Error while invoking a method
676         public static class ExecutionError_ extends Message {
677                 public int requestId;
678                 // Error object
679                 public ExecutionError_() {}
680         }
681
682         // MethodName does not exist
683         public static class InvalidMethodError extends Message {
684                 public int requestId;
685                 public InvalidMethodError() {}
686         }
687
688         // Exception, not in method but somewhere else 
689         public static class Exception_ extends Message {
690                 public int requestId;
691                 public String message;
692                 public Exception_() {}
693         }
694
695         public static class ResponseTooLargeError extends Message {
696                 public int requestId;
697                 public ResponseTooLargeError() {}
698         }
699         
700 }
701