X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Fmethod%2FTcpConnection.java;h=b6783bfefaa901ff7b6d4b270bfeacf3218dfe2d;hp=59e230cba3c2fbe98cb73105496493fc330f91df;hb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;hpb=24e2b34260f219f0d1644ca7a138894980e25b14 diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java index 59e230cba..b6783bfef 100644 --- a/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java +++ b/bundles/org.simantics.databoard/src/org/simantics/databoard/method/TcpConnection.java @@ -1,47 +1,47 @@ -/******************************************************************************* - * Copyright (c) 2010 Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ +/******************************************************************************* + * Copyright (c) 2010 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ package org.simantics.databoard.method; -import gnu.trove.map.hash.TObjectIntHashMap; - -import java.io.EOFException; -import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.annotations.Union; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.RecordBinding; -import org.simantics.databoard.binding.UnionBinding; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.serialization.SerializerConstructionException; -import org.simantics.databoard.util.binary.BinaryReadable; -import org.simantics.databoard.util.binary.BinaryWriteable; -import org.simantics.databoard.util.binary.InputStreamReadable; -import org.simantics.databoard.util.binary.OutputStreamWriteable; +import gnu.trove.map.hash.TObjectIntHashMap; + +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.annotations.Union; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.RecordBinding; +import org.simantics.databoard.binding.UnionBinding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.serialization.SerializerConstructionException; +import org.simantics.databoard.util.binary.BinaryReadable; +import org.simantics.databoard.util.binary.BinaryWriteable; +import org.simantics.databoard.util.binary.InputStreamReadable; +import org.simantics.databoard.util.binary.OutputStreamWriteable; /** * Connection is a class that handles request-response communication over a @@ -63,9 +63,9 @@ import org.simantics.databoard.util.binary.OutputStreamWriteable; * @author Toni Kalajainen */ public class TcpConnection implements MethodInterface { - - public static final ExecutorService SHARED_EXECUTOR_SERVICE = - new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue()); + + public static final ExecutorService SHARED_EXECUTOR_SERVICE = + new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue()); static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) ); static Charset UTF8 = Charset.forName("UTF8"); @@ -190,10 +190,10 @@ public class TcpConnection implements MethodInterface { // localMethodsMap.trimToSize(); in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE ); - out = new OutputStreamWriteable( socket.getOutputStream() ); - - String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort(); - + out = new OutputStreamWriteable( socket.getOutputStream() ); + + String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort(); + thread.setName( threadName ); thread.start(); } @@ -208,18 +208,18 @@ public class TcpConnection implements MethodInterface { throws MethodNotSupportedException { // consumer suggests object bindings MethodTypeDefinition description = binding.getMethodDefinition(); - - if (!remoteMethodsMap.containsKey(description)) { - /* - System.out.println("Method not found: "+description); - System.out.println("Existing methods:" ); - for (MethodTypeDefinition k : remoteMethodsMap.keySet()) { - System.out.print(k); - if (k.getType().requestType.getComponentCount()>0) { - System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) ); - } - System.out.println(); - } + + if (!remoteMethodsMap.containsKey(description)) { + /* + System.out.println("Method not found: "+description); + System.out.println("Existing methods:" ); + for (MethodTypeDefinition k : remoteMethodsMap.keySet()) { + System.out.print(k); + if (k.getType().requestType.getComponentCount()>0) { + System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) ); + } + System.out.println(); + } */ throw new MethodNotSupportedException(description.getName()); } @@ -273,15 +273,15 @@ public class TcpConnection implements MethodInterface { */ void onClosed(); } - + CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); public synchronized void addConnectionListener(ConnectionListener listener) { listeners.add( listener ); - } - - public void removeConnectionListener(ConnectionListener listener) { - listeners.remove( listener ); + } + + public void removeConnectionListener(ConnectionListener listener) { + listeners.remove( listener ); } class MethodImpl implements Method { @@ -304,14 +304,14 @@ public class TcpConnection implements MethodInterface { public AsyncResult invoke(final Object request) { // Write, async final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement()); - requests.put(result.requestId, result); + requests.put(result.requestId, result); if (!active) { result.setInvokeException(new InvokeException(new ConnectionClosedException())); } else { writeExecutor.execute(new Runnable() { @Override - public void run() { + public void run() { synchronized(TcpConnection.this) { try { int size= requestSerializer.getSize(request, outIdentities); @@ -334,7 +334,7 @@ public class TcpConnection implements MethodInterface { result.setInvokeException(new InvokeException(e)); } catch (RuntimeException e) { result.setInvokeException(new InvokeException(e)); - } + } } }}); } @@ -346,14 +346,14 @@ public class TcpConnection implements MethodInterface { return methodBinding; } } - - void setClosed() - { - for (ConnectionListener listener : listeners) - listener.onClosed(); + + void setClosed() + { + for (ConnectionListener listener : listeners) + listener.onClosed(); } void setError(Exception e) - { + { for (ConnectionListener listener : listeners) listener.onError(e); close(); @@ -378,10 +378,10 @@ public class TcpConnection implements MethodInterface { { return localMethods; } - - public MethodInterface getRemoteMethodInterface() { - return this; - } + + public MethodInterface getRemoteMethodInterface() { + return this; + } /** * Close the connection. All pending service request are canceled. @@ -397,246 +397,246 @@ public class TcpConnection implements MethodInterface { requests.values().removeAll(reqs); // shutdown inthread thread.interrupt(); -// for (ConnectionListener listener : listeners) -// listener.onClosed(); - } - - /** - * Get the active connection of current thread - * - * @return Connection or null if current thread does not run connection - */ - public static TcpConnection getCurrentConnection() { - Thread t = Thread.currentThread(); - if (t instanceof ConnectionThread == false) return null; - ConnectionThread ct = (ConnectionThread) t; - return ct.getConnection(); - } - - /** - * Connection Thread deserializes incoming messages from the TCP Stream. - * - */ - class ConnectionThread extends Thread { - public ConnectionThread() { - setDaemon(true); - } - - public TcpConnection getConnection() { - return TcpConnection.this; - } - - public void run() { - while (!Thread.interrupted()) { - try { - Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities); - if (header instanceof RequestHeader) { - final RequestHeader reqHeader = (RequestHeader) header; - - int size = in.readInt(); - if (size>maxRecvSize) { - setError(new MessageOverflowException()); - return; - } - - int methodId = reqHeader.methodId; - if (methodId<0||methodId>=localMethods.length) { - setError(new Exception("ProtocolError")); - return; - } - MethodTypeDefinition methodDescription = localMethods[methodId]; - // Let back-end determine bindings - try { - final Method method = methodInterface.getMethod(methodDescription); - final MethodTypeBinding methodBinding = method.getMethodBinding(); - // Deserialize payload - final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities); - inIdentities.clear(); - - // Invoke method - method.invoke(request).setListener(new InvokeListener() { - @Override - public void onCompleted(final Object response) { - // Write RESP - writeExecutor.execute(new Runnable() { - @Override - public void run() { - synchronized(TcpConnection.this) { - try { - Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding()); - int size = serializer.getSize(response, outIdentities); - outIdentities.clear(); - if (size > maxSendSize) { - ResponseTooLargeError tooLarge = new ResponseTooLargeError(); - tooLarge.requestId = reqHeader.requestId; - MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); - outIdentities.clear(); - return; - } - - ResponseHeader respHeader = new ResponseHeader(); - respHeader.requestId = reqHeader.requestId; - MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader); - outIdentities.clear(); - out.writeInt(size); - - serializer.serialize(out, outIdentities, response); - outIdentities.clear(); - out.flush(); - } catch (IOException e) { - setError(e); - } catch (RuntimeException e) { - setError(e); - } - } - }}); - } - @Override - public void onException(final Exception cause) { - // Write ERRO - writeExecutor.execute(new Runnable() { - @Override - public void run() { - synchronized(TcpConnection.this) { - try { - Exception_ msg = new Exception_(); - msg.message = cause.getClass().getName()+": "+cause.getMessage(); - - MESSAGE_SERIALIZER.serialize(out, outIdentities, msg); - outIdentities.clear(); - out.flush(); - } catch (IOException e) { - setError(e); - } catch (RuntimeException e) { - setError(e); - } - } - }}); - } - @Override - public void onExecutionError(final Object error) { - // Write ERRO - writeExecutor.execute(new Runnable() { - @Override - public void run() { - synchronized(TcpConnection.this) { - try { - Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding()); - int size = serializer.getSize(error, outIdentities); - outIdentities.clear(); - - if (size > maxSendSize) { - ResponseTooLargeError tooLarge = new ResponseTooLargeError(); - tooLarge.requestId = reqHeader.requestId; - MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); - outIdentities.clear(); - return; - } - - ExecutionError_ errorHeader = new ExecutionError_(); - errorHeader.requestId = reqHeader.requestId; - MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader); - outIdentities.clear(); - out.writeInt(size); - serializer.serialize(out, outIdentities, error); - outIdentities.clear(); - out.flush(); - } catch (IOException e) { - setError(e); - } catch (RuntimeException e) { - setError(e); - } - } - }}); - }}); - - } catch (MethodNotSupportedException e) { - in.skipBytes(size); - // return with an error - final InvalidMethodError error = new InvalidMethodError(); - error.requestId = reqHeader.requestId; - writeExecutor.execute(new Runnable() { - @Override - public void run() { - synchronized(TcpConnection.this) { - try { - MESSAGE_SERIALIZER.serialize(out, outIdentities, error); - outIdentities.clear(); - out.flush(); - } catch (IOException e) { - setError(e); - } catch (RuntimeException e) { - setError(e); - } - } - }}); - continue; - } - - - } else if (header instanceof ResponseHeader) { - int requestId = ((ResponseHeader)header).requestId; - PendingRequest req = requests.remove(requestId); - if (req==null) { - setError(new RuntimeException("Request by id "+requestId+" does not exist")); - return; - } - int size = in.readInt(); - if (size>maxRecvSize) { - // TODO SOMETHING - } - Object response = req.method.responseSerializer.deserialize(in, inIdentities); - inIdentities.clear(); - req.setResponse(response); - } else if (header instanceof ExecutionError_) { - int requestId = ((ExecutionError_)header).requestId; - PendingRequest req = requests.remove(requestId); - if (req==null) { - setError(new RuntimeException("Request by id "+requestId+" does not exist")); - return; - } - int size = in.readInt(); - if (size>maxRecvSize) { - // TODO SOMETHING - } - Object executionError = req.method.errorSerializer.deserialize(in, inIdentities); - inIdentities.clear(); - req.setExecutionError(executionError); - } else if (header instanceof Exception_) { - int requestId = ((Exception_)header).requestId; - PendingRequest req = requests.remove(requestId); - req.setExecutionError(new Exception(((Exception_)header).message)); - } else if (header instanceof InvalidMethodError) { - int requestId = ((InvalidMethodError)header).requestId; - PendingRequest req = requests.remove(requestId); - req.setInvokeException(new InvokeException(new MethodNotSupportedException("?"))); - } else if (header instanceof ResponseTooLargeError) { - int requestId = ((ResponseTooLargeError)header).requestId; - PendingRequest req = requests.remove(requestId); - req.setInvokeException(new InvokeException(new MessageOverflowException())); - } - - } catch (EOFException e) { - setClosed(); - break; - } catch (SocketException e) { - if (e.getMessage().equals("Socket Closed")) - setClosed(); - else - setError(e); - break; - } catch (IOException e) { - setError(e); - break; - } - } - try { - socket.close(); - } catch (IOException e) { - } - // Close pending requests - close(); - }; +// for (ConnectionListener listener : listeners) +// listener.onClosed(); + } + + /** + * Get the active connection of current thread + * + * @return Connection or null if current thread does not run connection + */ + public static TcpConnection getCurrentConnection() { + Thread t = Thread.currentThread(); + if (t instanceof ConnectionThread == false) return null; + ConnectionThread ct = (ConnectionThread) t; + return ct.getConnection(); + } + + /** + * Connection Thread deserializes incoming messages from the TCP Stream. + * + */ + class ConnectionThread extends Thread { + public ConnectionThread() { + setDaemon(true); + } + + public TcpConnection getConnection() { + return TcpConnection.this; + } + + public void run() { + while (!Thread.interrupted()) { + try { + Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities); + if (header instanceof RequestHeader) { + final RequestHeader reqHeader = (RequestHeader) header; + + int size = in.readInt(); + if (size>maxRecvSize) { + setError(new MessageOverflowException()); + return; + } + + int methodId = reqHeader.methodId; + if (methodId<0||methodId>=localMethods.length) { + setError(new Exception("ProtocolError")); + return; + } + MethodTypeDefinition methodDescription = localMethods[methodId]; + // Let back-end determine bindings + try { + final Method method = methodInterface.getMethod(methodDescription); + final MethodTypeBinding methodBinding = method.getMethodBinding(); + // Deserialize payload + final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities); + inIdentities.clear(); + + // Invoke method + method.invoke(request).setListener(new InvokeListener() { + @Override + public void onCompleted(final Object response) { + // Write RESP + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding()); + int size = serializer.getSize(response, outIdentities); + outIdentities.clear(); + if (size > maxSendSize) { + ResponseTooLargeError tooLarge = new ResponseTooLargeError(); + tooLarge.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); + outIdentities.clear(); + return; + } + + ResponseHeader respHeader = new ResponseHeader(); + respHeader.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader); + outIdentities.clear(); + out.writeInt(size); + + serializer.serialize(out, outIdentities, response); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + } + @Override + public void onException(final Exception cause) { + // Write ERRO + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Exception_ msg = new Exception_(); + msg.message = cause.getClass().getName()+": "+cause.getMessage(); + + MESSAGE_SERIALIZER.serialize(out, outIdentities, msg); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + } + @Override + public void onExecutionError(final Object error) { + // Write ERRO + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding()); + int size = serializer.getSize(error, outIdentities); + outIdentities.clear(); + + if (size > maxSendSize) { + ResponseTooLargeError tooLarge = new ResponseTooLargeError(); + tooLarge.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge); + outIdentities.clear(); + return; + } + + ExecutionError_ errorHeader = new ExecutionError_(); + errorHeader.requestId = reqHeader.requestId; + MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader); + outIdentities.clear(); + out.writeInt(size); + serializer.serialize(out, outIdentities, error); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + }}); + + } catch (MethodNotSupportedException e) { + in.skipBytes(size); + // return with an error + final InvalidMethodError error = new InvalidMethodError(); + error.requestId = reqHeader.requestId; + writeExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized(TcpConnection.this) { + try { + MESSAGE_SERIALIZER.serialize(out, outIdentities, error); + outIdentities.clear(); + out.flush(); + } catch (IOException e) { + setError(e); + } catch (RuntimeException e) { + setError(e); + } + } + }}); + continue; + } + + + } else if (header instanceof ResponseHeader) { + int requestId = ((ResponseHeader)header).requestId; + PendingRequest req = requests.remove(requestId); + if (req==null) { + setError(new RuntimeException("Request by id "+requestId+" does not exist")); + return; + } + int size = in.readInt(); + if (size>maxRecvSize) { + // TODO SOMETHING + } + Object response = req.method.responseSerializer.deserialize(in, inIdentities); + inIdentities.clear(); + req.setResponse(response); + } else if (header instanceof ExecutionError_) { + int requestId = ((ExecutionError_)header).requestId; + PendingRequest req = requests.remove(requestId); + if (req==null) { + setError(new RuntimeException("Request by id "+requestId+" does not exist")); + return; + } + int size = in.readInt(); + if (size>maxRecvSize) { + // TODO SOMETHING + } + Object executionError = req.method.errorSerializer.deserialize(in, inIdentities); + inIdentities.clear(); + req.setExecutionError(executionError); + } else if (header instanceof Exception_) { + int requestId = ((Exception_)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setExecutionError(new Exception(((Exception_)header).message)); + } else if (header instanceof InvalidMethodError) { + int requestId = ((InvalidMethodError)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setInvokeException(new InvokeException(new MethodNotSupportedException("?"))); + } else if (header instanceof ResponseTooLargeError) { + int requestId = ((ResponseTooLargeError)header).requestId; + PendingRequest req = requests.remove(requestId); + req.setInvokeException(new InvokeException(new MessageOverflowException())); + } + + } catch (EOFException e) { + setClosed(); + break; + } catch (SocketException e) { + if (e.getMessage().equals("Socket Closed")) + setClosed(); + else + setError(e); + break; + } catch (IOException e) { + setError(e); + break; + } + } + try { + socket.close(); + } catch (IOException e) { + } + // Close pending requests + close(); + }; } // Thread that reads input data