X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=bundles%2Forg.simantics.db.testing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Ftesting%2Fcommon%2FDatabaseProxy.java;fp=bundles%2Forg.simantics.db.testing%2Fsrc%2Forg%2Fsimantics%2Fdb%2Ftesting%2Fcommon%2FDatabaseProxy.java;h=5e2a70987b48ef790d0c9170057042ccb07ace32;hb=67fd62f9c742337ec80eef658192db198a0efaac;hp=0000000000000000000000000000000000000000;hpb=cde82ba81327d5515fdca362f7f4c70f5103ae80;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java b/bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java new file mode 100644 index 000000000..5e2a70987 --- /dev/null +++ b/bundles/org.simantics.db.testing/src/org/simantics/db/testing/common/DatabaseProxy.java @@ -0,0 +1,274 @@ +package org.simantics.db.testing.common; +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ + + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; + +public class DatabaseProxy implements Runnable { + + private static boolean SMALL_PACKETS = false; + private static int PORT = 8891; + private boolean interrupted = false; + private Map databases = new HashMap(); + + private ServerSocket service = null; + + /** + * Initializes database proxy with static mappings. + * + */ + public DatabaseProxy() { + databases.put("/public/", "192.168.3.11:9101"); + databases.put("FooBar", "localhost:6671"); + } + + public DatabaseProxy(String host, int port) { + databases.put("/test/", host+":"+port); + } + + public void start() { + Thread t = new Thread(this); + init(); + t.start(); + } + + public void stop() { + interrupted = true; + // We have to interrupt the the blocking accept call. + // This is one way of doing it. + if(service != null) + try { + service.close(); + } catch (IOException e) { + } + } + + public int getPort() { + return PORT; + } + + public void init() { + try { + service = new ServerSocket(PORT); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + + public void run() { + System.out.println("Accepting connections"); + + int connectionNumber = 0; + while(!interrupted) { + try { + final Socket socket = service.accept(); + String connectionName = "Connection" + ++connectionNumber; + System.out.println(connectionName + ": Client connection accepted."); + + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + serve(socket); + socket.close(); + } catch(IOException e) { + e.printStackTrace(); + } + } + }; + Thread t = new Thread(runnable, connectionName); + t.start(); + } catch (IOException e) { + if (interrupted) + break; // Blocking accept call intentionally interrupted by closing the socket. + e.printStackTrace(); + } + } + + // Shutdown service + try { + if(service != null) + service.close(); + } catch (IOException ignore) { + } + } + + /** + * Serves socket connection. Waits for a connection line from the client and parses it. + * The connection line must be in form "CONNECT\tDatabaseIdentifier" + * If the database name is valid, creates a connection to the database and works as a proxy + * between the client and the database. + * + * @param socket + * @throws IOException + */ + private void serve(Socket socket) throws IOException { + String connectionName = Thread.currentThread().getName(); + + BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream())); + PrintWriter out = new PrintWriter(socket.getOutputStream(), true); + + String line = input.readLine(); + String columns[] = line.split("\t"); + if(columns.length < 2) { + System.out.println(connectionName + ": serve() Invalid connection string [1] '"+line+"'"); + out.write("ERROR Invalid connection string\n"); + return; + } + String command = columns[0]; + String param = columns[1]; + + /** + * CONNECT is the only supported command + */ + if("CONNECT".equals(command)) { + if(databases.containsKey(param) == false) { + System.out.println(connectionName + ": serve() Invalid database '"+param+"'"); + out.write("ERROR Invalid database '"+param+"'\n"); + return; + } + // The second parameter is directly mapped to database name. In future some parsing is required. + String address = databases.get(param); + String[] split = address.split(":"); + if (split.length != 2) + throw new IllegalArgumentException(connectionName+ ": address does not contain a port, missing ':' character"); + InetSocketAddress dbAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1])); + + Socket dbSocket = new Socket(); + try { + dbSocket.connect(dbAddress); + } catch(IOException e) { + System.out.println(connectionName + ":serve() Couldn't connect to database '"+param+"'"); + out.write("ERROR Couldn't connect to database '"+param+"'\n"); + return; + } + + out.write("OK\n"); + out.flush(); + proxy(dbSocket, socket); + + dbSocket.close(); + } else { + System.out.println(connectionName + ": serve() Invalid command '"+command+"'"); + out.write("ERROR Invalid command '"+command+"'\n"); + return; + } + } + + /** + * Proxies data from socket a to socket b and vice versa + * This method blocks until one of the sockets closes or exception is thrown. + * + * @param a + * @param b + */ + private void proxy(final Socket a, final Socket b) { + // A -> B + Runnable ab = new Runnable() { + @Override + public void run() { + try { + byte buffer[] = new byte[256]; + while(a.isConnected() && b.isConnected()) { + int n = a.getInputStream().read(buffer); + if (n < 1) + break; + if (SMALL_PACKETS) { + for (int i=0; i A + Runnable ba = new Runnable() { + @Override + public void run() { + try { + byte buffer[] = new byte[256]; + while(b.isConnected() && a.isConnected()) { + int n = b.getInputStream().read(buffer); + if (n < 1) + break; + if (SMALL_PACKETS) { + for (int i=0; iclient"); + Thread tba = new Thread(ba, t + " client->server"); + + tab.start(); + tba.start(); + + try { + while(tba.isAlive() && tab.isAlive()) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * @param args + */ + public static void main(String[] args) { + DatabaseProxy proxy = new DatabaseProxy(); + proxy.init(); + proxy.run(); + } + +}