1 /*******************************************************************************
\r
2 * Copyright (c) 2010 Association for Decentralized Information Management in
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.databoard.method;
14 import gnu.trove.map.hash.TObjectIntHashMap;
\r
16 import java.io.EOFException;
\r
17 import java.io.IOException;
\r
18 import java.net.Socket;
\r
19 import java.net.SocketException;
\r
20 import java.nio.charset.Charset;
\r
21 import java.util.ArrayList;
\r
22 import java.util.HashMap;
\r
23 import java.util.List;
\r
24 import java.util.Map;
\r
25 import java.util.concurrent.ConcurrentHashMap;
\r
26 import java.util.concurrent.CopyOnWriteArrayList;
\r
27 import java.util.concurrent.ExecutorService;
\r
28 import java.util.concurrent.Semaphore;
\r
29 import java.util.concurrent.SynchronousQueue;
\r
30 import java.util.concurrent.ThreadPoolExecutor;
\r
31 import java.util.concurrent.TimeUnit;
\r
32 import java.util.concurrent.atomic.AtomicInteger;
\r
34 import org.simantics.databoard.Bindings;
\r
35 import org.simantics.databoard.annotations.Union;
\r
36 import org.simantics.databoard.binding.Binding;
\r
37 import org.simantics.databoard.binding.RecordBinding;
\r
38 import org.simantics.databoard.binding.UnionBinding;
\r
39 import org.simantics.databoard.serialization.Serializer;
\r
40 import org.simantics.databoard.serialization.SerializerConstructionException;
\r
41 import org.simantics.databoard.util.binary.BinaryReadable;
\r
42 import org.simantics.databoard.util.binary.BinaryWriteable;
\r
43 import org.simantics.databoard.util.binary.InputStreamReadable;
\r
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
\r
47 * Connection is a class that handles request-response communication over a
50 * Requests have asynchronous result. The result can be acquired using one of
52 * 1) Blocking read AsyncResult.waitForResponse()
53 * 2) Poll AsyncResult.getResponse()
54 * 3) Listen AsyncResult.setListener()
56 * The socket must be established before Connection is instantiated.
57 * Closing connection does not close its Socket.
58 * If the socket is closed before connection there an error is thrown.
59 * The error is available by placing listener.
60 * The proper order to close a connection is to close Connection first
63 * @author Toni Kalajainen <toni.kalajainen@vtt.fi>
65 public class TcpConnection implements MethodInterface {
67 public static final ExecutorService SHARED_EXECUTOR_SERVICE =
\r
68 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
\r
70 static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) );
71 static Charset UTF8 = Charset.forName("UTF8");
73 Handshake local, remote;
76 MethodTypeDefinition[] localMethods, remoteMethods;
77 HashMap<MethodTypeDefinition, Integer> localMethodsMap, remoteMethodsMap;
81 // if false, there is an error in the socket or the connection has been shutdown
82 boolean active = true;
84 // Objects used for handling local services
85 MethodInterface methodInterface;
87 // Objects used for reading data
88 ConcurrentHashMap<Integer, PendingRequest> requests = new ConcurrentHashMap<Integer, PendingRequest>();
89 List<Object> inIdentities = new ArrayList<Object>();
93 // Object used for writing data
94 public ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
95 TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
97 AtomicInteger requestCounter = new AtomicInteger(0);
100 // Cached method descriptions
101 Map<String, MethodType> methodTypes = new ConcurrentHashMap<String, MethodType>();
107 * @param localData local data
108 * @return the remote data
109 * @throws IOException
110 * @throws RuntimeException unexpected error (BindingException or EncodingException)
112 public static Handshake handshake(final Socket socket, final Handshake localData)
115 final BinaryReadable bin = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
116 final BinaryWriteable bout = new OutputStreamWriteable( socket.getOutputStream() );
117 ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
120 final Exception[] writeError = new Exception[1];
121 final Semaphore s = new Semaphore(0);
122 writeExecutor.execute(new Runnable() {
126 TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
127 Handshake.SERIALIZER.serialize(bout, outIdentities, localData);
129 outIdentities.clear();
130 } catch (IOException e) {
137 // Read remote peer's handshake
138 List<Object> inIdentities = new ArrayList<Object>();
139 Handshake result = (Handshake) Handshake.SERIALIZER.deserialize(bin, inIdentities);
140 inIdentities.clear();
142 // Check that write was ok
145 Exception e = writeError[0];
146 if (e!=null && e instanceof IOException)
147 throw (IOException) e;
149 throw new RuntimeException(e);
150 } catch (InterruptedException e) {
151 throw new RuntimeException(e);
153 // writeExecutor.shutdown();
160 * Create a connection to a hand-shaken socket
163 * @param methodInterface local method handler
166 * @throws IOException
168 public TcpConnection(Socket socket, MethodInterface methodInterface, Handshake localData, Handshake remoteData)
170 if (socket==null || localData==null || remoteData==null)
171 throw new IllegalArgumentException("null arg");
173 this.methodInterface = methodInterface;
174 this.socket = socket;
175 this.local = localData;
176 this.remote = remoteData;
177 this.maxSendSize = Math.min(localData.sendMsgLimit, remoteData.recvMsgLimit);
178 this.maxRecvSize = Math.min(localData.recvMsgLimit, remoteData.sendMsgLimit);
180 this.localMethods = local.methods;
181 this.remoteMethods = remote.methods;
182 this.remoteType = new Interface(this.remoteMethods);
183 this.localMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
184 this.remoteMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
185 for (int i=0; i<localMethods.length; i++)
186 localMethodsMap.put(localMethods[i], i);
187 for (int i=0; i<remoteMethods.length; i++)
188 remoteMethodsMap.put(remoteMethods[i], i);
189 // remoteMethodsMap.trimToSize();
190 // localMethodsMap.trimToSize();
192 in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
193 out = new OutputStreamWriteable( socket.getOutputStream() );
\r
195 String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();
\r
197 thread.setName( threadName );
202 public Interface getInterface() {
207 public Method getMethod(MethodTypeBinding binding)
208 throws MethodNotSupportedException {
209 // consumer suggests object bindings
210 MethodTypeDefinition description = binding.getMethodDefinition();
212 if (!remoteMethodsMap.containsKey(description)) {
\r
214 System.out.println("Method not found: "+description);
\r
215 System.out.println("Existing methods:" );
\r
216 for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {
\r
217 System.out.print(k);
\r
218 if (k.getType().requestType.getComponentCount()>0) {
\r
219 System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );
\r
221 System.out.println();
\r
224 throw new MethodNotSupportedException(description.getName());
227 int id = remoteMethodsMap.get(description);
230 return new MethodImpl(id, binding);
231 } catch (SerializerConstructionException e) {
232 throw new MethodNotSupportedException(e);
237 public Method getMethod(MethodTypeDefinition description)
238 throws MethodNotSupportedException {
239 // producer suggests object bindings
240 if (!remoteMethodsMap.containsKey(description)) {
241 throw new MethodNotSupportedException(description.getName());
243 int id = remoteMethodsMap.get(description);
245 RecordBinding reqBinding = (RecordBinding) Bindings.getMutableBinding(description.getType().getRequestType());
246 Binding resBinding = Bindings.getMutableBinding(description.getType().getResponseType());
247 UnionBinding errBinding = (UnionBinding) Bindings.getMutableBinding(description.getType().getErrorType());
248 MethodTypeBinding binding = new MethodTypeBinding(description, reqBinding, resBinding, errBinding);
251 return new MethodImpl(id, binding);
252 } catch (SerializerConstructionException e) {
253 // Generic binding should work
254 throw new MethodNotSupportedException(e);
258 public Socket getSocket()
263 public interface ConnectionListener {
265 * There was an error and connection was closed
269 void onError(Exception error);
272 * close() was invoked
277 CopyOnWriteArrayList<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
279 public synchronized void addConnectionListener(ConnectionListener listener) {
280 listeners.add( listener );
283 public void removeConnectionListener(ConnectionListener listener) {
\r
284 listeners.remove( listener );
\r
287 class MethodImpl implements Method {
289 MethodTypeBinding methodBinding;
290 Serializer responseSerializer;
291 Serializer requestSerializer;
292 Serializer errorSerializer;
294 MethodImpl(int methodId, MethodTypeBinding methodBinding) throws SerializerConstructionException
296 this.methodId = methodId;
297 this.methodBinding = methodBinding;
298 this.requestSerializer = Bindings.getSerializer( methodBinding.getRequestBinding() );
299 this.responseSerializer = Bindings.getSerializer( methodBinding.getResponseBinding() );
300 this.errorSerializer = Bindings.getSerializer( methodBinding.getErrorBinding() );
304 public AsyncResult invoke(final Object request) {
306 final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement());
307 requests.put(result.requestId, result);
\r
310 result.setInvokeException(new InvokeException(new ConnectionClosedException()));
312 writeExecutor.execute(new Runnable() {
314 public void run() {
\r
315 synchronized(TcpConnection.this) {
317 int size= requestSerializer.getSize(request, outIdentities);
318 if (size>maxSendSize) {
319 result.setInvokeException(new InvokeException(new MessageOverflowException()));
322 outIdentities.clear();
324 RequestHeader reqHeader = new RequestHeader();
325 reqHeader.methodId = methodId;
326 reqHeader.requestId = result.requestId;
327 MESSAGE_SERIALIZER.serialize(out, outIdentities, reqHeader);
328 outIdentities.clear();
330 requestSerializer.serialize(out, outIdentities, request);
331 outIdentities.clear();
333 } catch (IOException e) {
334 result.setInvokeException(new InvokeException(e));
335 } catch (RuntimeException e) {
336 result.setInvokeException(new InvokeException(e));
345 public MethodTypeBinding getMethodBinding() {
346 return methodBinding;
352 for (ConnectionListener listener : listeners)
\r
353 listener.onClosed();
\r
355 void setError(Exception e)
357 for (ConnectionListener listener : listeners)
363 * Get method interface that handles services locally (service requests by peer)
365 * @return local method interface
367 public MethodInterface getLocalMethodInterface()
369 return methodInterface;
373 * Get method interface that handles services locally (service requests by peer)
375 * @return local method interface
377 public MethodTypeDefinition[] getLocalMethodDescriptions()
382 public MethodInterface getRemoteMethodInterface() {
\r
387 * Close the connection. All pending service request are canceled.
388 * The socket is not closed.
390 public void close() {
392 // cancel all pending requests
393 ArrayList<PendingRequest> reqs = new ArrayList<PendingRequest>(requests.values());
394 for (PendingRequest pr : reqs) {
395 pr.setInvokeException(new InvokeException(new ConnectionClosedException()));
397 requests.values().removeAll(reqs);
400 // for (ConnectionListener listener : listeners)
\r
401 // listener.onClosed();
\r
405 * Get the active connection of current thread
\r
407 * @return Connection or <code>null</code> if current thread does not run connection
\r
409 public static TcpConnection getCurrentConnection() {
\r
410 Thread t = Thread.currentThread();
\r
411 if (t instanceof ConnectionThread == false) return null;
\r
412 ConnectionThread ct = (ConnectionThread) t;
\r
413 return ct.getConnection();
\r
417 * Connection Thread deserializes incoming messages from the TCP Stream.
\r
420 class ConnectionThread extends Thread {
\r
421 public ConnectionThread() {
\r
425 public TcpConnection getConnection() {
\r
426 return TcpConnection.this;
\r
429 public void run() {
\r
430 while (!Thread.interrupted()) {
\r
432 Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);
\r
433 if (header instanceof RequestHeader) {
\r
434 final RequestHeader reqHeader = (RequestHeader) header;
\r
436 int size = in.readInt();
\r
437 if (size>maxRecvSize) {
\r
438 setError(new MessageOverflowException());
\r
442 int methodId = reqHeader.methodId;
\r
443 if (methodId<0||methodId>=localMethods.length) {
\r
444 setError(new Exception("ProtocolError"));
\r
447 MethodTypeDefinition methodDescription = localMethods[methodId];
\r
448 // Let back-end determine bindings
\r
450 final Method method = methodInterface.getMethod(methodDescription);
\r
451 final MethodTypeBinding methodBinding = method.getMethodBinding();
\r
452 // Deserialize payload
\r
453 final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);
\r
454 inIdentities.clear();
\r
457 method.invoke(request).setListener(new InvokeListener() {
\r
459 public void onCompleted(final Object response) {
\r
461 writeExecutor.execute(new Runnable() {
\r
463 public void run() {
\r
464 synchronized(TcpConnection.this) {
\r
466 Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());
\r
467 int size = serializer.getSize(response, outIdentities);
\r
468 outIdentities.clear();
\r
469 if (size > maxSendSize) {
\r
470 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
\r
471 tooLarge.requestId = reqHeader.requestId;
\r
472 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
\r
473 outIdentities.clear();
\r
477 ResponseHeader respHeader = new ResponseHeader();
\r
478 respHeader.requestId = reqHeader.requestId;
\r
479 MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);
\r
480 outIdentities.clear();
\r
481 out.writeInt(size);
\r
483 serializer.serialize(out, outIdentities, response);
\r
484 outIdentities.clear();
\r
486 } catch (IOException e) {
\r
488 } catch (RuntimeException e) {
\r
495 public void onException(final Exception cause) {
\r
497 writeExecutor.execute(new Runnable() {
\r
499 public void run() {
\r
500 synchronized(TcpConnection.this) {
\r
502 Exception_ msg = new Exception_();
\r
503 msg.message = cause.getClass().getName()+": "+cause.getMessage();
\r
505 MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);
\r
506 outIdentities.clear();
\r
508 } catch (IOException e) {
\r
510 } catch (RuntimeException e) {
\r
517 public void onExecutionError(final Object error) {
\r
519 writeExecutor.execute(new Runnable() {
\r
521 public void run() {
\r
522 synchronized(TcpConnection.this) {
\r
524 Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());
\r
525 int size = serializer.getSize(error, outIdentities);
\r
526 outIdentities.clear();
\r
528 if (size > maxSendSize) {
\r
529 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
\r
530 tooLarge.requestId = reqHeader.requestId;
\r
531 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
\r
532 outIdentities.clear();
\r
536 ExecutionError_ errorHeader = new ExecutionError_();
\r
537 errorHeader.requestId = reqHeader.requestId;
\r
538 MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);
\r
539 outIdentities.clear();
\r
540 out.writeInt(size);
\r
541 serializer.serialize(out, outIdentities, error);
\r
542 outIdentities.clear();
\r
544 } catch (IOException e) {
\r
546 } catch (RuntimeException e) {
\r
553 } catch (MethodNotSupportedException e) {
\r
554 in.skipBytes(size);
\r
555 // return with an error
\r
556 final InvalidMethodError error = new InvalidMethodError();
\r
557 error.requestId = reqHeader.requestId;
\r
558 writeExecutor.execute(new Runnable() {
\r
560 public void run() {
\r
561 synchronized(TcpConnection.this) {
\r
563 MESSAGE_SERIALIZER.serialize(out, outIdentities, error);
\r
564 outIdentities.clear();
\r
566 } catch (IOException e) {
\r
568 } catch (RuntimeException e) {
\r
577 } else if (header instanceof ResponseHeader) {
\r
578 int requestId = ((ResponseHeader)header).requestId;
\r
579 PendingRequest req = requests.remove(requestId);
\r
581 setError(new RuntimeException("Request by id "+requestId+" does not exist"));
\r
584 int size = in.readInt();
\r
585 if (size>maxRecvSize) {
\r
588 Object response = req.method.responseSerializer.deserialize(in, inIdentities);
\r
589 inIdentities.clear();
\r
590 req.setResponse(response);
\r
591 } else if (header instanceof ExecutionError_) {
\r
592 int requestId = ((ExecutionError_)header).requestId;
\r
593 PendingRequest req = requests.remove(requestId);
\r
595 setError(new RuntimeException("Request by id "+requestId+" does not exist"));
\r
598 int size = in.readInt();
\r
599 if (size>maxRecvSize) {
\r
602 Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);
\r
603 inIdentities.clear();
\r
604 req.setExecutionError(executionError);
\r
605 } else if (header instanceof Exception_) {
\r
606 int requestId = ((Exception_)header).requestId;
\r
607 PendingRequest req = requests.remove(requestId);
\r
608 req.setExecutionError(new Exception(((Exception_)header).message));
\r
609 } else if (header instanceof InvalidMethodError) {
\r
610 int requestId = ((InvalidMethodError)header).requestId;
\r
611 PendingRequest req = requests.remove(requestId);
\r
612 req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));
\r
613 } else if (header instanceof ResponseTooLargeError) {
\r
614 int requestId = ((ResponseTooLargeError)header).requestId;
\r
615 PendingRequest req = requests.remove(requestId);
\r
616 req.setInvokeException(new InvokeException(new MessageOverflowException()));
\r
619 } catch (EOFException e) {
\r
622 } catch (SocketException e) {
\r
623 if (e.getMessage().equals("Socket Closed"))
\r
628 } catch (IOException e) {
\r
635 } catch (IOException e) {
\r
637 // Close pending requests
\r
642 // Thread that reads input data
643 ConnectionThread thread = new ConnectionThread();
645 class PendingRequest extends AsyncResultImpl {
652 public PendingRequest(MethodImpl method, int requestId) {
653 this.method = method;
654 this.requestId = requestId;
659 @Union({RequestHeader.class, ResponseHeader.class, ExecutionError_.class, Exception_.class, InvalidMethodError.class, ResponseTooLargeError.class})
660 public static class Message {}
662 public static class RequestHeader extends Message {
663 public int requestId;
666 public RequestHeader() {}
669 public static class ResponseHeader extends Message {
670 public int requestId;
672 public ResponseHeader() {}
675 // Error while invoking a method
676 public static class ExecutionError_ extends Message {
677 public int requestId;
679 public ExecutionError_() {}
682 // MethodName does not exist
683 public static class InvalidMethodError extends Message {
684 public int requestId;
685 public InvalidMethodError() {}
688 // Exception, not in method but somewhere else
689 public static class Exception_ extends Message {
690 public int requestId;
691 public String message;
692 public Exception_() {}
695 public static class ResponseTooLargeError extends Message {
696 public int requestId;
697 public ResponseTooLargeError() {}