--- /dev/null
+package org.simantics.db.testing.common;
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DatabaseProxy implements Runnable {
+
+ private static boolean SMALL_PACKETS = false;
+ private static int PORT = 8891;
+ private boolean interrupted = false;
+ private Map<String, String> databases = new HashMap<String, String>();
+
+ private ServerSocket service = null;
+
+ /**
+ * Initializes database proxy with static mappings.
+ *
+ */
+ public DatabaseProxy() {
+ databases.put("/public/", "192.168.3.11:9101");
+ databases.put("FooBar", "localhost:6671");
+ }
+
+ public DatabaseProxy(String host, int port) {
+ databases.put("/test/", host+":"+port);
+ }
+
+ public void start() {
+ Thread t = new Thread(this);
+ init();
+ t.start();
+ }
+
+ public void stop() {
+ interrupted = true;
+ // We have to interrupt the the blocking accept call.
+ // This is one way of doing it.
+ if(service != null)
+ try {
+ service.close();
+ } catch (IOException e) {
+ }
+ }
+
+ public int getPort() {
+ return PORT;
+ }
+
+ public void init() {
+ try {
+ service = new ServerSocket(PORT);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ public void run() {
+ System.out.println("Accepting connections");
+
+ int connectionNumber = 0;
+ while(!interrupted) {
+ try {
+ final Socket socket = service.accept();
+ String connectionName = "Connection" + ++connectionNumber;
+ System.out.println(connectionName + ": Client connection accepted.");
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ serve(socket);
+ socket.close();
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(runnable, connectionName);
+ t.start();
+ } catch (IOException e) {
+ if (interrupted)
+ break; // Blocking accept call intentionally interrupted by closing the socket.
+ e.printStackTrace();
+ }
+ }
+
+ // Shutdown service
+ try {
+ if(service != null)
+ service.close();
+ } catch (IOException ignore) {
+ }
+ }
+
+ /**
+ * Serves socket connection. Waits for a connection line from the client and parses it.
+ * The connection line must be in form "CONNECT\tDatabaseIdentifier"
+ * If the database name is valid, creates a connection to the database and works as a proxy
+ * between the client and the database.
+ *
+ * @param socket
+ * @throws IOException
+ */
+ private void serve(Socket socket) throws IOException {
+ String connectionName = Thread.currentThread().getName();
+
+ BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
+
+ String line = input.readLine();
+ String columns[] = line.split("\t");
+ if(columns.length < 2) {
+ System.out.println(connectionName + ": serve() Invalid connection string [1] '"+line+"'");
+ out.write("ERROR Invalid connection string\n");
+ return;
+ }
+ String command = columns[0];
+ String param = columns[1];
+
+ /**
+ * CONNECT is the only supported command
+ */
+ if("CONNECT".equals(command)) {
+ if(databases.containsKey(param) == false) {
+ System.out.println(connectionName + ": serve() Invalid database '"+param+"'");
+ out.write("ERROR Invalid database '"+param+"'\n");
+ return;
+ }
+ // The second parameter is directly mapped to database name. In future some parsing is required.
+ String address = databases.get(param);
+ String[] split = address.split(":");
+ if (split.length != 2)
+ throw new IllegalArgumentException(connectionName+ ": address does not contain a port, missing ':' character");
+ InetSocketAddress dbAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
+
+ Socket dbSocket = new Socket();
+ try {
+ dbSocket.connect(dbAddress);
+ } catch(IOException e) {
+ System.out.println(connectionName + ":serve() Couldn't connect to database '"+param+"'");
+ out.write("ERROR Couldn't connect to database '"+param+"'\n");
+ return;
+ }
+
+ out.write("OK\n");
+ out.flush();
+ proxy(dbSocket, socket);
+
+ dbSocket.close();
+ } else {
+ System.out.println(connectionName + ": serve() Invalid command '"+command+"'");
+ out.write("ERROR Invalid command '"+command+"'\n");
+ return;
+ }
+ }
+
+ /**
+ * Proxies data from socket a to socket b and vice versa
+ * This method blocks until one of the sockets closes or exception is thrown.
+ *
+ * @param a
+ * @param b
+ */
+ private void proxy(final Socket a, final Socket b) {
+ // A -> B
+ Runnable ab = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ byte buffer[] = new byte[256];
+ while(a.isConnected() && b.isConnected()) {
+ int n = a.getInputStream().read(buffer);
+ if (n < 1)
+ break;
+ if (SMALL_PACKETS) {
+ for (int i=0; i<n; ++i) {
+ b.getOutputStream().write(buffer, i, 1);
+ b.getOutputStream().flush();
+ try {
+ Thread.sleep(100); // ms
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ } else {
+ b.getOutputStream().write(buffer, 0, n);
+ b.getOutputStream().flush();
+ }
+ }
+ } catch(IOException e) {
+ return; // for debugging
+ }
+ }
+ };
+
+ // B -> A
+ Runnable ba = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ byte buffer[] = new byte[256];
+ while(b.isConnected() && a.isConnected()) {
+ int n = b.getInputStream().read(buffer);
+ if (n < 1)
+ break;
+ if (SMALL_PACKETS) {
+ for (int i=0; i<n; ++i) {
+ a.getOutputStream().write(buffer, i, 1);
+ try {
+ Thread.sleep(100); // ms
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ a.getOutputStream().flush();
+ }
+ } else {
+ a.getOutputStream().write(buffer, 0, n);
+ a.getOutputStream().flush();
+ }
+ }
+ } catch(IOException e) {
+ return; // for debugging
+ }
+ }
+ };
+
+ String t = Thread.currentThread().getName();
+ Thread tab = new Thread(ab, t + " server->client");
+ Thread tba = new Thread(ba, t + " client->server");
+
+ tab.start();
+ tba.start();
+
+ try {
+ while(tba.isAlive() && tab.isAlive()) {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ DatabaseProxy proxy = new DatabaseProxy();
+ proxy.init();
+ proxy.run();
+ }
+
+}