-// for (ConnectionListener listener : listeners)\r
-// listener.onClosed();\r
- }\r
- \r
- /**\r
- * Get the active connection of current thread\r
- * \r
- * @return Connection or <code>null</code> if current thread does not run connection\r
- */\r
- public static TcpConnection getCurrentConnection() {\r
- Thread t = Thread.currentThread();\r
- if (t instanceof ConnectionThread == false) return null;\r
- ConnectionThread ct = (ConnectionThread) t;\r
- return ct.getConnection();\r
- }\r
- \r
- /**\r
- * Connection Thread deserializes incoming messages from the TCP Stream.\r
- *\r
- */\r
- class ConnectionThread extends Thread {\r
- public ConnectionThread() {\r
- setDaemon(true);\r
- }\r
- \r
- public TcpConnection getConnection() {\r
- return TcpConnection.this;\r
- }\r
- \r
- public void run() {\r
- while (!Thread.interrupted()) {\r
- try {\r
- Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);\r
- if (header instanceof RequestHeader) {\r
- final RequestHeader reqHeader = (RequestHeader) header;\r
-\r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- setError(new MessageOverflowException());\r
- return;\r
- }\r
- \r
- int methodId = reqHeader.methodId;\r
- if (methodId<0||methodId>=localMethods.length) {\r
- setError(new Exception("ProtocolError"));\r
- return;\r
- }\r
- MethodTypeDefinition methodDescription = localMethods[methodId];\r
- // Let back-end determine bindings\r
- try {\r
- final Method method = methodInterface.getMethod(methodDescription);\r
- final MethodTypeBinding methodBinding = method.getMethodBinding();\r
- // Deserialize payload \r
- final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- \r
- // Invoke method\r
- method.invoke(request).setListener(new InvokeListener() {\r
- @Override\r
- public void onCompleted(final Object response) {\r
- // Write RESP\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());\r
- int size = serializer.getSize(response, outIdentities);\r
- outIdentities.clear();\r
- if (size > maxSendSize) {\r
- ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
- tooLarge.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
- outIdentities.clear(); \r
- return;\r
- }\r
-\r
- ResponseHeader respHeader = new ResponseHeader();\r
- respHeader.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);\r
- outIdentities.clear();\r
- out.writeInt(size);\r
- \r
- serializer.serialize(out, outIdentities, response);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }});\r
- }\r
- @Override\r
- public void onException(final Exception cause) {\r
- // Write ERRO\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Exception_ msg = new Exception_();\r
- msg.message = cause.getClass().getName()+": "+cause.getMessage(); \r
- \r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- }\r
- @Override\r
- public void onExecutionError(final Object error) {\r
- // Write ERRO\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());\r
- int size = serializer.getSize(error, outIdentities);\r
- outIdentities.clear();\r
- \r
- if (size > maxSendSize) {\r
- ResponseTooLargeError tooLarge = new ResponseTooLargeError();\r
- tooLarge.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);\r
- outIdentities.clear(); \r
- return;\r
- }\r
- \r
- ExecutionError_ errorHeader = new ExecutionError_();\r
- errorHeader.requestId = reqHeader.requestId;\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);\r
- outIdentities.clear();\r
- out.writeInt(size);\r
- serializer.serialize(out, outIdentities, error);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- }});\r
-\r
- } catch (MethodNotSupportedException e) {\r
- in.skipBytes(size);\r
- // return with an error\r
- final InvalidMethodError error = new InvalidMethodError();\r
- error.requestId = reqHeader.requestId;\r
- writeExecutor.execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- synchronized(TcpConnection.this) {\r
- try {\r
- MESSAGE_SERIALIZER.serialize(out, outIdentities, error);\r
- outIdentities.clear();\r
- out.flush();\r
- } catch (IOException e) {\r
- setError(e);\r
- } catch (RuntimeException e) {\r
- setError(e);\r
- }\r
- }\r
- }}); \r
- continue; \r
- } \r
- \r
- \r
- } else if (header instanceof ResponseHeader) {\r
- int requestId = ((ResponseHeader)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- if (req==null) {\r
- setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
- return; \r
- } \r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- // TODO SOMETHING\r
- }\r
- Object response = req.method.responseSerializer.deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- req.setResponse(response);\r
- } else if (header instanceof ExecutionError_) { \r
- int requestId = ((ExecutionError_)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- if (req==null) {\r
- setError(new RuntimeException("Request by id "+requestId+" does not exist"));\r
- return;\r
- }\r
- int size = in.readInt();\r
- if (size>maxRecvSize) {\r
- // TODO SOMETHING\r
- }\r
- Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);\r
- inIdentities.clear();\r
- req.setExecutionError(executionError);\r
- } else if (header instanceof Exception_) {\r
- int requestId = ((Exception_)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setExecutionError(new Exception(((Exception_)header).message));\r
- } else if (header instanceof InvalidMethodError) {\r
- int requestId = ((InvalidMethodError)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));\r
- } else if (header instanceof ResponseTooLargeError) {\r
- int requestId = ((ResponseTooLargeError)header).requestId;\r
- PendingRequest req = requests.remove(requestId);\r
- req.setInvokeException(new InvokeException(new MessageOverflowException()));\r
- }\r
- \r
- } catch (EOFException e) {\r
- setClosed();\r
- break;\r
- } catch (SocketException e) {\r
- if (e.getMessage().equals("Socket Closed"))\r
- setClosed();\r
- else\r
- setError(e);\r
- break;\r
- } catch (IOException e) {\r
- setError(e);\r
- break;\r
- }\r
- }\r
- try {\r
- socket.close();\r
- } catch (IOException e) {\r
- }\r
- // Close pending requests\r
- close();\r
- };\r
+// for (ConnectionListener listener : listeners)
+// listener.onClosed();
+ }
+
+ /**
+ * Get the active connection of current thread
+ *
+ * @return Connection or <code>null</code> if current thread does not run connection
+ */
+ public static TcpConnection getCurrentConnection() {
+ Thread t = Thread.currentThread();
+ if (t instanceof ConnectionThread == false) return null;
+ ConnectionThread ct = (ConnectionThread) t;
+ return ct.getConnection();
+ }
+
+ /**
+ * Connection Thread deserializes incoming messages from the TCP Stream.
+ *
+ */
+ class ConnectionThread extends Thread {
+ public ConnectionThread() {
+ setDaemon(true);
+ }
+
+ public TcpConnection getConnection() {
+ return TcpConnection.this;
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ Message header = (Message) MESSAGE_SERIALIZER.deserialize(in, inIdentities);
+ if (header instanceof RequestHeader) {
+ final RequestHeader reqHeader = (RequestHeader) header;
+
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ setError(new MessageOverflowException());
+ return;
+ }
+
+ int methodId = reqHeader.methodId;
+ if (methodId<0||methodId>=localMethods.length) {
+ setError(new Exception("ProtocolError"));
+ return;
+ }
+ MethodTypeDefinition methodDescription = localMethods[methodId];
+ // Let back-end determine bindings
+ try {
+ final Method method = methodInterface.getMethod(methodDescription);
+ final MethodTypeBinding methodBinding = method.getMethodBinding();
+ // Deserialize payload
+ final Object request = Bindings.getSerializerUnchecked(methodBinding.getRequestBinding()).deserialize(in, inIdentities);
+ inIdentities.clear();
+
+ // Invoke method
+ method.invoke(request).setListener(new InvokeListener() {
+ @Override
+ public void onCompleted(final Object response) {
+ // Write RESP
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getResponseBinding());
+ int size = serializer.getSize(response, outIdentities);
+ outIdentities.clear();
+ if (size > maxSendSize) {
+ ResponseTooLargeError tooLarge = new ResponseTooLargeError();
+ tooLarge.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
+ outIdentities.clear();
+ return;
+ }
+
+ ResponseHeader respHeader = new ResponseHeader();
+ respHeader.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, respHeader);
+ outIdentities.clear();
+ out.writeInt(size);
+
+ serializer.serialize(out, outIdentities, response);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }
+ @Override
+ public void onException(final Exception cause) {
+ // Write ERRO
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Exception_ msg = new Exception_();
+ msg.message = cause.getClass().getName()+": "+cause.getMessage();
+
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, msg);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }
+ @Override
+ public void onExecutionError(final Object error) {
+ // Write ERRO
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ Serializer serializer = Bindings.getSerializerUnchecked(methodBinding.getErrorBinding());
+ int size = serializer.getSize(error, outIdentities);
+ outIdentities.clear();
+
+ if (size > maxSendSize) {
+ ResponseTooLargeError tooLarge = new ResponseTooLargeError();
+ tooLarge.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, tooLarge);
+ outIdentities.clear();
+ return;
+ }
+
+ ExecutionError_ errorHeader = new ExecutionError_();
+ errorHeader.requestId = reqHeader.requestId;
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, errorHeader);
+ outIdentities.clear();
+ out.writeInt(size);
+ serializer.serialize(out, outIdentities, error);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ }});
+
+ } catch (MethodNotSupportedException e) {
+ in.skipBytes(size);
+ // return with an error
+ final InvalidMethodError error = new InvalidMethodError();
+ error.requestId = reqHeader.requestId;
+ writeExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ synchronized(TcpConnection.this) {
+ try {
+ MESSAGE_SERIALIZER.serialize(out, outIdentities, error);
+ outIdentities.clear();
+ out.flush();
+ } catch (IOException e) {
+ setError(e);
+ } catch (RuntimeException e) {
+ setError(e);
+ }
+ }
+ }});
+ continue;
+ }
+
+
+ } else if (header instanceof ResponseHeader) {
+ int requestId = ((ResponseHeader)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ if (req==null) {
+ setError(new RuntimeException("Request by id "+requestId+" does not exist"));
+ return;
+ }
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ // TODO SOMETHING
+ }
+ Object response = req.method.responseSerializer.deserialize(in, inIdentities);
+ inIdentities.clear();
+ req.setResponse(response);
+ } else if (header instanceof ExecutionError_) {
+ int requestId = ((ExecutionError_)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ if (req==null) {
+ setError(new RuntimeException("Request by id "+requestId+" does not exist"));
+ return;
+ }
+ int size = in.readInt();
+ if (size>maxRecvSize) {
+ // TODO SOMETHING
+ }
+ Object executionError = req.method.errorSerializer.deserialize(in, inIdentities);
+ inIdentities.clear();
+ req.setExecutionError(executionError);
+ } else if (header instanceof Exception_) {
+ int requestId = ((Exception_)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setExecutionError(new Exception(((Exception_)header).message));
+ } else if (header instanceof InvalidMethodError) {
+ int requestId = ((InvalidMethodError)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setInvokeException(new InvokeException(new MethodNotSupportedException("?")));
+ } else if (header instanceof ResponseTooLargeError) {
+ int requestId = ((ResponseTooLargeError)header).requestId;
+ PendingRequest req = requests.remove(requestId);
+ req.setInvokeException(new InvokeException(new MessageOverflowException()));
+ }
+
+ } catch (EOFException e) {
+ setClosed();
+ break;
+ } catch (SocketException e) {
+ if (e.getMessage().equals("Socket Closed"))
+ setClosed();
+ else
+ setError(e);
+ break;
+ } catch (IOException e) {
+ setError(e);
+ break;
+ }
+ }
+ try {
+ socket.close();
+ } catch (IOException e) {
+ }
+ // Close pending requests
+ close();
+ };