1 /*******************************************************************************
2 * Copyright (c) 2010 Association for Decentralized Information Management in
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.databoard.method;
14 import gnu.trove.map.hash.TObjectIntHashMap;
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.nio.charset.Charset;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.SynchronousQueue;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.simantics.databoard.Bindings;
35 import org.simantics.databoard.annotations.Union;
36 import org.simantics.databoard.binding.Binding;
37 import org.simantics.databoard.binding.RecordBinding;
38 import org.simantics.databoard.binding.UnionBinding;
39 import org.simantics.databoard.serialization.Serializer;
40 import org.simantics.databoard.serialization.SerializerConstructionException;
41 import org.simantics.databoard.util.binary.BinaryReadable;
42 import org.simantics.databoard.util.binary.BinaryWriteable;
43 import org.simantics.databoard.util.binary.InputStreamReadable;
44 import org.simantics.databoard.util.binary.OutputStreamWriteable;
47 * Connection is a class that handles request-response communication over a
50 * Requests have an asynchronous result. The result can be acquired using one of
52 * 1) Blocking read AsyncResult.waitForResponse()
53 * 2) Poll AsyncResult.getResponse()
54 * 3) Listen AsyncResult.setListener()
56 * The socket must be established before Connection is instantiated.
57 * Closing connection does not close its Socket.
58 * If the socket is closed before the connection, an error is raised.
59 * The error is delivered to registered ConnectionListeners.
60 * The proper order to close a connection is to close Connection first
63 * @author Toni Kalajainen <toni.kalajainen@vtt.fi>
65 public class TcpConnection implements MethodInterface {
// Executor used by default for asynchronous writes (see writeExecutor and handshake()).
// It is configured with no core threads, an unbounded maximum, a 100 ms idle keep-alive
// and a SynchronousQueue, so every submitted task is handed directly to an idle or a
// newly created thread instead of being queued.
67 public static final ExecutorService SHARED_EXECUTOR_SERVICE =
68 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
// Serializer of the Message union; used for every message header read or written in this class.
70 static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) );
// NOTE(review): this constant is not final and no use of it is visible in this excerpt — confirm.
71 static Charset UTF8 = Charset.forName("UTF8");
// Handshake data of this end and of the peer, as passed to the constructor.
73 Handshake local, remote;
// Method definitions taken from the two handshakes (local.methods / remote.methods).
76 MethodTypeDefinition[] localMethods, remoteMethods;
// Definition -> index in the corresponding array above. The index into remoteMethods is
// what is sent as RequestHeader.methodId; incoming ids index localMethods.
77 HashMap<MethodTypeDefinition, Integer> localMethodsMap, remoteMethodsMap;
81 // if false, there is an error in the socket or the connection has been shutdown
82 boolean active = true;
84 // Objects used for handling local services
85 MethodInterface methodInterface;
87 // Objects used for reading data
// Requests sent to the peer that have not been answered yet, keyed by request id.
88 ConcurrentHashMap<Integer, PendingRequest> requests = new ConcurrentHashMap<Integer, PendingRequest>();
// Scratch identity list handed to deserializers; cleared after each deserialized value.
89 List<Object> inIdentities = new ArrayList<Object>();
93 // Object used for writing data
94 public ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
// Scratch identity map handed to serializers; the visible writers touch it only inside
// synchronized(TcpConnection.this) blocks and clear it after each serialized value.
95 TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
// Source of request ids for outgoing requests (see MethodImpl.invoke).
97 AtomicInteger requestCounter = new AtomicInteger(0);
100 // Cached method descriptions
101 Map<String, MethodType> methodTypes = new ConcurrentHashMap<String, MethodType>();
107 * @param localData local data
108 * @return the remote data
109 * @throws IOException
110 * @throws RuntimeException unexpected error (BindingException or EncodingException)
// Exchanges handshake records with the peer over an already connected socket.
// The local handshake is serialized by a task submitted to SHARED_EXECUTOR_SERVICE while
// the calling thread deserializes the peer's handshake, so both ends can write and read
// at the same time.
112 public static Handshake handshake(final Socket socket, final Handshake localData)
115 final BinaryReadable bin = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
116 final BinaryWriteable bout = new OutputStreamWriteable( socket.getOutputStream() );
117 ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE;
// One-element array so that the anonymous writer task can hand an exception back.
120 final Exception[] writeError = new Exception[1];
// Presumably released by the writer task and acquired below before writeError is
// inspected — the release/acquire lines are not visible in this excerpt; confirm.
121 final Semaphore s = new Semaphore(0);
122 writeExecutor.execute(new Runnable() {
126 TObjectIntHashMap<Object> outIdentities = new TObjectIntHashMap<Object>();
127 Handshake.SERIALIZER.serialize(bout, outIdentities, localData);
129 outIdentities.clear();
130 } catch (IOException e) {
137 // Read remote peer's handshake
138 List<Object> inIdentities = new ArrayList<Object>();
139 Handshake result = (Handshake) Handshake.SERIALIZER.deserialize(bin, inIdentities);
140 inIdentities.clear();
142 // Check that write was ok
145 Exception e = writeError[0];
// An IOException from the writer is rethrown as-is; anything else is wrapped below.
// NOTE(review): the e!=null test is redundant, instanceof is already false for null.
146 if (e!=null && e instanceof IOException)
147 throw (IOException) e;
149 throw new RuntimeException(e);
150 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored before wrapping; consider
// Thread.currentThread().interrupt() here.
151 throw new RuntimeException(e);
153 // writeExecutor.shutdown();
160 * Create a connection to a hand-shaken socket
163 * @param methodInterface local method handler
166 * @throws IOException
// Builds a connection on top of a socket whose handshake has already been exchanged.
// Throws IllegalArgumentException when socket, localData or remoteData is null.
168 public TcpConnection(Socket socket, MethodInterface methodInterface, Handshake localData, Handshake remoteData)
// NOTE(review): methodInterface is not part of this null check, yet ConnectionThread calls
// methodInterface.getMethod(...) for incoming requests — confirm null is a supported value.
170 if (socket==null || localData==null || remoteData==null)
171 throw new IllegalArgumentException("null arg");
173 this.methodInterface = methodInterface;
174 this.socket = socket;
175 this.local = localData;
176 this.remote = remoteData;
// Effective limits are the smaller of what one side sends and the other side accepts.
177 this.maxSendSize = Math.min(localData.sendMsgLimit, remoteData.recvMsgLimit);
178 this.maxRecvSize = Math.min(localData.recvMsgLimit, remoteData.sendMsgLimit);
180 this.localMethods = local.methods;
181 this.remoteMethods = remote.methods;
182 this.remoteType = new Interface(this.remoteMethods);
// Index each method definition by its position in the handshake array; that position
// is the method id used in request headers.
183 this.localMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
184 this.remoteMethodsMap = new HashMap<MethodTypeDefinition, Integer>();
185 for (int i=0; i<localMethods.length; i++)
186 localMethodsMap.put(localMethods[i], i);
187 for (int i=0; i<remoteMethods.length; i++)
188 remoteMethodsMap.put(remoteMethods[i], i);
189 // remoteMethodsMap.trimToSize();
190 // localMethodsMap.trimToSize();
// Wrap the socket streams for binary reading and writing.
192 in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
193 out = new OutputStreamWriteable( socket.getOutputStream() );
// Name the reader thread after the peer address so it can be identified in thread dumps.
195 String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();
197 thread.setName( threadName );
202 public Interface getInterface() {
// Returns a proxy for a remote method using the bindings supplied by the caller.
// Throws MethodNotSupportedException when the peer did not announce the method in its
// handshake, or when serializers cannot be constructed for the given bindings.
207 public Method getMethod(MethodTypeBinding binding)
208 throws MethodNotSupportedException {
209 // consumer suggests object bindings
210 MethodTypeDefinition description = binding.getMethodDefinition();
212 if (!remoteMethodsMap.containsKey(description)) {
// NOTE(review): diagnostics go straight to System.out; looks like leftover debugging
// output — confirm whether it should stay.
214 System.out.println("Method not found: "+description);
215 System.out.println("Existing methods:" );
216 for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {
218 if (k.getType().requestType.getComponentCount()>0) {
219 System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );
221 System.out.println();
224 throw new MethodNotSupportedException(description.getName());
// The map value is the index of the method in the peer's handshake; it is the wire id.
227 int id = remoteMethodsMap.get(description);
230 return new MethodImpl(id, binding);
231 } catch (SerializerConstructionException e) {
232 throw new MethodNotSupportedException(e);
// Returns a proxy for a remote method identified only by its definition; mutable bindings
// are obtained from Bindings for the request, response and error types.
// Throws MethodNotSupportedException when the peer did not announce the method, or when
// serializers cannot be constructed.
237 public Method getMethod(MethodTypeDefinition description)
238 throws MethodNotSupportedException {
239 // producer suggests object bindings
240 if (!remoteMethodsMap.containsKey(description)) {
241 throw new MethodNotSupportedException(description.getName());
// Index of the method in the peer's handshake; used as the wire id.
243 int id = remoteMethodsMap.get(description);
// The casts rely on the request binding being a RecordBinding and the error binding being
// a UnionBinding.
245 RecordBinding reqBinding = (RecordBinding) Bindings.getMutableBinding(description.getType().getRequestType());
246 Binding resBinding = Bindings.getMutableBinding(description.getType().getResponseType());
247 UnionBinding errBinding = (UnionBinding) Bindings.getMutableBinding(description.getType().getErrorType());
248 MethodTypeBinding binding = new MethodTypeBinding(description, reqBinding, resBinding, errBinding);
251 return new MethodImpl(id, binding);
252 } catch (SerializerConstructionException e) {
253 // Generic binding should work
254 throw new MethodNotSupportedException(e);
258 public Socket getSocket()
263 public interface ConnectionListener {
265 * There was an error and the connection was closed
269 void onError(Exception error);
272 * close() was invoked
// Registered connection listeners. The list is iterated when listeners are notified;
// CopyOnWriteArrayList allows that iteration to run while listeners are added or removed.
277 CopyOnWriteArrayList<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
// Registers a listener.
// NOTE(review): this method is synchronized while removeConnectionListener is not, and the
// list is already thread-safe — the modifier looks unnecessary; confirm.
279 public synchronized void addConnectionListener(ConnectionListener listener) {
280 listeners.add( listener );
// Unregisters a listener; no effect if it was not registered.
283 public void removeConnectionListener(ConnectionListener listener) {
284 listeners.remove( listener );
// Client-side proxy of one remote method: serializes requests to the peer and holds the
// serializers that ConnectionThread uses to decode the matching response or error.
287 class MethodImpl implements Method {
289 MethodTypeBinding methodBinding;
290 Serializer responseSerializer;
291 Serializer requestSerializer;
292 Serializer errorSerializer;
// methodId is the index of the method in the peer's handshake. Serializers are resolved
// once here; failure surfaces as SerializerConstructionException.
294 MethodImpl(int methodId, MethodTypeBinding methodBinding) throws SerializerConstructionException
296 this.methodId = methodId;
297 this.methodBinding = methodBinding;
298 this.requestSerializer = Bindings.getSerializer( methodBinding.getRequestBinding() );
299 this.responseSerializer = Bindings.getSerializer( methodBinding.getResponseBinding() );
300 this.errorSerializer = Bindings.getSerializer( methodBinding.getErrorBinding() );
// Sends the request asynchronously and returns a handle for the eventual result.
304 public AsyncResult invoke(final Object request) {
// Register the pending request under a fresh id before anything is written, so the reader
// thread can find it when the reply arrives.
306 final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement());
307 requests.put(result.requestId, result);
// Presumably guarded by a check of the 'active' flag on a line not visible here — confirm.
310 result.setInvokeException(new InvokeException(new ConnectionClosedException()));
// The actual write happens on the write executor, not on the caller's thread.
312 writeExecutor.execute(new Runnable() {
// All visible writers take the connection monitor before using 'out' and 'outIdentities'.
315 synchronized(TcpConnection.this) {
// Requests larger than the negotiated limit fail locally with MessageOverflowException.
// NOTE(review): whether the entry is also removed from 'requests' in this case is not
// visible in this excerpt — confirm.
317 int size= requestSerializer.getSize(request, outIdentities);
318 if (size>maxSendSize) {
319 result.setInvokeException(new InvokeException(new MessageOverflowException()));
322 outIdentities.clear();
// Header (method id + request id) first, then the request payload.
324 RequestHeader reqHeader = new RequestHeader();
325 reqHeader.methodId = methodId;
326 reqHeader.requestId = result.requestId;
327 MESSAGE_SERIALIZER.serialize(out, outIdentities, reqHeader);
328 outIdentities.clear();
330 requestSerializer.serialize(out, outIdentities, request);
331 outIdentities.clear();
// Write failures are reported through the result handle rather than thrown.
333 } catch (IOException e) {
334 result.setInvokeException(new InvokeException(e));
335 } catch (RuntimeException e) {
336 result.setInvokeException(new InvokeException(e));
345 public MethodTypeBinding getMethodBinding() {
346 return methodBinding;
352 for (ConnectionListener listener : listeners)
355 void setError(Exception e)
357 for (ConnectionListener listener : listeners)
363 * Get method interface that handles services locally (service requests by peer)
365 * @return local method interface
// Returns the MethodInterface given to the constructor; it may be null because the
// constructor does not reject a null methodInterface.
367 public MethodInterface getLocalMethodInterface()
369 return methodInterface;
373 * Get the type definitions of the methods that are handled locally (service requests by peer)
375 * @return local method descriptions
377 public MethodTypeDefinition[] getLocalMethodDescriptions()
382 public MethodInterface getRemoteMethodInterface() {
387 * Close the connection. All pending service requests are canceled.
388 * The socket is not closed.
// Fails every request that is still waiting for a reply with ConnectionClosedException.
390 public void close() {
392 // cancel all pending requests
// Work on a snapshot so that only the requests seen here are failed and removed; requests
// added to the map afterwards are left untouched by the removeAll below.
393 ArrayList<PendingRequest> reqs = new ArrayList<PendingRequest>(requests.values());
394 for (PendingRequest pr : reqs) {
395 pr.setInvokeException(new InvokeException(new ConnectionClosedException()));
397 requests.values().removeAll(reqs);
// Listener notification is commented out here.
400 // for (ConnectionListener listener : listeners)
401 // listener.onClosed();
405 * Get the active connection of current thread
407 * @return Connection or <code>null</code> if the current thread is not a connection thread
// Resolves the connection from the calling thread: only a ConnectionThread (the reader
// thread of a connection) carries one, every other thread yields null.
409 public static TcpConnection getCurrentConnection() {
410 Thread t = Thread.currentThread();
411 if (t instanceof ConnectionThread == false) return null;
412 ConnectionThread ct = (ConnectionThread) t;
413 return ct.getConnection();
417 * Connection Thread deserializes incoming messages from the TCP Stream.
// Reader thread of the connection. It reads messages from 'in' in a loop, serves incoming
// requests through methodInterface, and completes entries of 'requests' when replies arrive.
// Replies to the peer are written by tasks submitted to writeExecutor.
420 class ConnectionThread extends Thread {
421 public ConnectionThread() {
// Exposes the owning connection; used by TcpConnection.getCurrentConnection().
425 public TcpConnection getConnection() {
426 return TcpConnection.this;
// Main loop: one message per iteration until the thread is interrupted.
430 while (!Thread.interrupted()) {
// Every message starts with a header from the Message union; its concrete type selects
// the branch below.
432 Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);
433 if (header instanceof RequestHeader) {
434 final RequestHeader reqHeader = (RequestHeader) header;
// An int read right after the header is compared against the negotiated receive limit.
436 int size = in.readInt();
437 if (size>maxRecvSize) {
438 setError(new MessageOverflowException());
// The method id must be a valid index into the locally announced methods.
442 int methodId = reqHeader.methodId;
443 if (methodId<0||methodId>=localMethods.length) {
444 setError(new Exception("ProtocolError"));
447 MethodTypeDefinition methodDescription = localMethods[methodId];
448 // Let back-end determine bindings
450 final Method method = methodInterface.getMethod(methodDescription);
451 final MethodTypeBinding methodBinding = method.getMethodBinding();
452 // Deserialize payload
453 final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);
454 inIdentities.clear();
// Invoke the local method; the listener writes one of three replies depending on how the
// invocation ends.
457 method.invoke(request).setListener(new InvokeListener() {
// Success: ResponseHeader followed by the serialized response.
459 public void onCompleted(final Object response) {
461 writeExecutor.execute(new Runnable() {
// Writers take the connection monitor, presumably so that a header and its payload are
// not interleaved with output of other writer tasks — confirm.
464 synchronized(TcpConnection.this) {
466 Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());
467 int size = serializer.getSize(response, outIdentities);
468 outIdentities.clear();
// A response above the send limit is replaced by a ResponseTooLargeError message.
469 if (size > maxSendSize) {
470 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
471 tooLarge.requestId = reqHeader.requestId;
472 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
473 outIdentities.clear();
477 ResponseHeader respHeader = new ResponseHeader();
478 respHeader.requestId = reqHeader.requestId;
479 MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);
480 outIdentities.clear();
483 serializer.serialize(out, outIdentities, response);
484 outIdentities.clear();
486 } catch (IOException e) {
488 } catch (RuntimeException e) {
// Invocation failed with an exception: an Exception_ message carrying
// "<exception class name>: <message>" is sent.
495 public void onException(final Exception cause) {
497 writeExecutor.execute(new Runnable() {
500 synchronized(TcpConnection.this) {
502 Exception_ msg = new Exception_();
503 msg.message = cause.getClass().getName()+": "+cause.getMessage();
505 MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);
506 outIdentities.clear();
508 } catch (IOException e) {
510 } catch (RuntimeException e) {
// Method reported a declared error: ExecutionError_ header followed by the error value
// serialized with the method's error binding.
517 public void onExecutionError(final Object error) {
519 writeExecutor.execute(new Runnable() {
522 synchronized(TcpConnection.this) {
524 Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());
525 int size = serializer.getSize(error, outIdentities);
526 outIdentities.clear();
// Same size limit as for normal responses.
528 if (size > maxSendSize) {
529 ResponseTooLargeError tooLarge = new ResponseTooLargeError();
530 tooLarge.requestId = reqHeader.requestId;
531 MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
532 outIdentities.clear();
536 ExecutionError_ errorHeader = new ExecutionError_();
537 errorHeader.requestId = reqHeader.requestId;
538 MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);
539 outIdentities.clear();
541 serializer.serialize(out, outIdentities, error);
542 outIdentities.clear();
544 } catch (IOException e) {
546 } catch (RuntimeException e) {
// The local back-end does not provide the requested method: reply with InvalidMethodError.
553 } catch (MethodNotSupportedException e) {
555 // return with an error
556 final InvalidMethodError error = new InvalidMethodError();
557 error.requestId = reqHeader.requestId;
558 writeExecutor.execute(new Runnable() {
561 synchronized(TcpConnection.this) {
563 MESSAGE_SERIALIZER.serialize(out, outIdentities, error);
564 outIdentities.clear();
566 } catch (IOException e) {
568 } catch (RuntimeException e) {
// Reply to one of our own requests: look up and remove the pending entry by id, then
// decode the payload with the serializer of the method that was invoked.
577 } else if (header instanceof ResponseHeader) {
578 int requestId = ((ResponseHeader)header).requestId;
579 PendingRequest req = requests.remove(requestId);
581 setError(new RuntimeException("Request by id "+requestId+" does not exist"));
584 int size = in.readInt();
585 if (size>maxRecvSize) {
588 Object response = req.method.responseSerializer.deserialize(in, inIdentities);
589 inIdentities.clear();
590 req.setResponse(response);
// Declared error of one of our own requests; handled like a response but decoded with the
// error serializer.
591 } else if (header instanceof ExecutionError_) {
592 int requestId = ((ExecutionError_)header).requestId;
593 PendingRequest req = requests.remove(requestId);
595 setError(new RuntimeException("Request by id "+requestId+" does not exist"));
598 int size = in.readInt();
599 if (size>maxRecvSize) {
602 Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);
603 inIdentities.clear();
604 req.setExecutionError(executionError);
// NOTE(review): in the three branches below 'req' is used without a null check, although
// requests.remove() returns null for an unknown id (e.g. after close() emptied the map or
// on a bogus id from the peer); the two branches above report "does not exist" for that
// case. Looks like a possible NullPointerException — confirm.
605 } else if (header instanceof Exception_) {
606 int requestId = ((Exception_)header).requestId;
607 PendingRequest req = requests.remove(requestId);
608 req.setExecutionError(new Exception(((Exception_)header).message));
609 } else if (header instanceof InvalidMethodError) {
610 int requestId = ((InvalidMethodError)header).requestId;
611 PendingRequest req = requests.remove(requestId);
612 req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));
613 } else if (header instanceof ResponseTooLargeError) {
614 int requestId = ((ResponseTooLargeError)header).requestId;
615 PendingRequest req = requests.remove(requestId);
616 req.setInvokeException(new InvokeException(new MessageOverflowException()));
619 } catch (EOFException e) {
622 } catch (SocketException e) {
// NOTE(review): getMessage() may return null, which would throw here; also the comparison
// is case-sensitive and JDK messages are usually spelled "Socket closed" — confirm that
// this branch can match at all. "Socket Closed".equals(e.getMessage()) would be null-safe.
623 if (e.getMessage().equals("Socket Closed"))
628 } catch (IOException e) {
635 } catch (IOException e) {
637 // Close pending requests
642 // Thread that reads input data
643 ConnectionThread thread = new ConnectionThread();
// Result handle of an outgoing request. It remembers the invoked method (whose serializers
// decode the reply) and the request id under which it is stored in 'requests'.
645 class PendingRequest extends AsyncResultImpl {
652 public PendingRequest(MethodImpl method, int requestId) {
653 this.method = method;
654 this.requestId = requestId;
// Wire-level message headers. Message is a union of the six subclasses listed in @Union;
// MESSAGE_SERIALIZER is built from Message.class and ConnectionThread dispatches on the
// concrete subclass with instanceof. Every subclass carries the id of the request it
// belongs to.
659 @Union({RequestHeader.class, ResponseHeader.class, ExecutionError_.class, Exception_.class, InvalidMethodError.class, ResponseTooLargeError.class})
660 public static class Message {}
// Header of a method invocation request; the request payload follows it on the wire.
662 public static class RequestHeader extends Message {
663 public int requestId;
666 public RequestHeader() {}
// Header of a successful reply; the response payload follows it on the wire.
669 public static class ResponseHeader extends Message {
670 public int requestId;
672 public ResponseHeader() {}
675 // Error while invoking a method
676 public static class ExecutionError_ extends Message {
677 public int requestId;
679 public ExecutionError_() {}
682 // MethodName does not exist
683 public static class InvalidMethodError extends Message {
684 public int requestId;
685 public InvalidMethodError() {}
688 // Exception, not in method but somewhere else
689 public static class Exception_ extends Message {
690 public int requestId;
// Text of the form "<exception class name>: <message>", as composed by the sender.
691 public String message;
692 public Exception_() {}
// Sent in place of a response or execution error whose serialized size exceeds the
// sender's send limit.
695 public static class ResponseTooLargeError extends Message {
696 public int requestId;
697 public ResponseTooLargeError() {}