]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java
Improved Acorn database rollback logic.
[simantics/platform.git] / bundles / org.simantics.databoard / src / org / simantics / databoard / method / TcpConnection.java
1 /*******************************************************************************
2  *  Copyright (c) 2010 Association for Decentralized Information Management in
3  *  Industry THTH ry.
4  *  All rights reserved. This program and the accompanying materials
5  *  are made available under the terms of the Eclipse Public License v1.0
6  *  which accompanies this distribution, and is available at
7  *  http://www.eclipse.org/legal/epl-v10.html
8  *
9  *  Contributors:
10  *      VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.databoard.method;
13
14 import gnu.trove.map.hash.TObjectIntHashMap;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.nio.charset.Charset;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.SynchronousQueue;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.simantics.databoard.Bindings;
35 import org.simantics.databoard.annotations.Union;
36 import org.simantics.databoard.binding.Binding;
37 import org.simantics.databoard.binding.RecordBinding;
38 import org.simantics.databoard.binding.UnionBinding;
39 import org.simantics.databoard.serialization.Serializer;
40 import org.simantics.databoard.serialization.SerializerConstructionException;
41 import org.simantics.databoard.util.binary.BinaryReadable;
42 import org.simantics.databoard.util.binary.BinaryWriteable;
43 import org.simantics.databoard.util.binary.InputStreamReadable;
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
45
46 /**
47  * Connection is a class that handles request-response communication over a
48  * socket. 
49  * <p>
50  * Requests have asynchronous result. The result can be acquired using one of 
51  * the three methods:
52  *  1) Blocking read AsyncResult.waitForResponse()
53  *  2) Poll  AsyncResult.getResponse()
54  *  3) Listen AsyncResult.setListener()
55  * <p>
56  * The socket must be established before Connection is instantiated.
57  * Closing connection does not close its Socket.
58  * If the socket is closed before the connection, an error is raised.
59  * The error is available by placing listener.
60  * The proper order to close a connection is to close Connection first
61  * and then Socket. 
62  *  
63  * @author Toni Kalajainen <toni.kalajainen@vtt.fi>
64  */
65 public class TcpConnection implements MethodInterface {
66
67         public static final ExecutorService SHARED_EXECUTOR_SERVICE = 
68                 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
69         
70         static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) );
71         static Charset UTF8 = Charset.forName("UTF8");
72         
73         Handshake local, remote;
74         
75         Interface remoteType;
76         MethodTypeDefinition[] localMethods, remoteMethods;
77         HashMap<MethodTypeDefinition, Integer> localMethodsMap, remoteMethodsMap;
78         
79         Socket socket;
80                 
81         // if false, there is an error in the socket or the connection has been shutdown 
82         boolean active = true;
83
84         // Objects used for handling local services 
85         MethodInterface methodInterface;
86                 
87         // Objects used for reading data
88         ConcurrentHashMap<Integer, PendingRequest> requests = new ConcurrentHashMap<Integer, PendingRequest>();  
89         List<Object> inIdentities = new ArrayList<Object>();
90         BinaryReadable in;
91         int maxRecvSize;
92         
93         // Object used for writing data
94         public ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
95         TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
96         BinaryWriteable out;
97         AtomicInteger requestCounter = new AtomicInteger(0);
98         int maxSendSize;
99         
100         // Cached method descriptions
101         Map<String, MethodType> methodTypes = new ConcurrentHashMap<String, MethodType>();
102         
103         /**
104          * Handshake a socket
105          * 
106          * @param socket 
107          * @param localData local data
108          * @return the remote data
109          * @throws IOException
110          * @throws RuntimeException unexpected error (BindingException or EncodingException) 
111          */
112         public static Handshake handshake(final Socket socket, final Handshake localData)
113         throws IOException
114         {
115                 final BinaryReadable bin = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
116                 final BinaryWriteable bout = new OutputStreamWriteable( socket.getOutputStream() );
117                 ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
118                 
119                 // do hand-shake                
120                 final Exception[] writeError = new Exception[1];
121                 final Semaphore s = new Semaphore(0);           
122                 writeExecutor.execute(new Runnable() {
123                         @Override
124                         public void run() {
125                                 try {
126                                         TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();                                      
127                                         Handshake.SERIALIZER.serialize(bout, outIdentities, localData);
128                                         bout.flush();
129                                         outIdentities.clear();
130                                 } catch (IOException e) {
131                                         writeError[0] = e;
132                                 } finally {
133                                         s.release(1);
134                                 }
135                         }});
136                 
137                 // Read remote peer's handshake 
138                 List<Object> inIdentities = new ArrayList<Object>();
139                 Handshake result = (Handshake) Handshake.SERIALIZER.deserialize(bin, inIdentities);
140                 inIdentities.clear();
141                 
142                 // Check that write was ok
143                 try {
144                         s.acquire(1);
145                         Exception e = writeError[0];
146                         if (e!=null && e instanceof IOException) 
147                                 throw (IOException) e;
148                         if (e!=null)
149                                 throw new RuntimeException(e);                  
150                 } catch (InterruptedException e) {
151                         throw new RuntimeException(e);
152                 } finally {
153 //                      writeExecutor.shutdown();
154                 }
155                 
156                 return result;
157         }
158         
159         /**
160          * Create a connection to a hand-shaken socket 
161          * 
162          * @param socket
163          * @param methodInterface local method handler 
164          * @param localData
165          * @param remoteData
166          * @throws IOException
167          */
168         public TcpConnection(Socket socket, MethodInterface methodInterface, Handshake localData, Handshake remoteData) 
169         throws IOException {
170                 if (socket==null || localData==null || remoteData==null) 
171                         throw new IllegalArgumentException("null arg");
172                 
173                 this.methodInterface = methodInterface;
174                 this.socket = socket;
175                 this.local = localData;
176                 this.remote = remoteData;
177                 this.maxSendSize = Math.min(localData.sendMsgLimit, remoteData.recvMsgLimit);
178                 this.maxRecvSize = Math.min(localData.recvMsgLimit, remoteData.sendMsgLimit);
179                 
180                 this.localMethods = local.methods;
181                 this.remoteMethods = remote.methods;
182                 this.remoteType = new Interface(this.remoteMethods);
183                 this.localMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
184                 this.remoteMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
185                 for (int i=0; i<localMethods.length; i++)
186                         localMethodsMap.put(localMethods[i], i);
187                 for (int i=0; i<remoteMethods.length; i++)
188                         remoteMethodsMap.put(remoteMethods[i], i);
189 //              remoteMethodsMap.trimToSize();
190 //              localMethodsMap.trimToSize();
191                 
192                 in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
193                 out = new OutputStreamWriteable( socket.getOutputStream() );
194                 
195                 String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();
196                 
197                 thread.setName( threadName );
198                 thread.start();
199         }
200
201         @Override
202         public Interface getInterface() {
203                 return remoteType;
204         }
205         
206         @Override
207         public Method getMethod(MethodTypeBinding binding)
208         throws MethodNotSupportedException {
209                 // consumer suggests object bindings
210                 MethodTypeDefinition description = binding.getMethodDefinition();
211         
212                 if (!remoteMethodsMap.containsKey(description)) {
213         /*
214                         System.out.println("Method not found: "+description);
215                         System.out.println("Existing methods:" );
216                         for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {
217                                 System.out.print(k);
218                                 if (k.getType().requestType.getComponentCount()>0) {
219                                         System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );
220                                 }
221                                 System.out.println();                                   
222                         }
223 */
224                         throw new MethodNotSupportedException(description.getName());
225                 }
226                 
227                 int id = remoteMethodsMap.get(description);
228                 
229                 try {
230                         return new MethodImpl(id, binding);
231                 } catch (SerializerConstructionException e) {
232                         throw new MethodNotSupportedException(e);
233                 }
234         }
235         
236         @Override
237         public Method getMethod(MethodTypeDefinition description)
238         throws MethodNotSupportedException {
239                 // producer suggests object bindings
240                 if (!remoteMethodsMap.containsKey(description)) {
241                         throw new MethodNotSupportedException(description.getName());
242                 }               
243                 int id = remoteMethodsMap.get(description);
244                 
245                 RecordBinding reqBinding = (RecordBinding) Bindings.getMutableBinding(description.getType().getRequestType());
246                 Binding resBinding = Bindings.getMutableBinding(description.getType().getResponseType());
247                 UnionBinding errBinding = (UnionBinding) Bindings.getMutableBinding(description.getType().getErrorType());
248                 MethodTypeBinding binding = new MethodTypeBinding(description, reqBinding, resBinding, errBinding);
249                 
250                 try {
251                         return new MethodImpl(id, binding);
252                 } catch (SerializerConstructionException e) {
253                         // Generic binding should work
254                         throw new MethodNotSupportedException(e);
255                 }
256         }
257         
258         public Socket getSocket()
259         {
260                 return socket;
261         }
262         
        /**
         * Listener for connection level events. Listeners are registered with
         * addConnectionListener and are notified from setError and setClosed.
         */
        public interface ConnectionListener {
                /**
                 * There was an error in the connection. setError notifies the
                 * listeners first and closes the connection right after.
                 * 
                 * @param error the cause
                 */
                void onError(Exception error);
                
                /**
                 * The connection was closed. Delivered through setClosed();
                 * NOTE(review): close() itself does not notify listeners (that
                 * code is commented out) - confirm where setClosed() is called.
                 */
                void onClosed();
        }
276         
277         CopyOnWriteArrayList<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
278         
279         public synchronized void addConnectionListener(ConnectionListener listener) {           
280                 listeners.add( listener );
281         }
282         
283         public void removeConnectionListener(ConnectionListener listener) {
284                 listeners.remove( listener );
285         }
286         
287         class MethodImpl implements Method {
288                 int methodId;
289                 MethodTypeBinding methodBinding;
290                 Serializer responseSerializer;
291                 Serializer requestSerializer;
292                 Serializer errorSerializer;
293                 
294                 MethodImpl(int methodId, MethodTypeBinding methodBinding) throws SerializerConstructionException
295                 {
296                         this.methodId = methodId;
297                         this.methodBinding = methodBinding;
298                         this.requestSerializer = Bindings.getSerializer( methodBinding.getRequestBinding() );
299                         this.responseSerializer = Bindings.getSerializer( methodBinding.getResponseBinding() );
300                         this.errorSerializer = Bindings.getSerializer( methodBinding.getErrorBinding() );
301                 }               
302                 
303                 @Override
304                 public AsyncResult invoke(final Object request) {
305                         // Write, async
306                         final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement());
307                         requests.put(result.requestId, result);
308                                                 
309                         if (!active) {
310                                 result.setInvokeException(new InvokeException(new ConnectionClosedException()));
311                         } else {                        
312                                 writeExecutor.execute(new Runnable() {
313                                 @Override
314                                 public void run() {                     
315                                   synchronized(TcpConnection.this) {
316                                         try {
317                                                 int size= requestSerializer.getSize(request, outIdentities);
318                                                 if (size>maxSendSize) {
319                                                         result.setInvokeException(new InvokeException(new MessageOverflowException()));
320                                                         return;
321                                                 }
322                                                 outIdentities.clear();
323
324                                                 RequestHeader reqHeader = new RequestHeader();
325                                                 reqHeader.methodId = methodId; 
326                                                 reqHeader.requestId = result.requestId;
327                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, reqHeader);
328                                                 outIdentities.clear();
329                                                 out.writeInt(size);
330                                                 requestSerializer.serialize(out, outIdentities, request);
331                                                 outIdentities.clear();
332                                                 out.flush();
333                                         } catch (IOException e) {
334                                                 result.setInvokeException(new InvokeException(e));
335                                         } catch (RuntimeException e) {
336                                                 result.setInvokeException(new InvokeException(e));
337                                         }
338                                   }
339                                 }});
340                         }
341                         return result;
342                 }
343
344                 @Override
345                 public MethodTypeBinding getMethodBinding() {
346                         return methodBinding;
347                 }
348         }
349         
350         void setClosed() 
351         {
352                 for (ConnectionListener listener : listeners)
353                         listener.onClosed();
354         }       
355         void setError(Exception e) 
356         {
357                 for (ConnectionListener listener : listeners)
358                         listener.onError(e);
359                 close();                
360         }
361         
362         /**
363          * Get method interface that handles services locally (service requests by peer)
364          * 
365          * @return local method interface
366          */
367         public MethodInterface getLocalMethodInterface()
368         {
369                 return methodInterface;
370         }
371         
372         /**
373          * Get method interface that handles services locally (service requests by peer)
374          * 
375          * @return local method interface
376          */
377         public MethodTypeDefinition[] getLocalMethodDescriptions()
378         {
379                 return localMethods;
380         }       
381         
382         public MethodInterface getRemoteMethodInterface() {
383                 return this;
384         }
385         
386         /**
387          * Close the connection. All pending service request are canceled.
388          * The socket is not closed.
389          */
390         public void close() {
391                 active = false;
392                 // cancel all pending requests
393                 ArrayList<PendingRequest> reqs = new ArrayList<PendingRequest>(requests.values());
394                 for (PendingRequest pr : reqs) {
395                         pr.setInvokeException(new InvokeException(new ConnectionClosedException()));
396                 }
397                 requests.values().removeAll(reqs);
398                 // shutdown inthread
399                 thread.interrupt();     
400 //              for (ConnectionListener listener : listeners)
401 //                      listener.onClosed();
402         }
403         
404         /**
405          * Get the active connection of current thread
406          * 
407          * @return Connection or <code>null</code> if current thread does not run connection
408          */
409         public static TcpConnection getCurrentConnection() {
410                 Thread t = Thread.currentThread();
411                 if (t instanceof ConnectionThread == false) return null;
412                 ConnectionThread ct = (ConnectionThread) t;
413                 return ct.getConnection();
414         }
415         
416         /**
417          * Connection Thread deserializes incoming messages from the TCP Stream.
418          *
419          */
420         class ConnectionThread extends Thread {
421                 public ConnectionThread() {
422                         setDaemon(true);
423                 }
424                 
425                 public TcpConnection getConnection() {
426                         return TcpConnection.this;
427                 }
428                 
429                 public void run() {
430                         while (!Thread.interrupted()) {
431                                 try {
432                                         Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);
433                                         if (header instanceof RequestHeader) {
434                                                 final RequestHeader reqHeader = (RequestHeader) header;
435
436                                                 int size = in.readInt();
437                                                 if (size>maxRecvSize) {
438                                                         setError(new MessageOverflowException());
439                                                         return;
440                                                 }
441                                                 
442                                                 int methodId = reqHeader.methodId;
443                                                 if (methodId<0||methodId>=localMethods.length) {
444                                                         setError(new Exception("ProtocolError"));
445                                                         return;
446                                                 }
447                                                 MethodTypeDefinition methodDescription = localMethods[methodId];
448                                                 // Let back-end determine bindings
449                                                 try {
450                                                         final Method method = methodInterface.getMethod(methodDescription);
451                                                         final MethodTypeBinding methodBinding = method.getMethodBinding();
452                                                         // Deserialize payload                                          
453                                                         final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);
454                                                         inIdentities.clear();
455                                                                 
456                                                         // Invoke method
457                                                         method.invoke(request).setListener(new InvokeListener() {
458                                                                 @Override
459                                                                 public void onCompleted(final Object response) {
460                                                                                 // Write RESP
461                                                                                 writeExecutor.execute(new Runnable() {
462                                                                                         @Override
463                                                                                         public void run() {
464                                                                                           synchronized(TcpConnection.this) {
465                                                                                                 try {
466                                                                                                         Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());
467                                                                                                         int size = serializer.getSize(response, outIdentities);
468                                                                                                         outIdentities.clear();
469                                                                                                         if (size > maxSendSize) {
470                                                                                                                 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
471                                                                                                                 tooLarge.requestId = reqHeader.requestId;
472                                                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
473                                                                                                                 outIdentities.clear();                                                                                                  
474                                                                                                                 return;
475                                                                                                         }
476
477                                                                                                         ResponseHeader respHeader = new ResponseHeader();
478                                                                                                         respHeader.requestId = reqHeader.requestId;
479                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);
480                                                                                                         outIdentities.clear();
481                                                                                                         out.writeInt(size);
482                                                                                                         
483                                                                                                         serializer.serialize(out, outIdentities, response);
484                                                                                                         outIdentities.clear();
485                                                                                                         out.flush();
486                                                                                                 } catch (IOException e) {
487                                                                                                         setError(e);
488                                                                                                 } catch (RuntimeException e) {
489                                                                                                         setError(e);
490                                                                                                 }
491                                                                                           }
492                                                                                         }});
493                                                                 }
494                                                                 @Override
495                                                                 public void onException(final Exception cause) {
496                                                                                 // Write ERRO
497                                                                                 writeExecutor.execute(new Runnable() {
498                                                                                         @Override
499                                                                                         public void run() {
500                                                                                           synchronized(TcpConnection.this) {
501                                                                                                 try {
502                                                                                                         Exception_ msg = new Exception_();
503                                                                                                         msg.message = cause.getClass().getName()+": "+cause.getMessage(); 
504                                                                                                         
505                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);
506                                                                                                         outIdentities.clear();
507                                                                                                         out.flush();
508                                                                                                 } catch (IOException e) {
509                                                                                                         setError(e);
510                                                                                                 } catch (RuntimeException e) {
511                                                                                                         setError(e);
512                                                                                                 }
513                                                                                           }
514                                                                                         }});                                                                    
515                                                                         }
516                                                                 @Override
517                                                                 public void onExecutionError(final Object error) {
518                                                                                 // Write ERRO
519                                                                                 writeExecutor.execute(new Runnable() {
520                                                                                         @Override
521                                                                                         public void run() {
522                                                                                           synchronized(TcpConnection.this) {
523                                                                                                 try {
524                                                                                                         Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());
525                                                                                                         int size = serializer.getSize(error, outIdentities);
526                                                                                                         outIdentities.clear();
527                                                                                                         
528                                                                                                         if (size > maxSendSize) {
529                                                                                                                 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
530                                                                                                                 tooLarge.requestId = reqHeader.requestId;
531                                                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
532                                                                                                                 outIdentities.clear();                                                                                                  
533                                                                                                                 return;
534                                                                                                         }
535                                                                                                         
536                                                                                                         ExecutionError_ errorHeader = new ExecutionError_();
537                                                                                                         errorHeader.requestId = reqHeader.requestId;
538                                                                                                         MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);
539                                                                                                         outIdentities.clear();
540                                                                                                         out.writeInt(size);
541                                                                                                         serializer.serialize(out, outIdentities, error);
542                                                                                                         outIdentities.clear();
543                                                                                                         out.flush();
544                                                                                                 } catch (IOException e) {
545                                                                                                         setError(e);
546                                                                                                 } catch (RuntimeException e) {
547                                                                                                         setError(e);
548                                                                                                 }
549                                                                                           }
550                                                                 }});                    
551                                                         }});
552
553                                                 } catch (MethodNotSupportedException e) {
554                                                         in.skipBytes(size);
555                                                         // return with an error
556                                                         final InvalidMethodError error = new InvalidMethodError();
557                                                         error.requestId = reqHeader.requestId;
558                                                         writeExecutor.execute(new Runnable() {
559                                                                 @Override
560                                                                 public void run() {
561                                                                   synchronized(TcpConnection.this) {
562                                                                         try {
563                                                                                 MESSAGE_SERIALIZER.serialize(out, outIdentities, error);
564                                                                                 outIdentities.clear();
565                                                                                 out.flush();
566                                                                         } catch (IOException e) {
567                                                                                 setError(e);
568                                                                         } catch (RuntimeException e) {
569                                                                                 setError(e);
570                                                                         }
571                                                                   }
572                                                                 }});                                                    
573                                                         continue;                                                               
574                                                 } 
575                                                                 
576         
577                                         } else if (header instanceof ResponseHeader) {
578                                                 int requestId = ((ResponseHeader)header).requestId;
579                                                 PendingRequest req = requests.remove(requestId);
580                                                 if (req==null) {
581                                                         setError(new RuntimeException("Request by id "+requestId+" does not exist"));
582                                                         return;                                         
583                                                 }                                               
584                                                 int size = in.readInt();
585                                                 if (size>maxRecvSize) {
586                                                         // TODO SOMETHING
587                                                 }
588                                                 Object response = req.method.responseSerializer.deserialize(in, inIdentities);
589                                                 inIdentities.clear();
590                                                 req.setResponse(response);
591                                         } else if (header instanceof ExecutionError_) {                                         
592                                                 int requestId = ((ExecutionError_)header).requestId;
593                                                 PendingRequest req = requests.remove(requestId);
594                                                 if (req==null) {
595                                                         setError(new RuntimeException("Request by id "+requestId+" does not exist"));
596                                                         return;
597                                                 }
598                                                 int size = in.readInt();
599                                                 if (size>maxRecvSize) {
600                                                         // TODO SOMETHING
601                                                 }
602                                                 Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);
603                                                 inIdentities.clear();
604                                                 req.setExecutionError(executionError);
605                                         } else if (header instanceof Exception_) {
606                                                 int requestId = ((Exception_)header).requestId;
607                                                 PendingRequest req = requests.remove(requestId);
608                                                 req.setExecutionError(new Exception(((Exception_)header).message));
609                                         } else if (header instanceof InvalidMethodError) {
610                                                 int requestId = ((InvalidMethodError)header).requestId;
611                                                 PendingRequest req = requests.remove(requestId);
612                                                 req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));
613                                         } else if (header instanceof ResponseTooLargeError) {
614                                                 int requestId = ((ResponseTooLargeError)header).requestId;
615                                                 PendingRequest req = requests.remove(requestId);
616                                                 req.setInvokeException(new InvokeException(new MessageOverflowException()));
617                                         }
618                                         
619                                 } catch (EOFException e) {
620                                         setClosed();
621                                         break;
622                                 } catch (SocketException e) {
623                                         if (e.getMessage().equals("Socket Closed"))
624                                                 setClosed();
625                                         else
626                                                 setError(e);
627                                         break;
628                                 } catch (IOException e) {
629                                         setError(e);
630                                         break;
631                                 }
632                         }
633                         try {
634                                 socket.close();
635                         } catch (IOException e) {
636                         }
637                         // Close pending requests
638                         close();
639                 };
640         }
641
642         // Thread that reads input data for this connection; created by this field initializer, so there is one per TcpConnection instance
643         ConnectionThread thread = new ConnectionThread();
644         
645         class PendingRequest extends AsyncResultImpl {
646
647                 MethodImpl method;
648                 
649                 // request id
650                 int requestId;
651                 
652                 public PendingRequest(MethodImpl method, int requestId) {
653                         this.method = method; 
654                         this.requestId = requestId;
655                 }               
656         }
657         
658
659         @Union({RequestHeader.class, ResponseHeader.class, ExecutionError_.class, Exception_.class, InvalidMethodError.class, ResponseTooLargeError.class})
660         public static class Message {}
661
662         public static class RequestHeader extends Message {
663                 public int requestId;
664                 public int methodId;            
665                 // Request Object
666                 public RequestHeader() {}
667         }
668
669         public static class ResponseHeader extends Message {
670                 public int requestId;
671                 // Response object
672                 public ResponseHeader() {}
673         }
674
675         // Error while invoking a method
676         public static class ExecutionError_ extends Message {
677                 public int requestId;
678                 // Error object
679                 public ExecutionError_() {}
680         }
681
682         // MethodName does not exist
683         public static class InvalidMethodError extends Message {
684                 public int requestId;
685                 public InvalidMethodError() {}
686         }
687
688         // Exception, not in method but somewhere else 
689         public static class Exception_ extends Message {
690                 public int requestId;
691                 public String message;
692                 public Exception_() {}
693         }
694
695         public static class ResponseTooLargeError extends Message {
696                 public int requestId;
697                 public ResponseTooLargeError() {}
698         }
699         
700 }
701