1 package org.simantics.db.testing.common;
2 /*******************************************************************************
3 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
5 * All rights reserved. This program and the accompanying materials
6 * are made available under the terms of the Eclipse Public License v1.0
7 * which accompanies this distribution, and is available at
8 * http://www.eclipse.org/legal/epl-v10.html
11 * VTT Technical Research Centre of Finland - initial API and implementation
12 *******************************************************************************/
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintWriter;
19 import java.net.InetSocketAddress;
20 import java.net.ServerSocket;
21 import java.net.Socket;
22 import java.util.HashMap;
25 public class DatabaseProxy implements Runnable {
27 private static boolean SMALL_PACKETS = false;
28 private static int PORT = 8891;
29 private boolean interrupted = false;
30 private Map<String, String> databases = new HashMap<String, String>();
32 private ServerSocket service = null;
35 * Initializes database proxy with static mappings.
38 public DatabaseProxy() {
39 databases.put("/public/", "192.168.3.11:9101");
40 databases.put("FooBar", "localhost:6671");
43 public DatabaseProxy(String host, int port) {
44 databases.put("/test/", host+":"+port);
48 Thread t = new Thread(this);
55 // We have to interrupt the blocking accept call.
56 // This is one way of doing it.
60 } catch (IOException e) {
64 public int getPort() {
70 service = new ServerSocket(PORT);
71 } catch (IOException e) {
78 System.out.println("Accepting connections");
80 int connectionNumber = 0;
83 final Socket socket = service.accept();
84 String connectionName = "Connection" + ++connectionNumber;
85 System.out.println(connectionName + ": Client connection accepted.");
87 Runnable runnable = new Runnable() {
93 } catch(IOException e) {
98 Thread t = new Thread(runnable, connectionName);
100 } catch (IOException e) {
102 break; // Blocking accept call intentionally interrupted by closing the socket.
111 } catch (IOException ignore) {
116 * Serves a socket connection. Waits for a connection line from the client and parses it.
117 * The connection line must be in the form "CONNECT\tDatabaseIdentifier".
118 * If the database name is valid, creates a connection to the database and works as a proxy
119 * between the client and the database.
122 * @throws IOException
124 private void serve(Socket socket) throws IOException {
125 String connectionName = Thread.currentThread().getName();
127 BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
128 PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
130 String line = input.readLine();
131 String columns[] = line.split("\t");
132 if(columns.length < 2) {
133 System.out.println(connectionName + ": serve() Invalid connection string [1] '"+line+"'");
134 out.write("ERROR Invalid connection string\n");
137 String command = columns[0];
138 String param = columns[1];
141 * CONNECT is the only supported command
143 if("CONNECT".equals(command)) {
144 if(databases.containsKey(param) == false) {
145 System.out.println(connectionName + ": serve() Invalid database '"+param+"'");
146 out.write("ERROR Invalid database '"+param+"'\n");
149 // The second parameter is directly mapped to the database name. In the future, some parsing will be required.
150 String address = databases.get(param);
151 String[] split = address.split(":");
152 if (split.length != 2)
153 throw new IllegalArgumentException(connectionName+ ": address does not contain a port, missing ':' character");
154 InetSocketAddress dbAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
156 Socket dbSocket = new Socket();
158 dbSocket.connect(dbAddress);
159 } catch(IOException e) {
160 System.out.println(connectionName + ":serve() Couldn't connect to database '"+param+"'");
161 out.write("ERROR Couldn't connect to database '"+param+"'\n");
167 proxy(dbSocket, socket);
171 System.out.println(connectionName + ": serve() Invalid command '"+command+"'");
172 out.write("ERROR Invalid command '"+command+"'\n");
178 * Proxies data from socket a to socket b and vice versa.
179 * This method blocks until one of the sockets closes or an exception is thrown.
184 private void proxy(final Socket a, final Socket b) {
186 Runnable ab = new Runnable() {
190 byte buffer[] = new byte[256];
191 while(a.isConnected() && b.isConnected()) {
192 int n = a.getInputStream().read(buffer);
196 for (int i=0; i<n; ++i) {
197 b.getOutputStream().write(buffer, i, 1);
198 b.getOutputStream().flush();
200 Thread.sleep(100); // ms
201 } catch (InterruptedException e) {
202 // TODO Auto-generated catch block
207 b.getOutputStream().write(buffer, 0, n);
208 b.getOutputStream().flush();
211 } catch(IOException e) {
212 return; // for debugging
218 Runnable ba = new Runnable() {
222 byte buffer[] = new byte[256];
223 while(b.isConnected() && a.isConnected()) {
224 int n = b.getInputStream().read(buffer);
228 for (int i=0; i<n; ++i) {
229 a.getOutputStream().write(buffer, i, 1);
231 Thread.sleep(100); // ms
232 } catch (InterruptedException e) {
235 a.getOutputStream().flush();
238 a.getOutputStream().write(buffer, 0, n);
239 a.getOutputStream().flush();
242 } catch(IOException e) {
243 return; // for debugging
248 String t = Thread.currentThread().getName();
249 Thread tab = new Thread(ab, t + " server->client");
250 Thread tba = new Thread(ba, t + " client->server");
256 while(tba.isAlive() && tab.isAlive()) {
259 } catch (InterruptedException e) {
260 // TODO Auto-generated catch block
268 public static void main(String[] args) {
269 DatabaseProxy proxy = new DatabaseProxy();