X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Fmethod%2FTcpConnection.java;fp=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Fmethod%2FTcpConnection.java;h=59e230cba3c2fbe98cb73105496493fc330f91df;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java new file mode 100644 index 000000000..59e230cba --- /dev/null +++ b/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java @@ -0,0 +1,701 @@ +/******************************************************************************* + * Copyright (c) 2010 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.databoard.method; + +import gnu.trove.map.hash.TObjectIntHashMap; + +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.annotations.Union; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.RecordBinding; +import org.simantics.databoard.binding.UnionBinding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.serialization.SerializerConstructionException; +import org.simantics.databoard.util.binary.BinaryReadable; +import org.simantics.databoard.util.binary.BinaryWriteable; +import org.simantics.databoard.util.binary.InputStreamReadable; +import org.simantics.databoard.util.binary.OutputStreamWriteable; + +/** + * Connection is a class that handles request-response communication over a + * socket. + *

+ * Requests have asynchronous result. The result can be acquired using one of + * the three methods: + * 1) Blocking read AsyncResult.waitForResponse() + * 2) Poll AsyncResult.getResponse() + * 3) Listen AsyncResult.setListener() + *

+ * The socket must be established before Connection is instantiated. + * Closing connection does not close its Socket. + * If the socket is closed before connection there an error is thrown. + * The error is available by placing listener. + * The proper order to close a connection is to close Connection first + * and then Socket. + * + * @author Toni Kalajainen + */ +public class TcpConnection implements MethodInterface { + + public static final ExecutorService SHARED_EXECUTOR_SERVICE = + new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue()); + + static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) ); + static Charset UTF8 = Charset.forName("UTF8"); + + Handshake local, remote; + + Interface remoteType; + MethodTypeDefinition[] localMethods, remoteMethods; + HashMap localMethodsMap, remoteMethodsMap; + + Socket socket; + + // if false, there is an error in the socket or the connection has been shutdown + boolean active = true; + + // Objects used for handling local services + MethodInterface methodInterface; + + // Objects used for reading data + ConcurrentHashMap requests = new ConcurrentHashMap(); + List inIdentities = new ArrayList(); + BinaryReadable in; + int maxRecvSize; + + // Object used for writing data + public ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE; + TObjectIntHashMap outIdentities = new TObjectIntHashMap(); + BinaryWriteable out; + AtomicInteger requestCounter = new AtomicInteger(0); + int maxSendSize; + + // Cached method descriptions + Map methodTypes = new ConcurrentHashMap(); + + /** + * Handshake a socket + * + * @param socket + * @param localData local data + * @return the remote data + * @throws IOException + * @throws RuntimeException unexpected error (BindingException or EncodingException) + */ + public static Handshake handshake(final Socket socket, final Handshake localData) + throws IOException + { + final BinaryReadable bin = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE ); + final BinaryWriteable bout = new OutputStreamWriteable( socket.getOutputStream() ); + ExecutorService writeExecutor = SHARED_EXECUTOR_SERVICE; + + // do hand-shake + final Exception[] writeError = new Exception[1]; + final Semaphore s = new Semaphore(0); + writeExecutor.execute(new Runnable() { + @Override + public void run() { + try { + TObjectIntHashMap outIdentities = new TObjectIntHashMap(); + Handshake.SERIALIZER.serialize(bout, outIdentities, localData); + bout.flush(); + outIdentities.clear(); + } catch (IOException e) { + writeError[0] = e; + } finally { + s.release(1); + } + }}); + + // Read remote peer's handshake + List inIdentities = new ArrayList(); + Handshake result = (Handshake) Handshake.SERIALIZER.deserialize(bin, inIdentities); + inIdentities.clear(); + + // Check that write was ok + try { + s.acquire(1); + Exception e = writeError[0]; + if (e!=null && e instanceof IOException) + throw (IOException) e; + if (e!=null) + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { +// writeExecutor.shutdown(); + } + + return result; + } + + /** + * Create a connection to a hand-shaken socket + * + * @param socket + * @param methodInterface local method handler + * @param localData + * @param remoteData + * @throws IOException + */ + public TcpConnection(Socket socket, MethodInterface methodInterface, Handshake localData, Handshake remoteData) + throws IOException { + if (socket==null || localData==null || remoteData==null) + throw new IllegalArgumentException("null arg"); + + this.methodInterface = methodInterface; + this.socket = socket; + this.local = localData; + this.remote = remoteData; + this.maxSendSize = Math.min(localData.sendMsgLimit, remoteData.recvMsgLimit); + this.maxRecvSize = Math.min(localData.recvMsgLimit, remoteData.sendMsgLimit); + + this.localMethods = local.methods; + this.remoteMethods = remote.methods; + this.remoteType = new Interface(this.remoteMethods); + this.localMethodsMap = new HashMap(); + this.remoteMethodsMap = new HashMap(); + for (int i=0; i0) { + System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) ); + } + System.out.println(); + } +*/ + throw new MethodNotSupportedException(description.getName()); + } + + int id = remoteMethodsMap.get(description); + + try { + return new MethodImpl(id, binding); + } catch (SerializerConstructionException e) { + throw new MethodNotSupportedException(e); + } + } + + @Override + public Method getMethod(MethodTypeDefinition description) + throws MethodNotSupportedException { + // producer suggests object bindings + if (!remoteMethodsMap.containsKey(description)) { + throw new MethodNotSupportedException(description.getName()); + } + int id = remoteMethodsMap.get(description); + + RecordBinding reqBinding = (RecordBinding) Bindings.getMutableBinding(description.getType().getRequestType()); + Binding resBinding = Bindings.getMutableBinding(description.getType().getResponseType()); + UnionBinding errBinding = (UnionBinding) Bindings.getMutableBinding(description.getType().getErrorType()); + MethodTypeBinding binding = new MethodTypeBinding(description, reqBinding, resBinding, errBinding); + + try { + return new MethodImpl(id, binding); + } catch (SerializerConstructionException e) { + // Generic binding should work + throw new MethodNotSupportedException(e); + } + } + + public Socket getSocket() + { + return socket; + } + + public interface ConnectionListener { + /** + * There was an error and connection was closed + * + * @param error + */ + void onError(Exception error); + + /** + * close() was invoked + */ + void onClosed(); + } + + CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); + + public synchronized void addConnectionListener(ConnectionListener listener) { + listeners.add( listener ); + } + + public void removeConnectionListener(ConnectionListener listener) { + listeners.remove( listener ); + } + + class MethodImpl implements Method { + int methodId; + MethodTypeBinding methodBinding; + Serializer responseSerializer; + Serializer requestSerializer; + Serializer errorSerializer; + + MethodImpl(int methodId, MethodTypeBinding methodBinding) throws SerializerConstructionException + { + this.methodId = methodId; + this.methodBinding = methodBinding; + this.requestSerializer = Bindings.getSerializer( methodBinding.getRequestBinding() ); + this.responseSerializer = Bindings.getSerializer( methodBinding.getResponseBinding() ); + this.errorSerializer = Bindings.getSerializer( methodBinding.getErrorBinding() ); + } + + @Override + public AsyncResult invoke(final Object request) { + // Write, async + final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement()); + requests.put(result.requestId, result); + + if (!active) { + result.setInvokeException(new InvokeException(new ConnectionClosedException())); + } else { + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + int size= requestSerializer.getSize(request, outIdentities); + if (size>maxSendSize) { + result.setInvokeException(new InvokeException(new MessageOverflowException())); + return; + } + outIdentities.clear(); + + RequestHeader reqHeader = new RequestHeader(); + reqHeader.methodId = methodId; + reqHeader.requestId = result.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, reqHeader); + outIdentities.clear(); + out.writeInt(size); + requestSerializer.serialize(out, outIdentities, request); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + result.setInvokeException(new InvokeException(e)); + } catch (RuntimeException e) { + result.setInvokeException(new InvokeException(e)); + } + } + }}); + } + return result; + } + + @Override + public MethodTypeBinding getMethodBinding() { + return methodBinding; + } + } + + void setClosed() + { + for (ConnectionListener listener : listeners) + listener.onClosed(); + } + void setError(Exception e) + { + for (ConnectionListener listener : listeners) + listener.onError(e); + close(); + } + + /** + * Get method interface that handles services locally (service requests by peer) + * + * @return local method interface + */ + public MethodInterface getLocalMethodInterface() + { + return methodInterface; + } + + /** + * Get method interface that handles services locally (service requests by peer) + * + * @return local method interface + */ + public MethodTypeDefinition[] getLocalMethodDescriptions() + { + return localMethods; + } + + public MethodInterface getRemoteMethodInterface() { + return this; + } + + /** + * Close the connection. All pending service request are canceled. + * The socket is not closed. + */ + public void close() { + active = false; + // cancel all pending requests + ArrayList reqs = new ArrayList(requests.values()); + for (PendingRequest pr : reqs) { + pr.setInvokeException(new InvokeException(new ConnectionClosedException())); + } + requests.values().removeAll(reqs); + // shutdown inthread + thread.interrupt(); +// for (ConnectionListener listener : listeners) +// listener.onClosed(); + } + + /** + * Get the active connection of current thread + * + * @return Connection or null if current thread does not run connection + */ + public static TcpConnection getCurrentConnection() { + Thread t = Thread.currentThread(); + if (t instanceof ConnectionThread == false) return null; + ConnectionThread ct = (ConnectionThread) t; + return ct.getConnection(); + } + + /** + * Connection Thread deserializes incoming messages from the TCP Stream. + * + */ + class ConnectionThread extends Thread { + public ConnectionThread() { + setDaemon(true); + } + + public TcpConnection getConnection() { + return TcpConnection.this; + } + + public void run() { + while (!Thread.interrupted()) { + try { + Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities); + if (header instanceof RequestHeader) { + final RequestHeader reqHeader = (RequestHeader) header; + + int size = in.readInt(); + if (size>maxRecvSize) { + setError(new MessageOverflowException()); + return; + } + + int methodId = reqHeader.methodId; + if (methodId<0||methodId>=localMethods.length) { + setError(new Exception("ProtocolError")); + return; + } + MethodTypeDefinition methodDescription = localMethods[methodId]; + // Let back-end determine bindings + try { + final Method method = methodInterface.getMethod(methodDescription); + final MethodTypeBinding methodBinding = method.getMethodBinding(); + // Deserialize payload + final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities); + inIdentities.clear(); + + // Invoke method + method.invoke(request).setListener(new InvokeListener() { + @Override + public void onCompleted(final Object response) { + // Write RESP + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding()); + int size = serializer.getSize(response, outIdentities); + outIdentities.clear(); + if (size > maxSendSize) { + ResponseTooLargeError tooLarge = new ResponseTooLargeError(); + tooLarge.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); + outIdentities.clear(); + return; + } + + ResponseHeader respHeader = new ResponseHeader(); + respHeader.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader); + outIdentities.clear(); + out.writeInt(size); + + serializer.serialize(out, outIdentities, response); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + } + @Override + public void onException(final Exception cause) { + // Write ERRO + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Exception_ msg = new Exception_(); + msg.message = cause.getClass().getName()+": "+cause.getMessage(); + + MESSAGE_SERIALIZER.serialize(out, outIdentities, msg); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + } + @Override + public void onExecutionError(final Object error) { + // Write ERRO + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding()); + int size = serializer.getSize(error, outIdentities); + outIdentities.clear(); + + if (size > maxSendSize) { + ResponseTooLargeError tooLarge = new ResponseTooLargeError(); + tooLarge.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); + outIdentities.clear(); + return; + } + + ExecutionError_ errorHeader = new ExecutionError_(); + errorHeader.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader); + outIdentities.clear(); + out.writeInt(size); + serializer.serialize(out, outIdentities, error); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + }}); + + } catch (MethodNotSupportedException e) { + in.skipBytes(size); + // return with an error + final InvalidMethodError error = new InvalidMethodError(); + error.requestId = reqHeader.requestId; + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + MESSAGE_SERIALIZER.serialize(out, outIdentities, error); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + continue; + } + + + } else if (header instanceof ResponseHeader) { + int requestId = ((ResponseHeader)header).requestId; + PendingRequest req = requests.remove(requestId); + if (req==null) { + setError(new RuntimeException("Request by id "+requestId+" does not exist")); + return; + } + int size = in.readInt(); + if (size>maxRecvSize) { + // TODO SOMETHING + } + Object response = req.method.responseSerializer.deserialize(in, inIdentities); + inIdentities.clear(); + req.setResponse(response); + } else if (header instanceof ExecutionError_) { + int requestId = ((ExecutionError_)header).requestId; + PendingRequest req = requests.remove(requestId); + if (req==null) { + setError(new RuntimeException("Request by id "+requestId+" does not exist")); + return; + } + int size = in.readInt(); + if (size>maxRecvSize) { + // TODO SOMETHING + } + Object executionError = req.method.errorSerializer.deserialize(in, inIdentities); + inIdentities.clear(); + req.setExecutionError(executionError); + } else if (header instanceof Exception_) { + int requestId = ((Exception_)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setExecutionError(new Exception(((Exception_)header).message)); + } else if (header instanceof InvalidMethodError) { + int requestId = ((InvalidMethodError)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setInvokeException(new InvokeException(new MethodNotSupportedException("?"))); + } else if (header instanceof ResponseTooLargeError) { + int requestId = ((ResponseTooLargeError)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setInvokeException(new InvokeException(new MessageOverflowException())); + } + + } catch (EOFException e) { + setClosed(); + break; + } catch (SocketException e) { + if (e.getMessage().equals("Socket Closed")) + setClosed(); + else + setError(e); + break; + } catch (IOException e) { + setError(e); + break; + } + } + try { + socket.close(); + } catch (IOException e) { + } + // Close pending requests + close(); + }; + } + + // Thread that reads input data + ConnectionThread thread = new ConnectionThread(); + + class PendingRequest extends AsyncResultImpl { + + MethodImpl method; + + // request id + int requestId; + + public PendingRequest(MethodImpl method, int requestId) { + this.method = method; + this.requestId = requestId; + } + } + + + @Union({RequestHeader.class, ResponseHeader.class, ExecutionError_.class, Exception_.class, InvalidMethodError.class, ResponseTooLargeError.class}) + public static class Message {} + + public static class RequestHeader extends Message { + public int requestId; + public int methodId; + // Request Object + public RequestHeader() {} + } + + public static class ResponseHeader extends Message { + public int requestId; + // Response object + public ResponseHeader() {} + } + + // Error while invoking a method + public static class ExecutionError_ extends Message { + public int requestId; + // Error object + public ExecutionError_() {} + } + + // MethodName does not exist + public static class InvalidMethodError extends Message { + public int requestId; + public InvalidMethodError() {} + } + + // Exception, not in method but somewhere else + public static class Exception_ extends Message { + public int requestId; + public String message; + public Exception_() {} + } + + public static class ResponseTooLargeError extends Message { + public int requestId; + public ResponseTooLargeError() {} + } + +} +