-/*******************************************************************************\r
- * Copyright (c) 2010 Association for Decentralized Information Management in\r
- * Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
+/*******************************************************************************
+ * Copyright (c) 2010 Association for Decentralized Information Management in
+ * Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
package org.simantics.databoard.method;
-import gnu.trove.map.hash.TObjectIntHashMap;\r
-\r
-import java.io.EOFException;\r
-import java.io.IOException;\r
-import java.net.Socket;\r
-import java.net.SocketException;\r
-import java.nio.charset.Charset;\r
-import java.util.ArrayList;\r
-import java.util.HashMap;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.concurrent.ConcurrentHashMap;\r
-import java.util.concurrent.CopyOnWriteArrayList;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.SynchronousQueue;\r
-import java.util.concurrent.ThreadPoolExecutor;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.annotations.Union;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.RecordBinding;\r
-import org.simantics.databoard.binding.UnionBinding;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.serialization.SerializerConstructionException;\r
-import org.simantics.databoard.util.binary.BinaryReadable;\r
-import org.simantics.databoard.util.binary.BinaryWriteable;\r
-import org.simantics.databoard.util.binary.InputStreamReadable;\r
-import org.simantics.databoard.util.binary.OutputStreamWriteable;\r
+import gnu.trove.map.hash.TObjectIntHashMap;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.annotations.Union;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.RecordBinding;
+import org.simantics.databoard.binding.UnionBinding;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.serialization.SerializerConstructionException;
+import org.simantics.databoard.util.binary.BinaryReadable;
+import org.simantics.databoard.util.binary.BinaryWriteable;
+import org.simantics.databoard.util.binary.InputStreamReadable;
+import org.simantics.databoard.util.binary.OutputStreamWriteable;
/**
* Connection is a class that handles request-response communication over a
* @author Toni Kalajainen <toni.kalajainen@vtt.fi>
*/
public class TcpConnection implements MethodInterface {
-\r
- public static final ExecutorService SHARED_EXECUTOR_SERVICE = \r
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());\r
+
+ public static final ExecutorService SHARED_EXECUTOR_SERVICE =
+ new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
static final Serializer MESSAGE_SERIALIZER = Bindings.getSerializerUnchecked( Bindings.getBindingUnchecked(Message.class) );
static Charset UTF8 = Charset.forName("UTF8");
// localMethodsMap.trimToSize();
in = new InputStreamReadable( socket.getInputStream(), Long.MAX_VALUE );
- out = new OutputStreamWriteable( socket.getOutputStream() );\r
- \r
- String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();\r
- \r
+ out = new OutputStreamWriteable( socket.getOutputStream() );
+
+ String threadName = "Connection-"+socket.getInetAddress().getHostAddress()+":"+socket.getPort();
+
thread.setName( threadName );
thread.start();
}
throws MethodNotSupportedException {
// consumer suggests object bindings
MethodTypeDefinition description = binding.getMethodDefinition();
- \r
- if (!remoteMethodsMap.containsKey(description)) {\r
- /*\r
- System.out.println("Method not found: "+description);\r
- System.out.println("Existing methods:" );\r
- for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {\r
- System.out.print(k);\r
- if (k.getType().requestType.getComponentCount()>0) {\r
- System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );\r
- }\r
- System.out.println(); \r
- }\r
+
+ if (!remoteMethodsMap.containsKey(description)) {
+ /*
+ System.out.println("Method not found: "+description);
+ System.out.println("Existing methods:" );
+ for (MethodTypeDefinition k : remoteMethodsMap.keySet()) {
+ System.out.print(k);
+ if (k.getType().requestType.getComponentCount()>0) {
+ System.out.print(System.identityHashCode( k.getType().requestType.getComponentType(0) ) );
+ }
+ System.out.println();
+ }
*/
throw new MethodNotSupportedException(description.getName());
}
*/
void onClosed();
}
- \r
+
CopyOnWriteArrayList<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
public synchronized void addConnectionListener(ConnectionListener listener) {
listeners.add( listener );
- }\r
- \r
- public void removeConnectionListener(ConnectionListener listener) {\r
- listeners.remove( listener );\r
+ }
+
+ public void removeConnectionListener(ConnectionListener listener) {
+ listeners.remove( listener );
}
class MethodImpl implements Method {
public AsyncResult invoke(final Object request) {
// Write, async
final PendingRequest result = new PendingRequest(this, requestCounter.getAndIncrement());
- requests.put(result.requestId, result);\r
+ requests.put(result.requestId, result);
if (!active) {
result.setInvokeException(new InvokeException(new ConnectionClosedException()));
} else {
writeExecutor.execute(new Runnable() {
@Override
- public void run() { \r
+ public void run() {
synchronized(TcpConnection.this) {
try {
int size= requestSerializer.getSize(request, outIdentities);
result.setInvokeException(new InvokeException(e));
} catch (RuntimeException e) {
result.setInvokeException(new InvokeException(e));
- }\r
+ }
}
}});
}
return methodBinding;
}
}
- \r
- void setClosed() \r
- {\r
- for (ConnectionListener listener : listeners)\r
- listener.onClosed();\r
+
+ void setClosed()
+ {
+ for (ConnectionListener listener : listeners)
+ listener.onClosed();
}
void setError(Exception e)
- {\r
+ {
for (ConnectionListener listener : listeners)
listener.onError(e);
close();
{
return localMethods;
}
- \r
- public MethodInterface getRemoteMethodInterface() {\r
- return this;\r
- }\r
+
+ public MethodInterface getRemoteMethodInterface() {
+ return this;
+ }
/**
* Close the connection. All pending service request are canceled.
requests.values().removeAll(reqs);
// shutdown inthread
thread.interrupt();
-// for (ConnectionListener listener : listeners)\r
-// listener.onClosed();\r
- }\r
- \r
- /**\r
- * Get the active connection of current thread\r
- * \r
- * @return Connection or <code>null</code> if current thread does not run connection\r
- */\r
- public static TcpConnection getCurrentConnection() {\r
- Thread t = Thread.currentThread();\r
- if (t instanceof ConnectionThread == false) return null;\r
- ConnectionThread ct = (ConnectionThread) t;\r
- return ct.getConnection();\r
- }\r
- \r
- /**\r
- * Connection Thread deserializes incoming messages from the TCP Stream.\r
- *\r
- */\r
- class ConnectionThread extends Thread {\r
- public ConnectionThread() {\r
- setDaemon(true);\r
- }\r
- \r
- public TcpConnection getConnection() {\r
- return TcpConnection.this;\r
- }\r
- \r
- public void run() {\r
- while (!Thread.interrupted()) {\r
- try {\r
- Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);\r
- if (header instanceof RequestHeader) {\r
- final RequestHeader reqHeader = (RequestHeader) header;\r
-\r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- setError(new MessageOverflowException());\r
- return;\r
- }\r
- \r
- int methodId = reqHeader.methodId;\r
- if (methodId<0||methodId>=localMethods.length) {\r
- setError(new Exception("ProtocolError"));\r
- return;\r
- }\r
- MethodTypeDefinition methodDescription = localMethods[methodId];\r
- // Let back-end determine bindings\r
- try {\r
- final Method method = methodInterface.getMethod(methodDescription);\r
- final MethodTypeBinding methodBinding = method.getMethodBinding();\r
- // Deserialize payload \r
- final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- \r
- // Invoke method\r
- method.invoke(request).setListener(new InvokeListener() {\r
- @Override\r
- public void onCompleted(final Object response) {\r
- // Write RESP\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());\r
- int size = serializer.getSize(response, outIdentities);\r
- outIdentities.clear();\r
- if (size > maxSendSize) {\r
- ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
- tooLarge.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
- outIdentities.clear(); \r
- return;\r
- }\r
-\r
- ResponseHeader respHeader = new ResponseHeader();\r
- respHeader.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);\r
- outIdentities.clear();\r
- out.writeInt(size);\r
- \r
- serializer.serialize(out, outIdentities, response);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }});\r
- }\r
- @Override\r
- public void onException(final Exception cause) {\r
- // Write ERRO\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Exception_ msg = new Exception_();\r
- msg.message = cause.getClass().getName()+": "+cause.getMessage(); \r
- \r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- }\r
- @Override\r
- public void onExecutionError(final Object error) {\r
- // Write ERRO\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());\r
- int size = serializer.getSize(error, outIdentities);\r
- outIdentities.clear();\r
- \r
- if (size > maxSendSize) {\r
- ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
- tooLarge.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
- outIdentities.clear(); \r
- return;\r
- }\r
- \r
- ExecutionError_ errorHeader = new ExecutionError_();\r
- errorHeader.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);\r
- outIdentities.clear();\r
- out.writeInt(size);\r
- serializer.serialize(out, outIdentities, error);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- }});\r
-\r
- } catch (MethodNotSupportedException e) {\r
- in.skipBytes(size);\r
- // return with an error\r
- final InvalidMethodError error = new InvalidMethodError();\r
- error.requestId = reqHeader.requestId;\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, error);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- continue; \r
- } \r
- \r
- \r
- } else if (header instanceof ResponseHeader) {\r
- int requestId = ((ResponseHeader)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- if (req==null) {\r
- setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
- return; \r
- } \r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- // TODO SOMETHING\r
- }\r
- Object response = req.method.responseSerializer.deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- req.setResponse(response);\r
- } else if (header instanceof ExecutionError_) { \r
- int requestId = ((ExecutionError_)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- if (req==null) {\r
- setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
- return;\r
- }\r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- // TODO SOMETHING\r
- }\r
- Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- req.setExecutionError(executionError);\r
- } else if (header instanceof Exception_) {\r
- int requestId = ((Exception_)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setExecutionError(new Exception(((Exception_)header).message));\r
- } else if (header instanceof InvalidMethodError) {\r
- int requestId = ((InvalidMethodError)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));\r
- } else if (header instanceof ResponseTooLargeError) {\r
- int requestId = ((ResponseTooLargeError)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setInvokeException(new InvokeException(new MessageOverflowException()));\r
- }\r
- \r
- } catch (EOFException e) {\r
- setClosed();\r
- break;\r
- } catch (SocketException e) {\r
- if (e.getMessage().equals("Socket Closed"))\r
- setClosed();\r
- else\r
- setError(e);\r
- break;\r
- } catch (IOException e) {\r
- setError(e);\r
- break;\r
- }\r
- }\r
- try {\r
- socket.close();\r
- } catch (IOException e) {\r
- }\r
- // Close pending requests\r
- close();\r
- };\r
+// for (ConnectionListener listener : listeners)
+// listener.onClosed();
+ }
+
+ /**
+ * Get the active connection of current thread
+ *
+ * @return Connection or <code>null</code> if current thread does not run connection
+ */
+ public static TcpConnection getCurrentConnection() {
+ Thread t = Thread.currentThread();
+ if (t instanceof ConnectionThread == false) return null;
+ ConnectionThread ct = (ConnectionThread) t;
+ return ct.getConnection();
+ }
+
+ /**
+ * Connection Thread deserializes incoming messages from the TCP Stream.
+ *
+ */
+ class ConnectionThread extends Thread {
+ public ConnectionThread() {
+ setDaemon(true);
+ }
+
+ public TcpConnection getConnection() {
+ return TcpConnection.this;
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);
+ if (header instanceof RequestHeader) {
+ final RequestHeader reqHeader = (RequestHeader) header;
+
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ setError(new MessageOverflowException());
+ return;
+ }
+
+ int methodId = reqHeader.methodId;
+ if (methodId<0||methodId>=localMethods.length) {
+ setError(new Exception("ProtocolError"));
+ return;
+ }
+ MethodTypeDefinition methodDescription = localMethods[methodId];
+ // Let back-end determine bindings
+ try {
+ final Method method = methodInterface.getMethod(methodDescription);
+ final MethodTypeBinding methodBinding = method.getMethodBinding();
+ // Deserialize payload
+ final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);
+ inIdentities.clear();
+
+ // Invoke method
+ method.invoke(request).setListener(new InvokeListener() {
+ @Override
+ public void onCompleted(final Object response) {
+ // Write RESP
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());
+ int size = serializer.getSize(response, outIdentities);
+ outIdentities.clear();
+ if (size > maxSendSize) {
+ ResponseTooLargeError tooLarge = new ResponseTooLargeError();
+ tooLarge.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
+ outIdentities.clear();
+ return;
+ }
+
+ ResponseHeader respHeader = new ResponseHeader();
+ respHeader.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);
+ outIdentities.clear();
+ out.writeInt(size);
+
+ serializer.serialize(out, outIdentities, response);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }
+ @Override
+ public void onException(final Exception cause) {
+ // Write ERRO
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Exception_ msg = new Exception_();
+ msg.message = cause.getClass().getName()+": "+cause.getMessage();
+
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }
+ @Override
+ public void onExecutionError(final Object error) {
+ // Write ERRO
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());
+ int size = serializer.getSize(error, outIdentities);
+ outIdentities.clear();
+
+ if (size > maxSendSize) {
+ ResponseTooLargeError tooLarge = new ResponseTooLargeError();
+ tooLarge.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
+ outIdentities.clear();
+ return;
+ }
+
+ ExecutionError_ errorHeader = new ExecutionError_();
+ errorHeader.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);
+ outIdentities.clear();
+ out.writeInt(size);
+ serializer.serialize(out, outIdentities, error);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }});
+
+ } catch (MethodNotSupportedException e) {
+ in.skipBytes(size);
+ // return with an error
+ final InvalidMethodError error = new InvalidMethodError();
+ error.requestId = reqHeader.requestId;
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, error);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ continue;
+ }
+
+
+ } else if (header instanceof ResponseHeader) {
+ int requestId = ((ResponseHeader)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ if (req==null) {
+ setError(new RuntimeException("Request by id "+requestId+" does not exist"));
+ return;
+ }
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ // TODO SOMETHING
+ }
+ Object response = req.method.responseSerializer.deserialize(in, inIdentities);
+ inIdentities.clear();
+ req.setResponse(response);
+ } else if (header instanceof ExecutionError_) {
+ int requestId = ((ExecutionError_)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ if (req==null) {
+ setError(new RuntimeException("Request by id "+requestId+" does not exist"));
+ return;
+ }
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ // TODO SOMETHING
+ }
+ Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);
+ inIdentities.clear();
+ req.setExecutionError(executionError);
+ } else if (header instanceof Exception_) {
+ int requestId = ((Exception_)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setExecutionError(new Exception(((Exception_)header).message));
+ } else if (header instanceof InvalidMethodError) {
+ int requestId = ((InvalidMethodError)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));
+ } else if (header instanceof ResponseTooLargeError) {
+ int requestId = ((ResponseTooLargeError)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setInvokeException(new InvokeException(new MessageOverflowException()));
+ }
+
+ } catch (EOFException e) {
+ setClosed();
+ break;
+ } catch (SocketException e) {
+ if (e.getMessage().equals("Socket Closed"))
+ setClosed();
+ else
+ setError(e);
+ break;
+ } catch (IOException e) {
+ setError(e);
+ break;
+ }
+ }
+ try {
+ socket.close();
+ } catch (IOException e) {
+ }
+ // Close pending requests
+ close();
+ };
}
// Thread that reads input data