package org.simantics.db.testing.common; /******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; public class DatabaseProxy implements Runnable { private static boolean SMALL_PACKETS = false; private static int PORT = 8891; private boolean interrupted = false; private Map databases = new HashMap(); private ServerSocket service = null; /** * Initializes database proxy with static mappings. * */ public DatabaseProxy() { databases.put("/public/", "192.168.3.11:9101"); databases.put("FooBar", "localhost:6671"); } public DatabaseProxy(String host, int port) { databases.put("/test/", host+":"+port); } public void start() { Thread t = new Thread(this); init(); t.start(); } public void stop() { interrupted = true; // We have to interrupt the the blocking accept call. // This is one way of doing it. if(service != null) try { service.close(); } catch (IOException e) { } } public int getPort() { return PORT; } public void init() { try { service = new ServerSocket(PORT); } catch (IOException e) { e.printStackTrace(); return; } } public void run() { System.out.println("Accepting connections"); int connectionNumber = 0; while(!interrupted) { try { final Socket socket = service.accept(); String connectionName = "Connection" + ++connectionNumber; System.out.println(connectionName + ": Client connection accepted."); Runnable runnable = new Runnable() { @Override public void run() { try { serve(socket); socket.close(); } catch(IOException e) { e.printStackTrace(); } } }; Thread t = new Thread(runnable, connectionName); t.start(); } catch (IOException e) { if (interrupted) break; // Blocking accept call intentionally interrupted by closing the socket. e.printStackTrace(); } } // Shutdown service try { if(service != null) service.close(); } catch (IOException ignore) { } } /** * Serves socket connection. Waits for a connection line from the client and parses it. * The connection line must be in form "CONNECT\tDatabaseIdentifier" * If the database name is valid, creates a connection to the database and works as a proxy * between the client and the database. * * @param socket * @throws IOException */ private void serve(Socket socket) throws IOException { String connectionName = Thread.currentThread().getName(); BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true); String line = input.readLine(); String columns[] = line.split("\t"); if(columns.length < 2) { System.out.println(connectionName + ": serve() Invalid connection string [1] '"+line+"'"); out.write("ERROR Invalid connection string\n"); return; } String command = columns[0]; String param = columns[1]; /** * CONNECT is the only supported command */ if("CONNECT".equals(command)) { if(databases.containsKey(param) == false) { System.out.println(connectionName + ": serve() Invalid database '"+param+"'"); out.write("ERROR Invalid database '"+param+"'\n"); return; } // The second parameter is directly mapped to database name. In future some parsing is required. String address = databases.get(param); String[] split = address.split(":"); if (split.length != 2) throw new IllegalArgumentException(connectionName+ ": address does not contain a port, missing ':' character"); InetSocketAddress dbAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1])); Socket dbSocket = new Socket(); try { dbSocket.connect(dbAddress); } catch(IOException e) { System.out.println(connectionName + ":serve() Couldn't connect to database '"+param+"'"); out.write("ERROR Couldn't connect to database '"+param+"'\n"); return; } out.write("OK\n"); out.flush(); proxy(dbSocket, socket); dbSocket.close(); } else { System.out.println(connectionName + ": serve() Invalid command '"+command+"'"); out.write("ERROR Invalid command '"+command+"'\n"); return; } } /** * Proxies data from socket a to socket b and vice versa * This method blocks until one of the sockets closes or exception is thrown. * * @param a * @param b */ private void proxy(final Socket a, final Socket b) { // A -> B Runnable ab = new Runnable() { @Override public void run() { try { byte buffer[] = new byte[256]; while(a.isConnected() && b.isConnected()) { int n = a.getInputStream().read(buffer); if (n < 1) break; if (SMALL_PACKETS) { for (int i=0; i A Runnable ba = new Runnable() { @Override public void run() { try { byte buffer[] = new byte[256]; while(b.isConnected() && a.isConnected()) { int n = b.getInputStream().read(buffer); if (n < 1) break; if (SMALL_PACKETS) { for (int i=0; iclient"); Thread tba = new Thread(ba, t + " client->server"); tab.start(); tba.start(); try { while(tba.isAlive() && tab.isAlive()) { Thread.sleep(1000); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @param args */ public static void main(String[] args) { DatabaseProxy proxy = new DatabaseProxy(); proxy.init(); proxy.run(); } }